Index: src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java (revision 523296) +++ src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java (working copy) @@ -40,7 +40,7 @@ for (int i = 0; i < 100; i++) { addDoc(writer); checkInvariants(writer); - if (writer.getRamSegmentCount() + writer.getSegmentCount() >= 18) { + if (writer.getNumBufferedDocuments() + writer.getSegmentCount() >= 18) { noOverMerge = true; } } @@ -178,7 +178,7 @@ int mergeFactor = writer.getMergeFactor(); int maxMergeDocs = writer.getMaxMergeDocs(); - int ramSegmentCount = writer.getRamSegmentCount(); + int ramSegmentCount = writer.getNumBufferedDocuments(); assertTrue(ramSegmentCount < maxBufferedDocs); int lowerBound = -1; Index: src/test/org/apache/lucene/index/TestIndexWriterDelete.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexWriterDelete.java (revision 523296) +++ src/test/org/apache/lucene/index/TestIndexWriterDelete.java (working copy) @@ -93,7 +93,7 @@ } modifier.flush(); - assertEquals(0, modifier.getRamSegmentCount()); + assertEquals(0, modifier.getNumBufferedDocuments()); assertTrue(0 < modifier.getSegmentCount()); if (!autoCommit) { @@ -435,7 +435,7 @@ String[] startFiles = dir.list(); SegmentInfos infos = new SegmentInfos(); infos.read(dir); - IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null); + IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null); String[] endFiles = dir.list(); Arrays.sort(startFiles); Index: src/test/org/apache/lucene/index/TestIndexReader.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexReader.java (revision 523296) +++ src/test/org/apache/lucene/index/TestIndexReader.java (working copy) @@ -803,7 +803,7 @@ String[] startFiles = dir.list(); SegmentInfos infos = new SegmentInfos(); infos.read(dir); - IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null); + IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null); String[] endFiles = dir.list(); Arrays.sort(startFiles); Index: src/test/org/apache/lucene/index/TestIndexWriter.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexWriter.java (revision 523296) +++ src/test/org/apache/lucene/index/TestIndexWriter.java (working copy) @@ -461,7 +461,7 @@ String[] startFiles = dir.list(); SegmentInfos infos = new SegmentInfos(); infos.read(dir); - IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null); + IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null); String[] endFiles = dir.list(); Arrays.sort(startFiles); @@ -842,6 +842,7 @@ public void testCommitOnCloseAbort() throws IOException { Directory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(10); for (int i = 0; i < 14; i++) { addDoc(writer); } @@ -854,6 +855,7 @@ searcher.close(); writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), false); + writer.setMaxBufferedDocs(10); for(int j=0;j<17;j++) { addDoc(writer); } @@ -878,6 +880,7 @@ // Now make sure we can
re-open the index, add docs, // and all is good: writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), false); + writer.setMaxBufferedDocs(10); for(int i=0;i<12;i++) { for(int j=0;j<17;j++) { addDoc(writer); @@ -945,6 +948,7 @@ public void testCommitOnCloseOptimize() throws IOException { RAMDirectory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(10); for(int j=0;j<17;j++) { addDocWithIndex(writer, j); } Index: src/test/org/apache/lucene/index/TestStressIndexing.java =================================================================== --- src/test/org/apache/lucene/index/TestStressIndexing.java (revision 523296) +++ src/test/org/apache/lucene/index/TestStressIndexing.java (working copy) @@ -74,8 +74,6 @@ count++; } - modifier.close(); - } catch (Exception e) { System.out.println(e.toString()); e.printStackTrace(); @@ -125,6 +123,9 @@ IndexerThread indexerThread = new IndexerThread(modifier); indexerThread.start(); + IndexerThread indexerThread2 = new IndexerThread(modifier); + indexerThread2.start(); + // Two searchers that constantly just re-instantiate the searcher: SearcherThread searcherThread1 = new SearcherThread(directory); searcherThread1.start(); @@ -133,9 +134,14 @@ searcherThread2.start(); indexerThread.join(); + indexerThread2.join(); searcherThread1.join(); searcherThread2.join(); + + modifier.close(); + assertTrue("hit unexpected exception in indexer", !indexerThread.failed); + assertTrue("hit unexpected exception in indexer 2", !indexerThread2.failed); assertTrue("hit unexpected exception in search1", !searcherThread1.failed); assertTrue("hit unexpected exception in search2", !searcherThread2.failed); //System.out.println(" Writer: " + indexerThread.count + " iterations"); Index: src/test/org/apache/lucene/index/TestIndexFileDeleter.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexFileDeleter.java (revision 523296) +++ src/test/org/apache/lucene/index/TestIndexFileDeleter.java (working copy) @@ -34,6 +34,7 @@ Directory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(10); int i; for(i=0;i<35;i++) { addDoc(writer, i); Index: src/test/org/apache/lucene/index/TestDeletionPolicy.java =================================================================== --- src/test/org/apache/lucene/index/TestDeletionPolicy.java (revision 523296) +++ src/test/org/apache/lucene/index/TestDeletionPolicy.java (working copy) @@ -254,6 +254,7 @@ Directory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true, policy); + writer.setMaxBufferedDocs(10); writer.setUseCompoundFile(useCompoundFile); for(int i=0;i<107;i++) { addDoc(writer); @@ -271,7 +272,7 @@ } else { // If we are not auto committing then there should // be exactly 2 commits (one per close above): - assertEquals(2, policy.numOnCommit); + assertEquals(autoCommit?2:1, policy.numOnCommit); } // Simplistic check: just verify all segments_N's still @@ -316,6 +317,7 @@ Directory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true, policy); + writer.setMaxBufferedDocs(10); writer.setUseCompoundFile(useCompoundFile); for(int i=0;i<107;i++) { addDoc(writer); @@ -333,13 +335,15 @@ } else { // If we are not auto committing then there should // be exactly 2 commits (one per close above): - 
assertEquals(2, policy.numOnCommit); + assertEquals(autoCommit?2:1, policy.numOnCommit); } - // Simplistic check: just verify the index is in fact - // readable: - IndexReader reader = IndexReader.open(dir); - reader.close(); + if (autoCommit) { + // Simplistic check: just verify the index is in fact + // readable: + IndexReader reader = IndexReader.open(dir); + reader.close(); + } dir.close(); } @@ -363,6 +367,7 @@ for(int j=0;j= dataLen) { + dataLen = input.read(ioBuffer); + bufferIndex = 0; + } + + if (dataLen == -1) { + if (length > 0) + break; + else + return null; + } else + c = ioBuffer[bufferIndex++]; + + if (c != ' ') { // if it's a token char + + if (length == 0) // start of token + start = offset - 1; + + buffer[length++] = c; + + if (length == MAX_WORD_LEN) // buffer overflow! + break; + + } else if (length > 0) // at non-Letter w/ chars + break; // return 'em + } + + // t.termText = new String(buffer, 0, length); + t.termBufferLength = length; + t.startOffset = start; + t.endOffset = start+length; + + return t; + } +} + Property changes on: src/java/org/apache/lucene/analysis/SimpleSpaceTokenizer.java ___________________________________________________________________ Name: svn:eol-style + native Index: src/java/org/apache/lucene/analysis/SimpleSpaceAnalyzer.java =================================================================== --- src/java/org/apache/lucene/analysis/SimpleSpaceAnalyzer.java (revision 0) +++ src/java/org/apache/lucene/analysis/SimpleSpaceAnalyzer.java (revision 0) @@ -0,0 +1,35 @@ +package org.apache.lucene.analysis; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.Reader; + +/** An Analyzer that uses SimpleSpaceTokenizer. 
*/ + +public final class SimpleSpaceAnalyzer extends Analyzer { + private ThreadLocal tokenizers = new ThreadLocal(); + public TokenStream tokenStream(String fieldName, Reader reader) { + SimpleSpaceTokenizer s = (SimpleSpaceTokenizer) tokenizers.get(); + if (s == null) { + s = new SimpleSpaceTokenizer(); + tokenizers.set(s); + } + s.init(reader); + return s; + } +} Property changes on: src/java/org/apache/lucene/analysis/SimpleSpaceAnalyzer.java ___________________________________________________________________ Name: svn:eol-style + native Index: src/java/org/apache/lucene/analysis/Token.java =================================================================== --- src/java/org/apache/lucene/analysis/Token.java (revision 523296) +++ src/java/org/apache/lucene/analysis/Token.java (working copy) @@ -56,7 +56,13 @@ String type = "word"; // lexical type Payload payload; - + + // For better indexing performance, set buffer & length + // instead of termText + char[] termBuffer; + int termBufferOffset; + int termBufferLength; + private int positionIncrement = 1; /** Constructs a Token with the given term text, and start & end offsets. @@ -67,6 +73,17 @@ endOffset = end; } + /** Constructs a Token with the given term text buffer + starting at offset and spanning length characters, and start & end offsets. The type defaults to "word." */ + public Token(char[] text, int offset, int length, int start, int end) { + termBuffer = text; + termBufferOffset = offset; + termBufferLength = length; + startOffset = start; + endOffset = end; + } + /** Constructs a Token with the given text, start and end offsets, & type. */ public Token(String text, int start, int end, String typ) { termText = text; @@ -75,6 +92,19 @@ type = typ; } + /** Constructs a Token with the given term text buffer + starting at offset and spanning length characters, and start & end + offsets, & type. */ + public Token(char[] text, int offset, int length, int start, int end, String typ) { + termBuffer = text; + termBufferOffset = offset; + termBufferLength = length; + startOffset = start; + endOffset = end; + type = typ; + } + + /** Set the position increment. This determines the position of this token * relative to the previous Token in a {@link TokenStream}, used in phrase * searching. @@ -119,7 +149,17 @@ /** Returns the Token's term text. */ public final String termText() { return termText; } + public final char[] termBuffer() { return termBuffer; } + public final int termBufferOffset() { return termBufferOffset; } + public final int termBufferLength() { return termBufferLength; } + public final void setTermBuffer(char[] buffer, int offset, int length) { + this.termBuffer = buffer; + this.termBufferOffset = offset; + this.termBufferLength = length; + } + + /** Returns this Token's starting offset, the position of the first character corresponding to this token in the source text.
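Not part of the patch: a minimal usage sketch of the char[]-based Token API added above (the new Token(char[], int, int, int, int) constructor and the termBuffer()/termBufferOffset()/termBufferLength()/setTermBuffer() accessors), assuming the patched Token class is on the classpath. This is the same pattern SimpleSpaceTokenizer uses to avoid allocating a String per token.

    import org.apache.lucene.analysis.Token;

    public class TokenBufferExample {
      public static void main(String[] args) {
        char[] buf = "hello world".toCharArray();

        // Term text is buf[6..6+5) = "world"; start/end offsets in the source text are 6 and 11.
        Token t = new Token(buf, 6, 5, 6, 11);

        // Consumers read the term through the buffer accessors instead of termText():
        String term = new String(t.termBuffer(), t.termBufferOffset(), t.termBufferLength());
        System.out.println(term + " " + t.startOffset() + "-" + t.endOffset());

        // A tokenizer that reuses a single Token can re-point it at a (reused) buffer:
        t.setTermBuffer(buf, 0, 5); // term is now "hello"
      }
    }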
Index: src/java/org/apache/lucene/index/FieldInfos.java =================================================================== --- src/java/org/apache/lucene/index/FieldInfos.java (revision 523296) +++ src/java/org/apache/lucene/index/FieldInfos.java (working copy) @@ -156,7 +156,7 @@ * @param omitNorms true if the norms for the indexed field should be omitted */ public void add(String name, boolean isIndexed, boolean storeTermVector, - boolean storePositionWithTermVector, boolean storeOffsetWithTermVector, boolean omitNorms) { + boolean storePositionWithTermVector, boolean storeOffsetWithTermVector, boolean omitNorms) { add(name, isIndexed, storeTermVector, storePositionWithTermVector, storeOffsetWithTermVector, omitNorms, false); } @@ -174,12 +174,13 @@ * @param omitNorms true if the norms for the indexed field should be omitted * @param storePayloads true if payloads should be stored for this field */ - public void add(String name, boolean isIndexed, boolean storeTermVector, - boolean storePositionWithTermVector, boolean storeOffsetWithTermVector, - boolean omitNorms, boolean storePayloads) { + // nocommit: API change, but, not yet released + public FieldInfo add(String name, boolean isIndexed, boolean storeTermVector, + boolean storePositionWithTermVector, boolean storeOffsetWithTermVector, + boolean omitNorms, boolean storePayloads) { FieldInfo fi = fieldInfo(name); if (fi == null) { - addInternal(name, isIndexed, storeTermVector, storePositionWithTermVector, storeOffsetWithTermVector, omitNorms, storePayloads); + return addInternal(name, isIndexed, storeTermVector, storePositionWithTermVector, storeOffsetWithTermVector, omitNorms, storePayloads); } else { if (fi.isIndexed != isIndexed) { fi.isIndexed = true; // once indexed, always index @@ -199,19 +200,20 @@ if (fi.storePayloads != storePayloads) { fi.storePayloads = true; } - } + return fi; } - - private void addInternal(String name, boolean isIndexed, - boolean storeTermVector, boolean storePositionWithTermVector, - boolean storeOffsetWithTermVector, boolean omitNorms, boolean storePayloads) { + // nocommit: API change + private FieldInfo addInternal(String name, boolean isIndexed, + boolean storeTermVector, boolean storePositionWithTermVector, + boolean storeOffsetWithTermVector, boolean omitNorms, boolean storePayloads) { FieldInfo fi = new FieldInfo(name, isIndexed, byNumber.size(), storeTermVector, storePositionWithTermVector, storeOffsetWithTermVector, omitNorms, storePayloads); byNumber.add(fi); byName.put(name, fi); + return fi; } public int fieldNumber(String fieldName) { Index: src/java/org/apache/lucene/index/IndexReader.java =================================================================== --- src/java/org/apache/lucene/index/IndexReader.java (revision 523296) +++ src/java/org/apache/lucene/index/IndexReader.java (working copy) @@ -771,7 +771,7 @@ // KeepOnlyLastCommitDeleter: IndexFileDeleter deleter = new IndexFileDeleter(directory, deletionPolicy == null ? 
new KeepOnlyLastCommitDeletionPolicy() : deletionPolicy, - segmentInfos, null); + segmentInfos, null, null); // Checkpoint the state we are about to change, in // case we have to roll back: Index: src/java/org/apache/lucene/index/IndexFileNames.java =================================================================== --- src/java/org/apache/lucene/index/IndexFileNames.java (revision 523296) +++ src/java/org/apache/lucene/index/IndexFileNames.java (working copy) @@ -50,6 +50,9 @@ /** Extension of separate norms */ static final String SEPARATE_NORMS_EXTENSION = "s"; + /** Extension of flushed temp segment data */ + static final String TEMP_FLUSH_EXTENSION = "tfp"; + /** * This array contains all filename extensions used by * Lucene's index files, with two exceptions, namely the @@ -60,7 +63,7 @@ */ static final String INDEX_EXTENSIONS[] = new String[] { "cfs", "fnm", "fdx", "fdt", "tii", "tis", "frq", "prx", "del", - "tvx", "tvd", "tvf", "gen", "nrm" + "tvx", "tvd", "tvf", "gen", "nrm", "tfp" }; /** File extensions that are added to a compound file Index: src/java/org/apache/lucene/index/SegmentTermDocs.java =================================================================== --- src/java/org/apache/lucene/index/SegmentTermDocs.java (revision 523296) +++ src/java/org/apache/lucene/index/SegmentTermDocs.java (working copy) @@ -51,6 +51,7 @@ } public void seek(Term term) throws IOException { + //System.out.println("std: seek term " + term); TermInfo ti = parent.tis.get(term); seek(ti, term); } @@ -74,6 +75,7 @@ void seek(TermInfo ti, Term term) throws IOException { count = 0; + //System.out.println("std: seek " + ti); payloadLengthAtLastSkip = 0; FieldInfo fi = parent.fieldInfos.fieldInfo(term.field); currentFieldStoresPayloads = (fi != null) ? fi.storePayloads : false; @@ -81,6 +83,7 @@ df = 0; } else { df = ti.docFreq; + //System.out.println(" df = " + df); doc = 0; skipDoc = 0; skipCount = 0; @@ -90,6 +93,7 @@ skipPointer = freqPointer + ti.skipOffset; freqStream.seek(freqPointer); haveSkipped = false; + //System.out.println(" freqP = " + freqPointer + "; proxP = " + proxPointer); } } @@ -106,6 +110,7 @@ } public boolean next() throws IOException { + //System.out.println("std: now do next"); while (true) { if (count == df) return false; @@ -118,6 +123,7 @@ freq = freqStream.readVInt(); // else read freq count++; + //System.out.println("std: read " + docCode + "; freq= " + freq); if (deletedDocs == null || !deletedDocs.get(doc)) break; @@ -131,18 +137,22 @@ throws IOException { final int length = docs.length; int i = 0; + //System.out.println("read bulk: df =" + df + "; length=" + length); while (i < length && count < df) { // manually inlined call to next() for speed final int docCode = freqStream.readVInt(); + //System.out.println(" read code: " + docCode); doc += docCode >>> 1; // shift off low bit if ((docCode & 1) != 0) // if low bit is set freq = 1; // freq is one else freq = freqStream.readVInt(); // else read freq count++; + // //System.out.println(" read freq " + freq); if (deletedDocs == null || !deletedDocs.get(doc)) { + //System.out.println(" add " + doc + "; freq=" + freq); docs[i] = doc; freqs[i] = freq; ++i; @@ -156,7 +166,9 @@ /** Optimized implementation. 
*/ public boolean skipTo(int target) throws IOException { + //System.out.println("std skip to " + target); if (df >= skipInterval) { // optimized case + //System.out.println(" is frequent enough"); if (skipStream == null) skipStream = (IndexInput) freqStream.clone(); // lazily clone @@ -172,6 +184,7 @@ long lastFreqPointer = freqStream.getFilePointer(); long lastProxPointer = -1; int numSkipped = -1 - (count % skipInterval); + //System.out.println(" target " + target + "; skipDoc " + skipDoc); while (target > skipDoc) { lastSkipDoc = skipDoc; @@ -203,11 +216,13 @@ freqPointer += skipStream.readVInt(); proxPointer += skipStream.readVInt(); + //System.out.println(" now freq " + freqPointer + " prox " + proxPointer); skipCount++; } // if we found something to skip, then skip it if (lastFreqPointer > freqStream.getFilePointer()) { + //System.out.println(" do skip! " + lastFreqPointer); freqStream.seek(lastFreqPointer); skipProx(lastProxPointer, lastPayloadLength); @@ -219,6 +234,7 @@ // done skipping, now just scan do { + //System.out.println(" now scan " + target + " " + doc); if (!next()) return false; } while (target > doc); Index: src/java/org/apache/lucene/index/FieldsWriter.java =================================================================== --- src/java/org/apache/lucene/index/FieldsWriter.java (revision 523296) +++ src/java/org/apache/lucene/index/FieldsWriter.java (working copy) @@ -38,17 +38,90 @@ private IndexOutput indexStream; + private boolean doClose; + FieldsWriter(Directory d, String segment, FieldInfos fn) throws IOException { fieldInfos = fn; fieldsStream = d.createOutput(segment + ".fdt"); indexStream = d.createOutput(segment + ".fdx"); + doClose = true; } + FieldsWriter(IndexOutput fdx, IndexOutput fdt, FieldInfos fn) throws IOException { + fieldInfos = fn; + fieldsStream = fdt; + indexStream = fdx; + doClose = false; + } + IndexOutput getIndexStream() { + return indexStream; + } + IndexOutput getFieldsStream() { + return fieldsStream; + } + final void close() throws IOException { + if (doClose) { fieldsStream.close(); indexStream.close(); + } } + final void writeField(FieldInfo fi, Fieldable field) throws IOException { + // if the field as an instanceof FieldsReader.FieldForMerge, we're in merge mode + // and field.binaryValue() already returns the compressed value for a field + // with isCompressed()==true, so we disable compression in that case + boolean disableCompression = (field instanceof FieldsReader.FieldForMerge); + fieldsStream.writeVInt(fi.number); + // System.out.println(" write field number " + fieldInfos.fieldNumber(field.name()) + " name " + field.name() + " to " + fieldsStream + " at " + fieldsStream.getFilePointer()); + byte bits = 0; + if (field.isTokenized()) + bits |= FieldsWriter.FIELD_IS_TOKENIZED; + if (field.isBinary()) + bits |= FieldsWriter.FIELD_IS_BINARY; + if (field.isCompressed()) + bits |= FieldsWriter.FIELD_IS_COMPRESSED; + + fieldsStream.writeByte(bits); + + if (field.isCompressed()) { + // compression is enabled for the current field + byte[] data = null; + + if (disableCompression) { + // optimized case for merging, the data + // is already compressed + data = field.binaryValue(); + } else { + // check if it is a binary field + if (field.isBinary()) { + data = compress(field.binaryValue()); + } + else { + data = compress(field.stringValue().getBytes("UTF-8")); + } + } + final int len = data.length; + // System.out.println(" compressed: " + len); + fieldsStream.writeVInt(len); + fieldsStream.writeBytes(data, len); + } + else { + // 
compression is disabled for the current field + if (field.isBinary()) { + byte[] data = field.binaryValue(); + final int len = data.length; + // System.out.println(" not compressed: " + len); + fieldsStream.writeVInt(len); + fieldsStream.writeBytes(data, len); + } + else { + fieldsStream.writeString(field.stringValue()); + } + } + // System.out.println(" fieldsStream now at " + fieldsStream.getFilePointer()); + } + final void addDocument(Document doc) throws IOException { indexStream.writeLong(fieldsStream.getFilePointer()); @@ -59,62 +132,14 @@ if (field.isStored()) storedCount++; } + // System.out.println("write " + storedCount + " fields to " + fieldsStream + " at " + fieldsStream.getFilePointer()); fieldsStream.writeVInt(storedCount); fieldIterator = doc.getFields().iterator(); while (fieldIterator.hasNext()) { Fieldable field = (Fieldable) fieldIterator.next(); - // if the field as an instanceof FieldsReader.FieldForMerge, we're in merge mode - // and field.binaryValue() already returns the compressed value for a field - // with isCompressed()==true, so we disable compression in that case - boolean disableCompression = (field instanceof FieldsReader.FieldForMerge); - if (field.isStored()) { - fieldsStream.writeVInt(fieldInfos.fieldNumber(field.name())); - - byte bits = 0; - if (field.isTokenized()) - bits |= FieldsWriter.FIELD_IS_TOKENIZED; - if (field.isBinary()) - bits |= FieldsWriter.FIELD_IS_BINARY; - if (field.isCompressed()) - bits |= FieldsWriter.FIELD_IS_COMPRESSED; - - fieldsStream.writeByte(bits); - - if (field.isCompressed()) { - // compression is enabled for the current field - byte[] data = null; - - if (disableCompression) { - // optimized case for merging, the data - // is already compressed - data = field.binaryValue(); - } else { - // check if it is a binary field - if (field.isBinary()) { - data = compress(field.binaryValue()); - } - else { - data = compress(field.stringValue().getBytes("UTF-8")); - } - } - final int len = data.length; - fieldsStream.writeVInt(len); - fieldsStream.writeBytes(data, len); - } - else { - // compression is disabled for the current field - if (field.isBinary()) { - byte[] data = field.binaryValue(); - final int len = data.length; - fieldsStream.writeVInt(len); - fieldsStream.writeBytes(data, len); - } - else { - fieldsStream.writeString(field.stringValue()); - } - } - } + if (field.isStored()) + writeField(fieldInfos.fieldInfo(field.name()), field); } } Index: src/java/org/apache/lucene/index/IndexWriter.java =================================================================== --- src/java/org/apache/lucene/index/IndexWriter.java (revision 523296) +++ src/java/org/apache/lucene/index/IndexWriter.java (working copy) @@ -60,10 +60,11 @@ (which just deletes and then adds). When finished adding, deleting and updating documents, close should be called.

These changes are buffered in memory and periodically - flushed to the {@link Directory} (during the above method calls). A flush is triggered when there are - enough buffered deletes (see {@link - #setMaxBufferedDeleteTerms}) or enough added documents - (see {@link #setMaxBufferedDocs}) since the last flush, + flushed to the {@link Directory} (during the above method + calls). A flush is triggered when there are enough + buffered deletes (see {@link #setMaxBufferedDeleteTerms}) + or enough added documents (see {@link #setMaxBufferedDocs} + and {@link #setRAMBufferSizeMB}) since the last flush, whichever is sooner. When a flush occurs, both pending deletes and added documents are flushed to the index. A flush may also trigger one or more segment merges.

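Not part of the patch: a short sketch of how the flush behavior described in the javadoc above would be configured, assuming the patched IndexWriter. With this change maxBufferedDocs defaults to 0, which selects RAM-based flushing (16 MB by default); setRAMBufferSizeMB raises or lowers that threshold, and a non-zero setMaxBufferedDocs restores flushing by document count.

    import org.apache.lucene.analysis.WhitespaceAnalyzer;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.store.RAMDirectory;

    public class FlushConfigExample {
      public static void main(String[] args) throws Exception {
        IndexWriter writer = new IndexWriter(new RAMDirectory(), new WhitespaceAnalyzer(), true);

        // Flush buffered documents once roughly 32 MB of RAM is used
        // (applies while maxBufferedDocs is left at its new default of 0):
        writer.setRAMBufferSizeMB(32);

        // Alternatively, flush every 1000 buffered documents instead:
        // writer.setMaxBufferedDocs(1000);

        writer.close();
      }
    }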
@@ -171,11 +172,17 @@ public final static int DEFAULT_MERGE_FACTOR = 10; /** - * Default value is 10. Change using {@link #setMaxBufferedDocs(int)}. + * Default value is 0 (meaning flush is based on RAM usage + * by default). Change using {@link #setMaxBufferedDocs}. */ - public final static int DEFAULT_MAX_BUFFERED_DOCS = 10; + public final static int DEFAULT_MAX_BUFFERED_DOCS = 0; /** + * Default value is 16 MB. Change using {@link #setRAMBufferSizeMB}. + */ + public final static float DEFAULT_RAM_BUFFER_SIZE_MB = 16F; + + /** * Default value is 1000. Change using {@link #setMaxBufferedDeleteTerms(int)}. */ public final static int DEFAULT_MAX_BUFFERED_DELETE_TERMS = 1000; @@ -208,8 +215,7 @@ private boolean autoCommit = true; // false if we should commit only on close SegmentInfos segmentInfos = new SegmentInfos(); // the segments - SegmentInfos ramSegmentInfos = new SegmentInfos(); // the segments in ramDirectory - private final RAMDirectory ramDirectory = new RAMDirectory(); // for temp segs + private MultiDocumentWriter docWriter; private IndexFileDeleter deleter; private Lock writeLock; @@ -563,6 +569,10 @@ } } + IndexFileDeleter getDeleter() { + return deleter; + } + private void init(Directory d, Analyzer a, final boolean create, boolean closeDir, IndexDeletionPolicy deletionPolicy, boolean autoCommit) throws CorruptIndexException, LockObtainFailedException, IOException { this.closeDir = closeDir; @@ -602,11 +612,14 @@ rollbackSegmentInfos = (SegmentInfos) segmentInfos.clone(); } + docWriter = new MultiDocumentWriter(newSegmentName(), directory, this, !autoCommit); + docWriter.setInfoStream(infoStream); + // Default deleter (for backwards compatibility) is // KeepOnlyLastCommitDeleter: deleter = new IndexFileDeleter(directory, deletionPolicy == null ? new KeepOnlyLastCommitDeletionPolicy() : deletionPolicy, - segmentInfos, infoStream); + segmentInfos, infoStream, docWriter); } catch (IOException e) { this.writeLock.release(); @@ -661,19 +674,26 @@ } /** Determines the minimal number of documents required before the buffered - * in-memory documents are merged and a new Segment is created. + * in-memory documents are flushed as a new Segment. * Since Documents are merged in a {@link org.apache.lucene.store.RAMDirectory}, * large value gives faster indexing. At the same time, mergeFactor limits * the number of files open in a FSDirectory. * - *

The default value is 10. + *

If this is 0, then the RAM buffer is flushed instead + * by overall RAM usage (see {@link + * #setRAMBufferSizeMB}). If this is non-zero, then + * flushing is triggered by maxBufferedDocs and not by + * overall RAM usage.

* - * @throws IllegalArgumentException if maxBufferedDocs is smaller than 2 + *

The default value is 0.

+ * + * @throws IllegalArgumentException if maxBufferedDocs is + * non-zero and smaller than 2 */ public void setMaxBufferedDocs(int maxBufferedDocs) { ensureOpen(); - if (maxBufferedDocs < 2) - throw new IllegalArgumentException("maxBufferedDocs must at least be 2"); + if (maxBufferedDocs != 0 && maxBufferedDocs < 2) + throw new IllegalArgumentException("maxBufferedDocs must at least be 2 or 0 to disable"); this.minMergeDocs = maxBufferedDocs; } @@ -685,7 +705,28 @@ return minMergeDocs; } + /** Determines the amount of RAM that may be used for + * buffering before the in-memory documents are flushed as + * a new Segment. This only applies when maxBufferedDocs + * is set to 0. Generally for faster indexing performance + * it's best to flush by RAM usage instead of document + * count. + */ + public void setRAMBufferSizeMB(float mb) { + if (mb < 1) + throw new IllegalArgumentException("ramBufferSize must at least be 1 MB"); + ramBufferSize = mb*1024F*1024F; + docWriter.setRAMBufferSizeMB(mb); + } + /** + * @see #setRAMBufferSizeMB + */ + public float getRAMBufferSizeMB() { + return ramBufferSize/1024F/1024F; + } + + /** *

Determines the minimal number of delete terms required before the buffered * in-memory delete terms are applied and flushed. If there are documents * buffered in memory at the time, they are merged and a new segment is @@ -756,7 +797,9 @@ public void setInfoStream(PrintStream infoStream) { ensureOpen(); this.infoStream = infoStream; - deleter.setInfoStream(infoStream); + docWriter.setInfoStream(infoStream); + // nocommit + //deleter.setInfoStream(infoStream); } /** @@ -835,7 +878,7 @@ */ public synchronized void close() throws CorruptIndexException, IOException { if (!closed) { - flushRamSegments(); + flush(); if (commitPending) { segmentInfos.write(directory); // now commit changes @@ -844,7 +887,6 @@ rollbackSegmentInfos = null; } - ramDirectory.close(); if (writeLock != null) { writeLock.release(); // release write lock writeLock = null; @@ -884,7 +926,7 @@ /** Returns the number of documents currently in this index. */ public synchronized int docCount() { ensureOpen(); - int count = ramSegmentInfos.size(); + int count = docWriter.docID; for (int i = 0; i < segmentInfos.size(); i++) { SegmentInfo si = segmentInfos.info(i); count += si.docCount; @@ -962,24 +1004,13 @@ */ public void addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException { ensureOpen(); - SegmentInfo newSegmentInfo = buildSingleDocSegment(doc, analyzer); - synchronized (this) { - ramSegmentInfos.addElement(newSegmentInfo); - maybeFlushRamSegments(); - } + docWriter.addDocument(doc, analyzer); + // For the non-autoCommit case, MultiDocumentWriter + // takes care of flushing its pending state to disk + if (autoCommit) + maybeFlush(); } - SegmentInfo buildSingleDocSegment(Document doc, Analyzer analyzer) - throws CorruptIndexException, IOException { - DocumentWriter dw = new DocumentWriter(ramDirectory, analyzer, this); - dw.setInfoStream(infoStream); - String segmentName = newRamSegmentName(); - dw.addDocument(segmentName, doc); - SegmentInfo si = new SegmentInfo(segmentName, 1, ramDirectory, false, false); - si.setNumFields(dw.getNumFields()); - return si; - } - /** * Deletes the document(s) containing term. * @param term the term to identify the documents to be deleted @@ -989,7 +1020,7 @@ public synchronized void deleteDocuments(Term term) throws CorruptIndexException, IOException { ensureOpen(); bufferDeleteTerm(term); - maybeFlushRamSegments(); + maybeFlush(); } /** @@ -1005,7 +1036,7 @@ for (int i = 0; i < terms.length; i++) { bufferDeleteTerm(terms[i]); } - maybeFlushRamSegments(); + maybeFlush(); } /** @@ -1041,26 +1072,23 @@ public void updateDocument(Term term, Document doc, Analyzer analyzer) throws CorruptIndexException, IOException { ensureOpen(); - SegmentInfo newSegmentInfo = buildSingleDocSegment(doc, analyzer); - synchronized (this) { - bufferDeleteTerm(term); - ramSegmentInfos.addElement(newSegmentInfo); - maybeFlushRamSegments(); - } + bufferDeleteTerm(term); + docWriter.addDocument(doc, analyzer); + // nocommit: what if we need to trigger on max delete terms? 
+ // For the non-autoCommit case, MultiDocumentWriter + // takes care of flushing its pending state to disk + if (autoCommit) + maybeFlush(); } - final synchronized String newRamSegmentName() { - return "_ram_" + Integer.toString(ramSegmentInfos.counter++, Character.MAX_RADIX); - } - // for test purpose final synchronized int getSegmentCount(){ return segmentInfos.size(); } // for test purpose - final synchronized int getRamSegmentCount(){ - return ramSegmentInfos.size(); + final synchronized int getNumBufferedDocuments(){ + return docWriter.docID; } // for test purpose @@ -1089,6 +1117,7 @@ */ private int mergeFactor = DEFAULT_MERGE_FACTOR; + // nocommit fix javadocs /** Determines the minimal number of documents required before the buffered * in-memory documents are merging and a new Segment is created. * Since Documents are merged in a {@link org.apache.lucene.store.RAMDirectory}, @@ -1096,10 +1125,11 @@ * the number of files open in a FSDirectory. * *

The default value is {@link #DEFAULT_MAX_BUFFERED_DOCS}. - */ private int minMergeDocs = DEFAULT_MAX_BUFFERED_DOCS; + // nocommit javadoc + private float ramBufferSize = DEFAULT_RAM_BUFFER_SIZE_MB*1024F*1024F; /** Determines the largest number of documents ever merged by addDocument(). * Small values (e.g., less than 10,000) are best for interactive indexing, @@ -1177,7 +1207,7 @@ */ public synchronized void optimize() throws CorruptIndexException, IOException { ensureOpen(); - flushRamSegments(); + flush(); while (segmentInfos.size() > 1 || (segmentInfos.size() == 1 && (SegmentReader.hasDeletions(segmentInfos.info(0)) || @@ -1186,7 +1216,7 @@ (useCompoundFile && (!SegmentReader.usesCompoundFile(segmentInfos.info(0))))))) { int minSegment = segmentInfos.size() - mergeFactor; - mergeSegments(segmentInfos, minSegment < 0 ? 0 : minSegment, segmentInfos.size()); + mergeSegments(minSegment < 0 ? 0 : minSegment, segmentInfos.size()); } } @@ -1203,7 +1233,7 @@ localRollbackSegmentInfos = (SegmentInfos) segmentInfos.clone(); localAutoCommit = autoCommit; if (localAutoCommit) { - flushRamSegments(); + flush(); // Turn off auto-commit during our local transaction: autoCommit = false; } else @@ -1293,16 +1323,18 @@ segmentInfos.clear(); segmentInfos.addAll(rollbackSegmentInfos); + docWriter.abort(); + // Ask deleter to locate unreferenced files & remove // them: deleter.checkpoint(segmentInfos, false); deleter.refresh(); - ramSegmentInfos = new SegmentInfos(); bufferedDeleteTerms.clear(); numBufferedDeleteTerms = 0; commitPending = false; + docWriter.abort(); close(); } else { @@ -1397,7 +1429,7 @@ for (int base = start; base < segmentInfos.size(); base++) { int end = Math.min(segmentInfos.size(), base+mergeFactor); if (end-base > 1) { - mergeSegments(segmentInfos, base, end); + mergeSegments(base, end); } } } @@ -1437,7 +1469,7 @@ // segments in S may not since they could come from multiple indexes. // Here is the merge algorithm for addIndexesNoOptimize(): // - // 1 Flush ram segments. + // 1 Flush ram. // 2 Consider a combined sequence with segments from T followed // by segments from S (same as current addIndexes(Directory[])). // 3 Assume the highest level for segments in S is h. Call @@ -1458,14 +1490,18 @@ // copy a segment, which may cause doc count to change because deleted // docs are garbage collected. - // 1 flush ram segments + // 1 flush ram ensureOpen(); - flushRamSegments(); + flush(); // 2 copy segment infos and find the highest level from dirs int startUpperBound = minMergeDocs; + // nocommit: what to do? 
+ if (startUpperBound == 0) + startUpperBound = 10; + boolean success = false; startTransaction(); @@ -1524,7 +1560,7 @@ // copy those segments from S for (int i = segmentCount - numSegmentsToCopy; i < segmentCount; i++) { - mergeSegments(segmentInfos, i, i + 1); + mergeSegments(i, i + 1); } if (checkNonDecreasingLevels(segmentCount - numSegmentsToCopy)) { success = true; @@ -1533,7 +1569,7 @@ } // invariants do not hold, simply merge those segments - mergeSegments(segmentInfos, segmentCount - numTailSegments, segmentCount); + mergeSegments(segmentCount - numTailSegments, segmentCount); // maybe merge segments again if necessary if (segmentInfos.info(segmentInfos.size() - 1).docCount > startUpperBound) { @@ -1673,22 +1709,18 @@ throws IOException { } - protected final void maybeFlushRamSegments() throws CorruptIndexException, IOException { + protected final synchronized void maybeFlush() throws CorruptIndexException, IOException { // A flush is triggered if enough new documents are buffered or - // if enough delete terms are buffered - if (ramSegmentInfos.size() >= minMergeDocs || numBufferedDeleteTerms >= maxBufferedDeleteTerms) { - flushRamSegments(); + // if enough delete terms are buffered or enough RAM is + // being consumed + // nocommit + if (numBufferedDeleteTerms >= maxBufferedDeleteTerms || + (autoCommit && ((minMergeDocs != 0 && docWriter.docID >= minMergeDocs) || + ((true || minMergeDocs == 0) && autoCommit && docWriter.getRAMUsed() > ramBufferSize)))) { + flush(); } } - /** Expert: Flushes all RAM-resident segments (buffered documents), then may merge segments. */ - private final synchronized void flushRamSegments() throws CorruptIndexException, IOException { - if (ramSegmentInfos.size() > 0 || bufferedDeleteTerms.size() > 0) { - mergeSegments(ramSegmentInfos, 0, ramSegmentInfos.size()); - maybeMergeSegments(minMergeDocs); - } - } - /** * Flush all in-memory buffered updates (adds and deletes) * to the Directory. 
@@ -1699,7 +1731,86 @@ */ public final synchronized void flush() throws CorruptIndexException, IOException { ensureOpen(); - flushRamSegments(); + + SegmentInfo newSegment = null; + boolean anything = false; + + boolean flushDocs = docWriter.docID > 0; + boolean flushDeletes = bufferedDeleteTerms.size() > 0; + final int numDocs = docWriter.docID; + + if (flushDocs || flushDeletes) { + + SegmentInfos rollback = null; + + if (flushDeletes) + rollback = (SegmentInfos) segmentInfos.clone(); + + boolean success = false; + + try { + if (flushDocs) { + int mergedDocCount = docWriter.docID; + String segment = docWriter.segment; + docWriter.flush(newSegmentName()); + newSegment = new SegmentInfo(segment, + mergedDocCount, + directory, false, true); + segmentInfos.addElement(newSegment); + } + + if (flushDeletes) { + maybeApplyDeletes(flushDocs); + doAfterFlush(); + } + + checkpoint(); + success = true; + } finally { + if (!success) { + if (flushDeletes) { + // Fully replace the segmentInfos since flushed + // deletes could have changed any of the + // SegmentInfo instances: + segmentInfos.clear(); + segmentInfos.addAll(rollback); + } else { + // Remove segment we added, if any: + if (newSegment != null && + segmentInfos.size() > 0 && + segmentInfos.info(segmentInfos.size()-1) == newSegment) + segmentInfos.remove(segmentInfos.size()-1); + docWriter.abort(); + } + deleter.checkpoint(segmentInfos, false); + deleter.refresh(); + } + } + + deleter.checkpoint(segmentInfos, autoCommit); + + if (flushDocs && useCompoundFile) { + success = false; + try { + docWriter.createCompoundFile(newSegment.name); + newSegment.setUseCompoundFile(true); + checkpoint(); + success = true; + } finally { + if (!success) { + newSegment.setUseCompoundFile(false); + deleter.refresh(); + } + } + + deleter.checkpoint(segmentInfos, autoCommit); + } + + // nocommit + // maybeMergeSegments(mergeFactor * numDocs / 2); + + maybeMergeSegments(minMergeDocs); + } } /** Expert: Return the total size of all index files currently cached in memory. @@ -1707,15 +1818,15 @@ */ public final long ramSizeInBytes() { ensureOpen(); - return ramDirectory.sizeInBytes(); + return docWriter.getRAMUsed(); } /** Expert: Return the number of documents whose segments are currently cached in memory. - * Useful when calling flushRamSegments() + * Useful when calling flush() */ public final synchronized int numRamDocs() { ensureOpen(); - return ramSegmentInfos.size(); + return docWriter.docID; } /** Incremental segment merger. */ @@ -1723,6 +1834,9 @@ long lowerBound = -1; long upperBound = startUpperBound; + // nocommit + if (upperBound == 0) upperBound = 10; + while (upperBound < maxMergeDocs) { int minSegment = segmentInfos.size(); int maxSegment = -1; @@ -1754,7 +1868,7 @@ while (numSegments >= mergeFactor) { // merge the leftmost* mergeFactor segments - int docCount = mergeSegments(segmentInfos, minSegment, minSegment + mergeFactor); + int docCount = mergeSegments(minSegment, minSegment + mergeFactor); numSegments -= mergeFactor; if (docCount > upperBound) { @@ -1783,39 +1897,29 @@ * Merges the named range of segments, replacing them in the stack with a * single segment. 
*/ - private final int mergeSegments(SegmentInfos sourceSegments, int minSegment, int end) + private final int mergeSegments(int minSegment, int end) throws CorruptIndexException, IOException { - // We may be called solely because there are deletes - // pending, in which case doMerge is false: - boolean doMerge = end > 0; final String mergedName = newSegmentName(); + SegmentMerger merger = null; - - final List ramSegmentsToDelete = new ArrayList(); - SegmentInfo newSegment = null; int mergedDocCount = 0; - boolean anyDeletes = (bufferedDeleteTerms.size() != 0); // This is try/finally to make sure merger's readers are closed: try { - if (doMerge) { - if (infoStream != null) infoStream.print("merging segments"); - merger = new SegmentMerger(this, mergedName); + if (infoStream != null) infoStream.print("merging segments"); - for (int i = minSegment; i < end; i++) { - SegmentInfo si = sourceSegments.info(i); - if (infoStream != null) - infoStream.print(" " + si.name + " (" + si.docCount + " docs)"); - IndexReader reader = SegmentReader.get(si); // no need to set deleter (yet) - merger.add(reader); - if (reader.directory() == this.ramDirectory) { - ramSegmentsToDelete.add(si); - } - } + merger = new SegmentMerger(this, mergedName); + + for (int i = minSegment; i < end; i++) { + SegmentInfo si = segmentInfos.info(i); + if (infoStream != null) + infoStream.print(" " + si.name + " (" + si.docCount + " docs)"); + IndexReader reader = SegmentReader.get(si); // no need to set deleter (yet) + merger.add(reader); } SegmentInfos rollback = null; @@ -1825,99 +1929,57 @@ // if we hit exception when doing the merge: try { - if (doMerge) { - mergedDocCount = merger.merge(); + mergedDocCount = merger.merge(); - if (infoStream != null) { - infoStream.println(" into "+mergedName+" ("+mergedDocCount+" docs)"); - } + if (infoStream != null) { + infoStream.println(" into "+mergedName+" ("+mergedDocCount+" docs)"); + } - newSegment = new SegmentInfo(mergedName, mergedDocCount, - directory, false, true); - } + newSegment = new SegmentInfo(mergedName, mergedDocCount, + directory, false, true); - if (sourceSegments != ramSegmentInfos || anyDeletes) { - // Now save the SegmentInfo instances that - // we are replacing: - rollback = (SegmentInfos) segmentInfos.clone(); - } + rollback = (SegmentInfos) segmentInfos.clone(); - if (doMerge) { - if (sourceSegments == ramSegmentInfos) { - segmentInfos.addElement(newSegment); - } else { - for (int i = end-1; i > minSegment; i--) // remove old infos & add new - sourceSegments.remove(i); + for (int i = end-1; i > minSegment; i--) // remove old infos & add new + segmentInfos.remove(i); - segmentInfos.set(minSegment, newSegment); - } - } + segmentInfos.set(minSegment, newSegment); - if (sourceSegments == ramSegmentInfos) { - maybeApplyDeletes(doMerge); - doAfterFlush(); - } - checkpoint(); success = true; } finally { - if (success) { - // The non-ram-segments case is already committed - // (above), so all the remains for ram segments case - // is to clear the ram segments: - if (sourceSegments == ramSegmentInfos) { - ramSegmentInfos.removeAllElements(); - } - } else { + if (!success && rollback != null) { + // Rollback the individual SegmentInfo + // instances, but keep original SegmentInfos + // instance (so we don't try to write again the + // same segments_N file -- write once): + segmentInfos.clear(); + segmentInfos.addAll(rollback); - // Must rollback so our state matches index: - if (sourceSegments == ramSegmentInfos && !anyDeletes) { - // Simple case: newSegment may or may 
not have - // been added to the end of our segment infos, - // so just check & remove if so: - if (newSegment != null && - segmentInfos.size() > 0 && - segmentInfos.info(segmentInfos.size()-1) == newSegment) { - segmentInfos.remove(segmentInfos.size()-1); - } - } else if (rollback != null) { - // Rollback the individual SegmentInfo - // instances, but keep original SegmentInfos - // instance (so we don't try to write again the - // same segments_N file -- write once): - segmentInfos.clear(); - segmentInfos.addAll(rollback); - } - // Delete any partially created and now unreferenced files: deleter.refresh(); } } } finally { // close readers before we attempt to delete now-obsolete segments - if (doMerge) merger.closeReaders(); + merger.closeReaders(); } - // Delete the RAM segments - deleter.deleteDirect(ramDirectory, ramSegmentsToDelete); - // Give deleter a chance to remove files now. deleter.checkpoint(segmentInfos, autoCommit); - if (useCompoundFile && doMerge) { + if (useCompoundFile) { boolean success = false; try { - merger.createCompoundFile(mergedName + ".cfs"); newSegment.setUseCompoundFile(true); checkpoint(); success = true; - } finally { if (!success) { // Must rollback: @@ -1936,14 +1998,14 @@ // Called during flush to apply any buffered deletes. If // doMerge is true then a new segment was just created and // flushed from the ram segments. - private final void maybeApplyDeletes(boolean doMerge) throws CorruptIndexException, IOException { + private final void maybeApplyDeletes(boolean flushedNewSegment) throws CorruptIndexException, IOException { if (bufferedDeleteTerms.size() > 0) { if (infoStream != null) infoStream.println("flush " + numBufferedDeleteTerms + " buffered deleted terms on " + segmentInfos.size() + " segments."); - if (doMerge) { + if (flushedNewSegment) { IndexReader reader = null; try { reader = SegmentReader.get(segmentInfos.info(segmentInfos.size() - 1)); @@ -1964,7 +2026,7 @@ } int infosEnd = segmentInfos.size(); - if (doMerge) { + if (flushedNewSegment) { infosEnd--; } @@ -1996,6 +2058,8 @@ private final boolean checkNonDecreasingLevels(int start) { int lowerBound = -1; int upperBound = minMergeDocs; + if (upperBound == 0) + upperBound = 10; for (int i = segmentInfos.size() - 1; i >= start; i--) { int docCount = segmentInfos.info(i).docCount; @@ -2044,10 +2108,11 @@ // well as the disk segments. private void bufferDeleteTerm(Term term) { Num num = (Num) bufferedDeleteTerms.get(term); + int numDoc = docWriter.docID; if (num == null) { - bufferedDeleteTerms.put(term, new Num(ramSegmentInfos.size())); + bufferedDeleteTerms.put(term, new Num(numDoc)); } else { - num.setNum(ramSegmentInfos.size()); + num.setNum(numDoc); } numBufferedDeleteTerms++; } @@ -2057,17 +2122,20 @@ // the documents buffered before it, not those buffered after it. 
private final void applyDeletesSelectively(HashMap deleteTerms, IndexReader reader) throws CorruptIndexException, IOException { + //System.out.println("now apply selective deletes"); Iterator iter = deleteTerms.entrySet().iterator(); while (iter.hasNext()) { Entry entry = (Entry) iter.next(); Term term = (Term) entry.getKey(); - + //System.out.println(" term " + term); + TermDocs docs = reader.termDocs(term); if (docs != null) { int num = ((Num) entry.getValue()).getNum(); try { while (docs.next()) { int doc = docs.doc(); + //System.out.println(" doc " + doc + " vs " + num); if (doc >= num) { break; } Index: src/java/org/apache/lucene/index/IndexFileDeleter.java =================================================================== --- src/java/org/apache/lucene/index/IndexFileDeleter.java (revision 523296) +++ src/java/org/apache/lucene/index/IndexFileDeleter.java (working copy) @@ -97,6 +97,7 @@ private PrintStream infoStream; private Directory directory; private IndexDeletionPolicy policy; + private MultiDocumentWriter docWriter; void setInfoStream(PrintStream infoStream) { this.infoStream = infoStream; @@ -116,10 +117,12 @@ * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public IndexFileDeleter(Directory directory, IndexDeletionPolicy policy, SegmentInfos segmentInfos, PrintStream infoStream) + public IndexFileDeleter(Directory directory, IndexDeletionPolicy policy, SegmentInfos segmentInfos, PrintStream infoStream, MultiDocumentWriter docWriter) throws CorruptIndexException, IOException { + this.docWriter = docWriter; this.infoStream = infoStream; + this.policy = policy; this.directory = directory; @@ -310,6 +313,8 @@ // Incref the files: incRef(segmentInfos, isCommit); + if (docWriter != null) + incRef(docWriter.files()); if (isCommit) { // Append to our commits list: @@ -325,9 +330,8 @@ // DecRef old files from the last checkpoint, if any: int size = lastFiles.size(); if (size > 0) { - for(int i=0;i freeThreadStates.size()) { + // System.out.println(" mark pending & wait..." + numThreadState + " vs " + freeThreadStates.size()); + flushPending = true; + while (numThreadState > freeThreadStates.size()) { + // System.out.println("flush wait: " + numThreadState + " vs " + freeThreadStates.size()); + try { + wait(); + } catch (InterruptedException e) { + } + } + // System.out.println(" wait done..."); + } + + // nocommit: what if we hit exception before notifyAll? + netFlushCount += docID; + netFlushTimes++; + // System.out.println(" FLUSH avg # docs=" + (((float) netFlushCount)/netFlushTimes)); + + fieldInfos.write(directory, segment + ".fnm"); + + // nocommit: not clean!! 
+ ThreadState state = (ThreadState) freeThreadStates.get(0); + + state.flushTerms(); + + // write norms of indexed fields + writeNorms(); + + assert fieldInfos.hasVectors() == (tvx != null); + + if (tvx != null) { + flushedVectors = true; + tvx.close(); + tvf.close(); + tvd.close(); + tvx = null; + } else { + flushedVectors = false; + } + + if (fieldsWriter != null) { + fieldsWriter.close(); + fieldsWriter = null; + } + + final int size = freeThreadStates.size(); + for(int i=0;i0) position += analyzer.getPositionIncrementGap(fieldInfo.name); + + storeOffsets = field.isStoreOffsetWithTermVector(); + + netDocCount++; + + if (!field.isTokenized()) { // un-tokenized field + token = localToken; + String stringValue = field.stringValue(); + token.setTermText(stringValue); + if (storeOffsets) { + offsetStart = offset; + offsetEnd = offset + stringValue.length(); + addPosition(); + } else { + offsetStart = offsetEnd = -1; + addPosition(); + } + offset += stringValue.length(); + length++; + // nocommit + netDocSize += stringValue.length(); + } else { + Reader reader; // find or make Reader + if (field.readerValue() != null) + reader = field.readerValue(); + else { + stringReader.init(field.stringValue()); + // nocommit + netDocSize += field.stringValue().length(); + reader = stringReader; + } + + // Tokenize field and add to postingTable + TokenStream stream = analyzer.tokenStream(fieldInfo.name, reader); + try { + offsetEnd = offset-1; + if (storeOffsets) { + for (token = stream.next(); token != null; token = stream.next()) { + position += (token.getPositionIncrement() - 1); + offsetStart = offset + token.startOffset(); + offsetEnd = offset + token.endOffset(); + addPosition(); + if (++length >= maxFieldLength) { + if (infoStream != null) + infoStream.println("maxFieldLength " +maxFieldLength+ " reached, ignoring following tokens"); + break; + } + } + offset = offsetEnd+1; + } else { + offsetStart = offsetEnd = -1; + for (token = stream.next(); token != null; token = stream.next()) { + position += (token.getPositionIncrement() - 1); + addPosition(); + if (++length >= maxFieldLength) { + if (infoStream != null) + infoStream.println("maxFieldLength " +maxFieldLength+ " reached, ignoring following tokens"); + break; + } + } + } + + } finally { + stream.close(); + } + } + + boost *= field.getBoost(); + } + + private final class ReusableStringReader extends Reader { + int upto; + int left; + String s; + void init(String s) { + this.s = s; + left = s.length(); + this.upto = 0; + } + public int read(char[] c) { + return read(c, 0, c.length); + } + public int read(char[] c, int off, int len) { + if (left > len) { + s.getChars(upto, upto+len, c, off); + upto += len; + left -= len; + return len; + } else if (0 == left) { + return -1; + } else { + s.getChars(upto, upto+left, c, off); + int r = left; + left = 0; + upto = s.length(); + return r; + } + } + public void close() {}; + } + + ReusableStringReader stringReader = new ReusableStringReader(); + + private final class CharBlock { + char[] buffer; + CharBlock next; + } + + CharBlock charBlockStart; + CharBlock currentCharBlock; + char[] currentCharBuffer; + int currentCharUpto; + int currentCharLimit; + + void initCharBlocks() { + currentCharBlock = charBlockStart = new CharBlock(); + // TODO: tune + currentCharLimit = 65536; + currentCharBuffer = currentCharBlock.buffer = new char[currentCharLimit]; + currentCharUpto = 0; + } + + void newCharBlock() { + if (currentCharBlock.next == null) { + CharBlock newBlock = new CharBlock(); + currentCharBlock.next = 
newBlock; + newBlock.next = null; + newBlock.buffer = new char[65536]; + currentCharBlock = newBlock; + } else + currentCharBlock = currentCharBlock.next; + currentCharBuffer = currentCharBlock.buffer; + currentCharUpto = 0; + currentCharLimit = currentCharBuffer.length; + } + + void resetCharBlocks() { + currentCharBlock = charBlockStart; + currentCharBuffer = currentCharBlock.buffer; + currentCharUpto = 0; + currentCharLimit = currentCharBuffer.length; + } + + private final class IntBlock { + int[] buffer; + IntBlock next; + int offset; + } + + IntBlock intBlockStart; + IntBlock currentIntBlock; + int[] currentIntBuffer; + int currentIntUpto; + int currentIntOffset; + int currentIntLimit; + + void initIntBlocks() { + currentIntBlock = intBlockStart = new IntBlock(); + // System.out.println("ALLOC start int block"); + currentIntLimit = 65536; + currentIntBuffer = currentIntBlock.buffer = new int[currentIntLimit]; + currentIntUpto = 0; + } + + int newIntSlice(Posting p, final int level, final int mod) { + final int newSize; + final int newLevel; + + // TODO: tune + switch(level) { + case 0: + newSize = 2+mod*2; + newLevel = 1; + break; + case 1: + newSize = 2+mod*4; + newLevel = 2; + break; + case 2: + newSize = 2+mod*8; + newLevel = 3; + break; + case 3: + newSize = 2+mod*16; + newLevel = 4; + break; + case 4: + newSize = 2+mod*32; + newLevel = 5; + break; + case 5: + newSize = 2+mod*64; + newLevel = 6; + break; + case 6: + newSize = 2+mod*128; + newLevel = 7; + break; + default: + newSize = 2+mod*256; + newLevel = 8; + break; + } + + if (currentIntUpto + newSize > currentIntLimit) + nextIntBlock(); + + // Put forwarding address at end of last chunk + p.buffer[p.limit] = currentIntOffset+currentIntUpto; + //System.out.println(" new int slice: upto=" + p.buffer[p.limit] + " saved @ upto=" + p.limit); + p.buffer = currentIntBuffer; + p.limit = currentIntUpto+newSize-1; + final int lastUpto = currentIntUpto; + p.buffer[p.limit] = newLevel; + currentIntUpto += newSize; + return lastUpto; + } + + void nextIntBlock() { + if (currentIntBlock.next == null) { + //System.out.println("ALLOC new int block"); + IntBlock newBlock = new IntBlock(); + currentIntBlock.next = newBlock; + newBlock.next = null; + newBlock.buffer = new int[65536]; + newBlock.offset = currentIntBlock.offset+currentIntBuffer.length; + currentIntBlock = newBlock; + } else + currentIntBlock = currentIntBlock.next; + currentIntOffset = currentIntBlock.offset; + currentIntBuffer = currentIntBlock.buffer; + currentIntUpto = 0; + currentIntLimit = currentIntBuffer.length; + } + + void resetIntBlocks() { + currentIntBlock = intBlockStart; + currentIntOffset = currentIntBlock.offset; + currentIntBuffer = currentIntBlock.buffer; + currentIntUpto = 0; + currentIntLimit = currentIntBuffer.length; + } + + void initIntSlice(Posting p, final int size) { + if (currentIntUpto + size > currentIntLimit) + nextIntBlock(); + p.buffer = currentIntBuffer; + p.limit = currentIntUpto+size-1; + p.startIndex = currentIntOffset+currentIntUpto; + currentIntUpto += size; + //System.out.println(" init int slice: startIndex=" + p.startIndex); + } + + final class Posting { // info about a Term in a doc + char[] text; + int textStart; + int hashCode; + int startIndex; // Location of first int slice + int[] buffer; // Current int slice + int limit; // End point (buffer[limit]) of current int slice + Payload[] payloads; + + // ONLY USE FOR DEBUGGING! 
+ public String getText() { + int upto = textStart; + while(text[upto] != 0xffff) + upto++; + return new String(text, textStart, upto-textStart); + } + + boolean equals(String otherText) { + final int len = otherText.length(); + int pos = textStart; + int i=0; + for(;i localTextBuffer.length) + localTextBuffer = new char[(int)(1.25*tokenTextLen)]; + s.getChars(0, tokenTextLen, localTextBuffer, 0); + tokenText = localTextBuffer; + tokenTextOffset = 0; + //System.out.println(" addPosition: string=" + s + " pos=" + position + " offsetStart=" + offsetStart + " offsetEnd=" + offsetEnd); + } else { + tokenText = t; + tokenTextLen = token.termBufferLength(); + tokenTextOffset = token.termBufferOffset(); + //System.out.println(" addPosition: buffer=" + new String(tokenText, tokenTextOffset, tokenTextLen) + " pos=" + position + " offsetStart=" + offsetStart + " offsetEnd=" + offsetEnd); + } + + int code = 0; + for(int i=tokenTextLen-1;i>=0;i--) + code = (code*37) + tokenText[i]; + int hashPos = code & postingsHashMask; + //System.out.println(" hashCode=" + code); + + Posting p = postingsHash[hashPos]; + if (p != null && (code != p.hashCode || !p.equals(tokenText, tokenTextOffset, tokenTextLen))) { + int code2 = code|1; + do { + hashPos = code2 & postingsHashMask; + code2 += code|1; + p = postingsHash[hashPos]; + } while (p != null && (code != p.hashCode || !p.equals(tokenText, tokenTextOffset, tokenTextLen))); + } + + if (p != null) { // term seen before + + int[] buffer = p.buffer; + int limit = p.limit; + int upto = buffer[limit]>>>4; + int level = buffer[limit]&15; + final int freq = buffer[limit-1]; + + //System.out.println(" already seen: upto=" + upto + " level=" + level + " freq=" + buffer[p.limit-1]); + + if (storeOffsets) { + if (upto == limit-1) { + // Need more space + buffer[upto] = position; + //System.out.println(" make new space1 pre: upto=" + upto + " limit=" + p.limit); + upto = newIntSlice(p, level, 3); + //System.out.println(" make new space1 post: upto=" + upto + " limit=" + p.limit); + buffer = p.buffer; + limit = p.limit; + level = buffer[limit]; + buffer[upto] = offsetStart; + buffer[upto+1] = offsetEnd; + buffer[limit] = ((upto+2)<<4)+level; + } else if (upto == limit-2) { + // Need more space + buffer[upto] = position; + buffer[upto+1] = offsetStart; + //System.out.println(" make new space2 pre: upto=" + upto + " limit=" + p.limit); + upto = newIntSlice(p, level, 3); + //System.out.println(" make new space2 post: upto=" + upto + " limit=" + p.limit); + buffer = p.buffer; + limit = p.limit; + level = buffer[limit]; + buffer[upto] = offsetEnd; + buffer[limit] = ((upto+1)<<4)+level; + } else if (upto == limit-3) { + // Need more space + buffer[upto] = position; + buffer[upto+1] = offsetStart; + buffer[upto+2] = offsetEnd; + //System.out.println(" make new space3 pre: upto=" + upto + " limit=" + p.limit); + upto = newIntSlice(p, level, 3); + //System.out.println(" make new space3 post: upto=" + upto + " limit=" + p.limit); + buffer = p.buffer; + limit = p.limit; + level = buffer[limit]; + buffer[limit] = (upto<<4)+level; + } else { + //System.out.println(" no space needed: upto=" + upto + " limit=" + p.limit); + buffer[upto] = position; + buffer[upto+1] = offsetStart; + buffer[upto+2] = offsetEnd; + buffer[limit] = ((upto+3)<<4)+level; + } + } else { + if (upto == limit-1) { + // Need more space + //System.out.println(" make new space pre: upto=" + upto + " limit=" + p.limit); + buffer[upto] = position; + upto = newIntSlice(p, level, 1); + limit = p.limit; + buffer = p.buffer; 
+ level = buffer[limit]; + buffer[limit] = (upto<<4)+level; + //System.out.println(" make new space post: upto=" + upto + " limit=" + p.limit); + } else { + //System.out.println(" write pos " + position + " to upto=" + upto + " vs limit=" + p.limit + " level=" + level); + buffer[upto] = position; + buffer[limit] = ((upto+1)<<4)+level; + } + } + + final Payload payload = token.getPayload(); + + if (payload != null) { + if (p.payloads == null) { + // lazily allocate payload array + p.payloads = new Payload[freq*2]; + } else if (p.payloads.length <= freq) { + Payload[] newPayloads = new Payload[freq * 2]; // grow payloads array + System.arraycopy(p.payloads, 0, newPayloads, 0, p.payloads.length); + p.payloads = newPayloads; + } + p.payloads[freq] = payload; + fieldInfo.storePayloads = true; + } + buffer[limit-1] = 1+freq; // update frequency + + // } + + } else { // word not seen before + + // System.out.println(" new posting"); + p = postingsArray[numPostings]; + if (p == null) + p = postingsArray[numPostings] = new Posting(); + numPostings++; + + final int textLen1 = 1+tokenTextLen; + + if (textLen1 > 1024) { + p.text = new char[textLen1]; + p.textStart = 0; + } else { + if (textLen1 + currentCharUpto > currentCharLimit) + newCharBlock(); + p.text = currentCharBuffer; + p.textStart = currentCharUpto; + currentCharUpto += textLen1; + } + System.arraycopy(tokenText, tokenTextOffset, p.text, p.textStart, tokenTextLen); + // System.out.println(" posting=" + p + " textStart=" + p.textStart + " text=" + p.text); + p.text[p.textStart+tokenTextLen] = 0xffff; + p.hashCode = code; + + postingsHash[hashPos] = p; + + if (numPostings == postingsArrayLimit) { + // Resize postings array + int newSize = postingsArrayLimit*2; + Posting[] newPostings = new Posting[newSize]; + System.arraycopy(postingsArray, 0, newPostings, 0, postingsArrayLimit); + postingsArray = newPostings; + postingsArrayLimit = newSize; + + // Resize hash + newSize = postingsHashSize*2; + postingsHashMask = newSize-1; + Posting[] newHash = new Posting[newSize]; + for(int i=0;i= 0; + if (newHash[hashPosx] != null) { + int codex2 = codex|1; + do { + hashPosx = codex2 & postingsHashMask; + codex2 += codex|1; + } while (newHash[hashPosx] != null); + } + newHash[hashPosx] = p0; + } + } + + postingsHash = newHash; + postingsHashSize = newSize; + } + + final int[] buffer; + final int limit; + if (storeOffsets) { + initIntSlice(p, 5); + buffer = p.buffer; + limit = p.limit; + //System.out.println(" has offset: initIntSlice(4): limit=" + p.limit); + buffer[limit-4] = position; + buffer[limit-3] = offsetStart; + buffer[limit-2] = offsetEnd; + } else { + initIntSlice(p, 3); + buffer = p.buffer; + limit = p.limit; + //System.out.println(" no offset: initIntSlice(2): limit=" + p.limit); + buffer[limit-2] = position; + // System.out.println(" write pos " + position + " to upto=" + p.limit); + } + // TODO: for low freq we could encode into + // buffer[limit] then upgrade to separate int once + // freq is high + buffer[limit-1] = 1; + buffer[limit] = (limit-1)<<4; + + final Payload payload = token.getPayload(); + if (payload != null) { + p.payloads = new Payload[1]; + p.payloads[0] = payload; + fieldInfo.storePayloads = true; + } else + p.payloads = null; + } + position++; + } + + private RAMSegment ramSegment; + private int numVectorFields; + + public int writeVInt(byte[] b, int i) { + int upto = 0; + while ((i & ~0x7F) != 0) { + b[upto++] = ((byte)((i & 0x7f) | 0x80)); + i >>>= 7; + } + b[upto++] = ((byte)i); + return upto; + } + + private final 
class IntSliceReader { + IntBlock next; + IntBlock intBlock; + int[] buffer; + int upto; + int limit; + int mod; + int level; + + public void init(int startIndex, int mod) { + // TODO: we could do binary search + // Seek to the starting intBlock + intBlock = intBlockStart; + buffer = intBlock.buffer; + next = intBlock.next; + while(intBlock.offset + intBlock.buffer.length <= startIndex) { + intBlock = next; + buffer = intBlock.buffer; + next = next.next; + } + level = 0; + this.mod = mod; + upto = startIndex-intBlock.offset; + limit = upto+mod+1; + //System.out.println(" reader.init: startIndex=" + startIndex + " upto=" + upto + " limit=" + limit); + } + + public int next() { + // System.out.println(" next upto=" + upto + " limit=" + limit + " buffer=" + buffer); + if (upto == limit) { + // Skip to our next slice + final int nextIndex = buffer[upto]; + //System.out.println(" seek to " + nextIndex + " vs end=" + (intBlock.offset+intBlock.buffer.length) + " fwd address=" + upto); + while(intBlock.offset + intBlock.buffer.length <= nextIndex) { + intBlock = next; + buffer = intBlock.buffer; + next = next.next; + } + + final int newSize; + + // TODO: tune + switch(level) { + case 0: + newSize = 2+mod*2; + level = 1; + break; + case 1: + newSize = 2+mod*4; + level = 2; + break; + case 2: + newSize = 2+mod*8; + level = 3; + break; + case 3: + newSize = 2+mod*16; + level = 4; + break; + case 4: + newSize = 2+mod*32; + level = 5; + break; + case 5: + newSize = 2+mod*64; + level = 6; + break; + case 6: + newSize = 2+mod*128; + level = 7; + break; + default: + newSize = 2+mod*256; + level = 8; + break; + } + upto = nextIndex-intBlock.offset; + limit = upto + newSize - 1; + // System.out.println(" skip to upto=" + upto + " limit=" + limit); + } + //System.out.println(" now return " + buffer[upto]); + return buffer[upto++]; + } + } + + IntSliceReader intSliceReader = new IntSliceReader(); + + /* + * Walk through all unique text tokens (Posting + * instances) found in this field and serialize them + * into a single RAM segment. + */ + private void addPostingsAndVectors(FieldData fp) + throws CorruptIndexException, IOException { + + final FieldInfo currentField = fp.fieldInfo; + + final Posting[] postings = postingsArray; + final int numTerms = numPostings; + final int fieldNumber = fp.fieldInfo.number; + final int postingsHashSize = postingsHash.length; + + final boolean doVectors = fp.doVectors; + final boolean doPositions = fp.doVectorPositions; + final boolean doOffsets = fp.doVectorOffsets; + Posting lastPosting = null; + + if (doVectors) { + if (numVectorFields == vectorFieldPointers.length) { + final int newSize = (int) (vectorFieldPointers.length*1.25); + vectorFieldPointers = MultiDocumentWriter.realloc(vectorFieldPointers, newSize); + vectorFieldNumbers = MultiDocumentWriter.realloc(vectorFieldNumbers, newSize); + } + vectorFieldNumbers[numVectorFields] = fieldNumber; + vectorFieldPointers[numVectorFields++] = tvfLocal.getFilePointer(); + tvfLocal.writeVInt(numTerms); + byte bits = 0x0; + if (doPositions) + bits |= TermVectorsWriter.STORE_POSITIONS_WITH_TERMVECTOR; + if (doOffsets) + bits |= TermVectorsWriter.STORE_OFFSET_WITH_TERMVECTOR; + tvfLocal.writeByte(bits); + } + + final int intBlockMod = doOffsets ? 3:1; + + // System.out.println("add postings"); + + for(int i=0;i result[1]) { + final int t = result[0]; + result[0] = result[1]; + result[1] = t; + } + } else + // TODO: maybe radix sort here? 
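// Illustrative sketch only, not part of the patch: one possible shape for the
// "maybe radix sort here?" TODO above. It assumes the values in result[0..num)
// are non-negative ints, and sorts them with a stable LSD radix sort over four
// 8-bit digits in O(4*num) instead of O(num log num). The helper name
// (lsdRadixSort) and the caller-supplied scratch array are hypothetical.
static void lsdRadixSort(int[] a, int num, int[] scratch) {
  final int[] count = new int[257];
  int[] src = a, dst = scratch;
  for (int shift = 0; shift < 32; shift += 8) {
    java.util.Arrays.fill(count, 0);
    for (int i = 0; i < num; i++)              // histogram of this 8-bit digit
      count[((src[i] >>> shift) & 0xFF) + 1]++;
    for (int i = 1; i < 257; i++)              // prefix sums -> bucket start offsets
      count[i] += count[i - 1];
    for (int i = 0; i < num; i++)              // stable scatter into dst
      dst[count[(src[i] >>> shift) & 0xFF]++] = src[i];
    int[] tmp = src; src = dst; dst = tmp;     // swap roles for the next digit
  }
  // After an even number (4) of passes the sorted values are back in `a`.
}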
+ Arrays.sort(result, 0, num); + + return num; + } + + public boolean next() throws IOException { + + // ASSERT + long pos = in.getFilePointer(); + + final int start = in.readVInt(); + if (start == Integer.MAX_VALUE) + return false; + + //System.out.println(" next: idx=" + idx + " start=" + start + " textLength=" + textLength + " pos=" + pos + " buffer=" + ((RAMReader) in).buffer); + assert start <= textLength; + final int length = in.readVInt(); + textLength = start + length; + if (textLength > textBuffer.length) { + char[] newTextBuffer = new char[(int) (textLength*1.5)]; + System.arraycopy(textBuffer, 0, newTextBuffer, 0, start); + textBuffer = newTextBuffer; + } + in.readChars(textBuffer, start, length); + //System.out.println(" next term=" + new String(textBuffer, 0, textLength) + " len=" + textLength + " field=" + fieldNumber); + + fieldNumber = in.readVInt(); + df = in.readVInt(); + if (proxSizeIsVariable) + proxSize = in.readVLong(); + else + proxSize = in.readInt(); + //System.out.println(" next: df=" + df + " proxSize=" + proxSize + " after term pos=" + in.getFilePointer()); + + hashCode = 0; + for(int i=textLength-1;i>=0;i--) + hashCode = (hashCode * 37) + textBuffer[i]; + hashCode += fieldNumber; + + return true; + } + + public void flushFreq() throws IOException { + // We can't copyBytes with freq because we must + // re-encode the deltas between docIDs. + int docID = 0; + for(int i=0;i>> 1; + assert docID <= maxDocID; + assert docID > lastDocID || 0==lastDocID; + final int newCode = (docID-lastDocID)<<1; + lastDocID = docID; + if ((code & 1) != 0) + out.writeVInt(newCode|1); + else { + out.writeVInt(newCode); + out.writeVInt(in.readVInt()); + } + } + } + + public void close() throws IOException { + MultiDocumentWriter.close(in, in2); + } + + public boolean equals(SegmentMergeInfo other) { + if (other.fieldNumber == fieldNumber && + other.textLength == textLength) { + final char[] textA = textBuffer; + final char[] textB = other.textBuffer; + for(int i=0;i 0) { + + final SegmentMergeInfo smi = queue.pop(); + + long freqPointer = freqOutput.getFilePointer(); + long proxPointer = proxOutput.getFilePointer(); + + final int fieldNumber = smi.fieldNumber; + + if (currentField != fieldNumber) { + currentField = fieldNumber; + currentFieldName = fieldInfos.fieldName(fieldNumber); + currentFieldStorePayloads = fieldInfos.fieldInfo(fieldNumber).storePayloads; + } + + // TODO: can we avoid cons'ing here? 
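// Illustrative sketch only, not from the patch: the freq-stream encoding that
// flushFreq() above decodes and the merge loop below re-writes. Each posting
// stores (docDelta << 1), with the low bit set when the term frequency is
// exactly 1 so the separate freq VInt can be omitted. The helper name
// (writeDocFreq) is hypothetical.
static void writeDocFreq(org.apache.lucene.store.IndexOutput freqOut,
                         int docID, int lastDocID, int freq)
    throws java.io.IOException {
  final int delta = docID - lastDocID;
  if (freq == 1) {
    freqOut.writeVInt((delta << 1) | 1);   // low bit set: freq == 1, no second VInt
  } else {
    freqOut.writeVInt(delta << 1);         // low bit clear: freq VInt follows
    freqOut.writeVInt(freq);
  }
}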
+ Term term = new Term(currentFieldName, new String(smi.textBuffer, 0, smi.textLength)); + + df = 0; + lastDoc = 0; + lastPayloadLength = -1; + lastDocID = 0; + + // System.out.println(" " + term); + resetSkip(); + + final int numToMerge = smi.sort(mergeIDXArray); + + for(int j=0;j>> 1; + + // System.out.println(" doc=" + doc + " lastDoc=" + lastDoc + " df=" + df); + + assert doc <= maxDocID; + + //if (!(doc > lastDoc || df == 1)) + //System.out.println("doc=" + doc + " lastDoc=" + lastDoc + " df=" + df); + + assert doc > lastDoc || df == 1; + + final int termDocFreq; + final int newDocCode = (doc-lastDoc)<<1; + lastDoc = doc; + + if ((docCode & 1) != 0) { + freqOutput.writeVInt(newDocCode|1); + termDocFreq = 1; + //System.out.println(" doc " + doc + " freq 1"); + //System.out.println(" write " + (newDocCode|1)); + } else { + freqOutput.writeVInt(newDocCode); + termDocFreq = freq.readVInt(); + //System.out.println(" doc " + doc + " freq " + termDocFreq); + //System.out.println(" write " + newDocCode + " then " + termDocFreq); + freqOutput.writeVInt(termDocFreq); + } + //System.out.println(" #pos=" + termDocFreq); + + /** See {@link DocumentWriter#writePostings(Posting[], String) for + * documentation about the encoding of positions and payloads + */ + for(int j=0;j DocSkip, FreqSkip, ProxSkip + // DocSkip,FreqSkip,ProxSkip --> VInt + // DocSkip records the document number before every SkipInterval th document in TermFreqs. + // Document numbers are represented as differences from the previous value in the sequence. + // Case 2: current field stores payloads + // SkipDatum --> DocSkip, PayloadLength?, FreqSkip,ProxSkip + // DocSkip,FreqSkip,ProxSkip --> VInt + // PayloadLength --> VInt + // In this case DocSkip/2 is the difference between + // the current and the previous value. If DocSkip + // is odd, then a PayloadLength encoded as VInt follows, + // if DocSkip is even, then it is assumed that the + // current payload length equals the length at the previous + // skip point + + final int delta = doc - lastSkipDoc; + if (currentFieldStorePayloads) { + if (payloadLength == lastSkipPayloadLength) + // the current payload length equals the length at the previous skip point, + // so we don't store the length again + skipBuffer.writeVInt(delta << 1); + else { + // the payload length is different from the previous one. We shift the DocSkip, + // set the lowest bit and store the current payload length as VInt. 
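// Illustrative sketch only, not from the patch: the DocSkip / PayloadLength part
// of one skip entry, exactly as the comment above describes it. When the field
// stores payloads the doc delta is shifted left and the low bit flags "payload
// length changed since the previous skip point"; otherwise the plain delta is
// written. The helper name and the returned "new lastSkipPayloadLength" value
// are hypothetical.
static int writeSkipDoc(org.apache.lucene.store.IndexOutput skipOut,
                        boolean storePayloads, int doc, int lastSkipDoc,
                        int payloadLength, int lastSkipPayloadLength)
    throws java.io.IOException {
  final int delta = doc - lastSkipDoc;
  if (!storePayloads) {
    skipOut.writeVInt(delta);                 // Case 1: plain delta, nothing else
    return lastSkipPayloadLength;
  }
  if (payloadLength == lastSkipPayloadLength) {
    skipOut.writeVInt(delta << 1);            // Case 2, unchanged length: even DocSkip
    return lastSkipPayloadLength;
  }
  skipOut.writeVInt((delta << 1) + 1);        // Case 2, changed length: odd DocSkip...
  skipOut.writeVInt(payloadLength);           // ...followed by the new PayloadLength
  return payloadLength;
}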
+ skipBuffer.writeVInt((delta << 1) + 1); + skipBuffer.writeVInt(payloadLength); + lastSkipPayloadLength = payloadLength; + } + } else + // current field does not store payloads + skipBuffer.writeVInt(delta); + + long freqPointer = freqOutput.getFilePointer(); + long proxPointer = proxOutput.getFilePointer(); + skipBuffer.writeVInt((int) (freqPointer - lastSkipFreqPointer)); + skipBuffer.writeVInt((int) (proxPointer - lastSkipProxPointer)); + lastSkipFreqPointer = freqPointer; + lastSkipProxPointer = proxPointer; + + lastSkipDoc = doc; + } + + long writeSkip() throws IOException { + long skipPointer = freqOutput.getFilePointer(); + skipBuffer.writeTo(freqOutput); + return skipPointer; + } + + SegmentMergeInfo mergeInputs[] = new SegmentMergeInfo[0]; + int[] mergeIDXArray; + + final void resizeMergeInputs(final int minSize) { + if (mergeInputs.length < minSize) { + int size = (int) (minSize*1.25); + SegmentMergeInfo[] newMergeInputs = new SegmentMergeInfo[size]; + System.arraycopy(mergeInputs, 0, newMergeInputs, 0, mergeInputs.length); + for(int i=mergeInputs.length;i= MAX_WAIT_QUEUE) { + // System.out.println("do wait"); + + // There are too many thread states in line write + // to the index so we now pause to give them a + // chance to get scheduled by the JVM and finish + // their documents. Once we wake up again, a + // recycled ThreadState should be available else + // we wait again. + // System.out.println("w " + Thread.currentThread().getName()); + try { + wait(); + } catch (InterruptedException e) { + } + // System.out.println(" wd " + Thread.currentThread().getName()); + + } else { + // OK, just create a new thread state + state = new ThreadState(); + numThreadState++; + break; + } + } else { + // Use recycled thread state + state = (ThreadState) freeThreadStates.get(size-1); + freeThreadStates.remove(size-1); + break; + } + } + + boolean success = false; + try { + state.init(doc, docID++); + success = true; + } finally { + if (!success) + freeThreadStates.add(state); + } + + return state; + } + + void addDocument(Document doc, Analyzer analyzer) + throws CorruptIndexException, IOException { + + // First pass: go through all fields in doc, updating + // shared FieldInfos and writing any stored fields: + final ThreadState state = getThreadState(doc); + boolean success = false; + try { + state.processDocument(analyzer); + success = true; + } finally { + if (success) + finishDocument(state); + else { + // nocommit: need to do some cleanup of the thread state? + freeThreadStates.add(state); + } + } + } + + long netMerge0Time; + long netMerge1Time; + long netFlushedMergeTime; + long netDocTime; + long netFlushTime; + long netSegmentTime; + + /* + * Does the synchronized work to finish/flush the inverted document. + */ + private synchronized void finishDocument(ThreadState state) throws IOException { + + // Now write the indexed document to the real files. + if (nextWriteDocID == state.docID) { + // It's my turn, so write everything now: + try { + writeDocument(state); + } finally { + nextWriteDocID++; + // Recycle our thread state back in the free pool + freeThreadStates.add(state); + } + + // If any states were waiting on me, sweep through and + // flush those that are enabled by my write. 
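// Illustrative sketch only, not the patch's ThreadState machinery: the ordering
// idea used here. Documents are inverted concurrently, but must reach the store
// in docID order, so a finished document either writes immediately (its docID is
// next) or parks until the writer of the preceding docID sweeps it out. All
// names below are hypothetical.
final class OrderedWriter {
  private final java.util.Map<Integer, Runnable> parked =
      new java.util.HashMap<Integer, Runnable>();
  private int nextWriteID = 0;

  synchronized void finish(int docID, Runnable write) {
    if (docID != nextWriteID) {
      parked.put(docID, write);          // out of order: wait for earlier docs
      return;
    }
    write.run();                         // my turn: write now
    nextWriteID++;
    Runnable w;                          // sweep anything my write just unblocked
    while ((w = parked.remove(nextWriteID)) != null) {
      w.run();
      nextWriteID++;
    }
  }
}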
+ boolean doNotify = numWaiting >= MAX_WAIT_QUEUE || flushPending; + if (numWaiting > 0) { + while(true) { + int upto = 0; + for(int i=0;i 0) { + tvd.writeVInt(state.numVectorFields); + for(int i=0;i ramBufferSize/14) { + long t0 = System.currentTimeMillis(); + mergeRAMSegments(state, 0); + netMerge0Time += (System.currentTimeMillis()-t0); + if (levelSizes[1] > ramBufferSize/7 && level0Compression < 0.7) { + t0 = System.currentTimeMillis(); + mergeRAMSegments(state, 1); + netMerge1Time += (System.currentTimeMillis()-t0); + } + } + + if (doSelfFlush && totalSize > ramBufferSize) { + long t0 = System.currentTimeMillis(); + flushRAMSegments(state); + netFlushTime += (System.currentTimeMillis()-t0); + } + } + + // For debugging + public void printAlloc(String prefix, RAMCell head, int limit) { + RAMCell c = head; + System.out.print(prefix + ":"); + if (c == null) + System.out.println(" null"); + else + while(c != null) { + if (c.next == null) { + System.out.println(" " + c.buffer.length + "(" + limit + ")"); + break; + } else { + System.out.print(" " + c.buffer.length); + c = c.next; + } + } + } + + long getRAMUsed() { + return totalSize; + } + + private final String tempFileName(int count) { + return segment + "x" + count + "." + IndexFileNames.TEMP_FLUSH_EXTENSION; + } + + long netDocSize; + int netDocCount; + + // Called when RAM buffer is full; we now merge all RAM + // segments to a single flushed segment: + final synchronized void flushRAMSegments(ThreadState state) throws IOException { + + if (infoStream != null) { + String name = tempFileName(flushedCount); + infoStream.println("\n" + getElapsedTime() + ": flush ram segments at docID " + docID + ", to " + name.substring(0, name.length()-4) + ": totalRam=" + (totalSize/1024/1024) + " MB"); + } + System.out.println("FLUSH TEMP @ docID=" + docID + " numDoc=" + (docID-lastFlushDocID) + "; RAM=" + totalSize); + System.out.println(" mem now: " + bean.getHeapMemoryUsage().getUsed()); + System.out.println(" avg doc=" + (netDocSize/netDocCount) + " bytes"); + lastFlushDocID = docID; + + // nocommit + netDocSize = 0; + netDocCount = 0; + + IndexOutput out = directory.createOutput(tempFileName(flushedCount)); + + final int numSegmentsIn = ramSegments.size(); + long newSize; + long oldSize = totalSize; + + state.resizeMergeInputs(numSegmentsIn); + + int numDoc = 0; + for(int i=0;i=0;i--) + System.out.println(" level " + i + ": count=" + flushedLevelCounts[i]); + + files = null; + + // Must "register" our newly created files with the + // deleter so that when they later decref they get + // deleted: + synchronized(writer) { + writer.getDeleter().checkpoint(writer.segmentInfos, false); + } + + // Maybe cascade merges. We do slightly different + // policy than normal segment merges: we let 20 level 0 + // segments accumulate first, then we merge the first 10 + // into a level 1 segment. After another 10 level 0 + // segments we merge the first 10 level 0's into another + // level 1, etc. This better "spreads" / "postpones" + // the merge work so we don't pay a massive wasted merge + // price only to find it's time to flush a real segment. 
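// Illustrative sketch only (hypothetical names): the cascade described above in
// runnable form, assuming flushedMergeFactor = 10. Each flush adds a level-0
// segment; whenever a level reaches 2*mergeFactor segments, mergeFactor of them
// collapse into one segment a level up, leaving half the level behind to absorb
// the next flushes, and the check repeats on the next level.
static void simulateCascade(int flushes, int mergeFactor) {
  int[] counts = new int[8];                     // counts[level] = segments at that level
  for (int f = 0; f < flushes; f++) {
    counts[0]++;                                 // one new flushed (level 0) segment
    int level = 0;
    while (level + 1 < counts.length && counts[level] == 2 * mergeFactor) {
      counts[level] -= mergeFactor;              // mergeFactor segments merge away...
      counts[level + 1]++;                       // ...into a single segment one level up
      level++;
    }
  }
  System.out.println(java.util.Arrays.toString(counts));
}
// For simulateCascade(20, 10) this prints [10, 1, 0, 0, 0, 0, 0, 0].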
+ int mergeLevel = 0; + while(mergeLevel < flushedLevelCounts.length && flushedLevelCounts[mergeLevel] == 2*flushedMergeFactor) + mergeFlushedSegments(state, mergeLevel++); + } + + // Merge flushed segments into a single new flushed segment + final void mergeFlushedSegments(ThreadState state, int level) throws IOException { + + int start = 0; + int end = 0; + for(int i=flushedLevelCounts.length-1;i>=level;i--) { + start = end; + end += flushedLevelCounts[i]; + } + + if (end-start > flushedMergeFactor) + end = start+flushedMergeFactor; + + if (infoStream != null) + infoStream.println("merge flushed segments: level " + level); + + mergeFlushedSegments(state, start, end, level); + } + + final void mergeFlushedSegments(ThreadState state, int start, int end, int level) throws IOException { + long t0 = System.currentTimeMillis(); + if (infoStream != null) { + String name = tempFileName(flushedCount); + infoStream.println("merge flushed segments to " + name.substring(0, name.length()-4) + ": start " + start + " to end " + end); + } + + long newSize; + int numDoc; + long oldSize; + + FlushedSegment newSegment; + + if (1 == end-start) { + // Degenerate case + newSegment = (FlushedSegment) flushedSegments.get(start); + numDoc = newSegment.numDoc; + oldSize = newSize = newSegment.size; + } else { + + // maybe reallocate + state.resizeMergeInputs(end-start); + numDoc = 0; + oldSize = 0; + IndexOutput out = directory.createOutput(tempFileName(flushedCount)); + + try { + int upto = 0; + for (int i=start;i start; i--) // remove old infos & add new + flushedSegments.remove(i); + + newSegment = new FlushedSegment(numDoc, flushedCount++, newSize); + flushedSegments.set(start, newSegment); + } + + + if (level != -1) { + if (flushedLevelSizes.length == level+1) { + flushedLevelSizes = realloc(flushedLevelSizes, 1+flushedLevelSizes.length); + flushedLevelCounts = realloc(flushedLevelCounts, 1+flushedLevelCounts.length); + } + + flushedLevelSizes[level] -= oldSize; + flushedLevelSizes[1+level] += newSize; + + flushedLevelCounts[level] -= (end-start); + flushedLevelCounts[1+level]++; + } + + totalFlushedSize += newSize - oldSize; + + if (infoStream != null) { + infoStream.println(" done: oldSize=" + oldSize + " newSize=" + newSize + " new/old=" + ((int)(100.0*newSize/oldSize)) + "% totalFlushed=" + (totalFlushedSize/1024/1024) + " MB"); + if (level != -1) + for(int i=flushedLevelCounts.length-1;i>=0;i--) + System.out.println(" level " + i + ": count=" + flushedLevelCounts[i]); + } + + files = null; + + // nocommit: should I just give IFD list of files to delete? 
+ // Have deleter remove our now unreferenced files: + synchronized(writer) { + writer.getDeleter().checkpoint(writer.segmentInfos, false); + } + netFlushedMergeTime += System.currentTimeMillis()-t0; + } + + static void close(IndexOutput f0, IndexOutput f1) throws IOException { + IOException keep = null; + try { + if (f0 != null) f0.close(); + } catch (IOException e) { + keep = e; + } finally { + try { + if (f1 != null) f1.close(); + } catch (IOException e) { + if (keep != null) throw keep; + } + } + } + + static void close(IndexInput f0, IndexInput f1) throws IOException { + IOException keep = null; + try { + if (f0 != null) f0.close(); + } catch (IOException e) { + keep = e; + } finally { + try { + if (f1 != null) f1.close(); + } catch (IOException e) { + if (keep != null) throw keep; + } + } + } + + static void close(IndexOutput freq, IndexOutput prox, TermInfosWriter terms) throws IOException { + IOException keep = null; + try { + if (freq != null) freq.close(); + } catch (IOException e) { + keep = e; + } finally { + try { + if (prox != null) prox.close(); + } catch (IOException e) { + if (keep == null) keep = e; + } finally { + try { + if (terms != null) terms.close(); + } catch (IOException e) { + if (keep == null) keep = e; + } finally { + if (keep != null) throw keep; + } + } + } + } + + float level0Compression; + + NumberFormat nf = NumberFormat.getInstance(); + String getElapsedTime() { + long t = System.currentTimeMillis(); + nf.setMaximumFractionDigits(1); + nf.setMinimumFractionDigits(1); + return nf.format((t-startTime)/1000.0) + " sec"; + } + + // In-memory merge: reads multiple ram segments (in the + // modified format) and replaces with a single ram segment. + final void mergeRAMSegments(ThreadState state, int level) throws IOException { + + int start = 0; + int end = 0; + for(int i=levelCounts.length-1;i>=level;i--) { + start = end; + end += levelCounts[i]; + } + + if (infoStream != null) { + infoStream.println("\n" + getElapsedTime() + ": merge ram segments: level " + level + ": start idx " + start + " to end idx " + end + " docID=" + docID); + System.out.println(" RAM: " + totalSize); + } + long oldSize; + //long oldTermsSize; + //long oldFreqSize; + //long oldProxSize; + long newSize; + + int numDoc; + RAMSegment newRAMSegment; + + if (end == start+1) { + // Degenerate case, if suddenly an immense document + // comes through + newRAMSegment = (RAMSegment) ramSegments.get(start); + //oldTermsSize = newRAMSegment.terms.size; + //oldFreqSize = newRAMSegment.freq.size; + //oldProxSize = newRAMSegment.prox.size; + newSize = oldSize = newRAMSegment.size; + numDoc = newRAMSegment.numDoc; + } else { + + state.resizeMergeInputs(end-start); + final int numSegmentsIn = end-start; + + oldSize = 0; + //oldTermsSize = 0; + //oldFreqSize = 0; + //oldProxSize = 0; + int upto = 0; + numDoc = 0; + for(int i=start;i start; i--) { // remove old infos & add new + RAMSegment rs = (RAMSegment) ramSegments.get(i); + ramSegments.remove(i); + } + + newRAMSegment = new RAMSegment(numDoc, state.out); + newSize = newRAMSegment.size; + ramSegments.set(start, newRAMSegment); + } + + if (levelSizes.length == level+1) { + levelSizes = realloc(levelSizes, 1+levelSizes.length); + levelCounts = realloc(levelCounts, 1+levelCounts.length); + } + + levelSizes[level] -= oldSize; + levelSizes[1+level] += newSize; + + levelCounts[level] -= (end-start); + levelCounts[1+level]++; + + totalSize += newSize - oldSize; + if (0 == level) + level0Compression = ((float) newSize)/oldSize; + + if (infoStream != null) { + 
infoStream.println(" oldSize=" + oldSize + " newSize=" + newSize + " new/old=" + ((int)(100.0*newSize/oldSize)) + "% totalRAM=" + (totalSize/1024/1024) + " MB"); + //infoStream.println(" termsSize=" + termsOut.size + " freqSize=" + freqOut.size + " proxSize=" + proxOut.size); + //infoStream.println(" oldTermsSize=" + oldTermsSize + " oldFreqSize=" + oldFreqSize + " oldProxSize=" + oldProxSize); + } + } + + final void createCompoundFile(String segment) + throws IOException { + + CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, segment + ".cfs"); + + // Basic files + for (int i = 0; i < IndexFileNames.COMPOUND_EXTENSIONS.length; i++) + cfsWriter.addFile(segment + "." + IndexFileNames.COMPOUND_EXTENSIONS[i]); + + // Fieldable norm files + if (flushedNorms) + cfsWriter.addFile(segment + "." + IndexFileNames.NORMS_EXTENSION); + + // Vector files + if (flushedVectors) + for (int i = 0; i < IndexFileNames.VECTOR_EXTENSIONS.length; i++) + cfsWriter.addFile(segment + "." + IndexFileNames.VECTOR_EXTENSIONS[i]); + + // Perform the merge + cfsWriter.close(); + } + + private void writeNorms() throws IOException { + IndexOutput output = null; + try { + final int numField = fieldInfos.size(); + for (int i=0;i 0; + + if (srcIn instanceof RAMReader) { + RAMReader src = (RAMReader) srcIn; + while(true) { + final int chunk = src.limit - src.upto; + if (chunk < numBytes) { + // Src is the limit + destIn.writeBytes(src.buffer, src.upto, chunk); + src.nextBuffer(); + numBytes -= chunk; + } else if (chunk == numBytes) { + // Matched + destIn.writeBytes(src.buffer, src.upto, chunk); + src.nextBuffer(); + break; + } else { + // numBytes is the limit + destIn.writeBytes(src.buffer, src.upto, (int) numBytes); + src.upto += numBytes; + break; + } + } + } else { + // Use intermediate buffer + while(numBytes > 0) { + final int chunk; + if (numBytes > 1024) { + chunk = 1024; + } else { + chunk = (int) numBytes; + } + srcIn.readBytes(byteBuffer, 0, chunk); + destIn.writeBytes(byteBuffer, chunk); + numBytes -= chunk; + } + } + } + + /** If non-null, a message will be printed to this if maxFieldLength is reached. 
+ */ + void setInfoStream(PrintStream infoStream) { + this.infoStream = infoStream; + // nocommit + // this.infoStream = System.out; + } + + private class RAMSegment { + int numDoc; + RAMCell data; + int dataLimit; + long size; + public RAMSegment(int numDoc, RAMWriter out) { + this.numDoc = numDoc; + size = out.size; + + this.data = out.head; + this.dataLimit = out.upto; + out.reset(); + } + } + + private class FlushedSegment { + int numDoc; + int segment; + long size; + public FlushedSegment(int numDoc, int segment, long size) { + this.numDoc = numDoc; + this.segment = segment; + this.size = size; + } + } + + final static int MAX_RAM_CELL_LEVEL = 4; + RAMCell freeCells[] = new RAMCell[1+MAX_RAM_CELL_LEVEL]; + + synchronized void recycle(RAMCell cell) { + cell.next = freeCells[cell.level]; + freeCells[cell.level] = cell; + } + + public RAMCell alloc(final int level, final int subLevel) { + RAMCell r; + synchronized(this) { + r = freeCells[level]; + if (r != null) + freeCells[level] = r.next; + else + r = null; + } + + if (r == null) + r = new RAMCell(level, subLevel); + else { + r.next = null; + r.subLevel = (byte) subLevel; + } + return r; + } + + private static final class RAMCell { + + byte[] buffer; + RAMCell next; + byte level; + byte subLevel; + + public RAMCell(final int level, final int subLevel) { + this.level = (byte) level; + this.subLevel = (byte) subLevel; + int size = 0; + switch(this.level) { + case 0: + size = 64; + break; + case 1: + size = 256; + break; + case 2: + size = 1024; + break; + case 3: + size = 4096; + break; + case 4: + size = 16384; + break; + } + buffer = new byte[size]; + } + } + + private final class RAMWriter extends IndexOutput { + + RAMCell head; + RAMCell tail; + int upto; + int limit; + byte[] buffer; + long size; + + boolean isFree = false; + + void setStartLevel(int level) { + assert head == null; + head = tail = alloc(level, 0); + buffer = head.buffer; + upto = 0; + limit = head.buffer.length; + size = limit; + } + + public void writeByte(byte b) { + assert !isFree; + if (upto == limit) + nextBuffer(); + + buffer[upto++] = b; + } + + // Move all of our bytes to out and reset + public void writeTo(IndexOutput out) throws IOException { + assert !isFree; + while(head != null) { + final int numBytes; + if (head.next == null) + numBytes = upto; + else + numBytes = head.buffer.length; + out.writeBytes(head.buffer, numBytes); + RAMCell next = head.next; + recycle(head); + head = next; + } + reset(); + } + + private void reset() { + assert !isFree; + head = tail = null; + buffer = null; + limit = upto = 0; + size = 0; + } + + private void free() { + assert !isFree; + while(head != null) { + RAMCell c = head.next; + recycle(head); + head = c; + } + reset(); + } + + // Write an int at a specific spot: + public void writeInt(RAMCell cell, int upto, int v) { + byte[] buffer = cell.buffer; + int limit = buffer.length; + if (upto == limit) { + buffer = cell.next.buffer; + limit = buffer.length; + upto = 0; + } + assert 0 == buffer[upto]; + buffer[upto++] = (byte) (v >> 24); + if (upto == limit) { + buffer = cell.next.buffer; + limit = buffer.length; + upto = 0; + } + assert 0 == buffer[upto]; + buffer[upto++] = (byte) (v >> 16); + if (upto == limit) { + buffer = cell.next.buffer; + limit = buffer.length; + upto = 0; + } + assert 0 == buffer[upto]; + buffer[upto++] = (byte) (v >> 8); + if (upto == limit) { + buffer = cell.next.buffer; + limit = buffer.length; + upto = 0; + } + assert 0 == buffer[upto]; + buffer[upto++] = (byte) v; + } + + public void 
writeBytes(byte[] b, int offset, int numBytes) { + assert !isFree; + assert numBytes > 0; + switch(numBytes) { + case 4: + writeByte(b[offset++]); + case 3: + writeByte(b[offset++]); + case 2: + writeByte(b[offset++]); + case 1: + writeByte(b[offset++]); + break; + default: + if (upto == limit) + nextBuffer(); + // System.out.println(" writeBytes: buffer=" + buffer + " head=" + head + " offset=" + offset + " nB=" + numBytes); + while(true) { + int chunk = limit - upto; + if (chunk >= numBytes) { + System.arraycopy(b, offset, buffer, upto, numBytes); + upto += numBytes; + break; + } else { + System.arraycopy(b, offset, buffer, upto, chunk); + offset += chunk; + numBytes -= chunk; + nextBuffer(); + } + } + } + } + + public void nextBuffer() { + assert !isFree; + + final int level; + final int subLevel; + if (tail == null) { + level = 0; + subLevel = 0; + } else if (tail.level < MAX_RAM_CELL_LEVEL) { + if (7 == tail.subLevel) { + level = 1+tail.level; + subLevel = 0; + } else { + level = tail.level; + subLevel = 1+tail.subLevel; + } + } else { + subLevel = 0; + level = MAX_RAM_CELL_LEVEL; + } + + RAMCell c = alloc(level, subLevel); + + if (head == null) + head = tail = c; + else { + tail.next = c; + tail = c; + } + + limit = c.buffer.length; + size += limit; + buffer = c.buffer; + + upto = 0; + } + + public long getFilePointer() { + assert !isFree; + return size - (limit-upto); + } + + public long length() { + assert !isFree; + return getFilePointer(); + } + + public void close() {} + public void flush() {throw new RuntimeException("not implemented");} + public void seek(long pos) {throw new RuntimeException("not implemented");} + } + + // Limited IndexInput for "read once". This frees each + // buffer from the head once it's been read. + private final class RAMReader extends IndexInput { + + boolean doFreeBuffers; + int readLimit; + int upto; + int limit; + RAMCell head; + byte[] buffer; + long pos; + + // ASSERT + boolean finished = true; + + RAMReader(RAMCell head, int limit) { + reset(head, limit); + } + + public void reset(RAMCell head, int limit) { + // Make sure we were fully read + assert finished; + doFreeBuffers = true; + finished = false; + readLimit = limit; + this.head = head; + upto = 0; + pos = 0; + + if (head == null) { + assert 0 == readLimit; + buffer = null; + } else { + buffer = head.buffer; + if (head.next == null) { + this.limit = readLimit; + assert this.limit > 0 && this.limit <= buffer.length; + } else + this.limit = buffer.length; + } + } + + public byte readByte() { + byte b = buffer[upto++]; + if (upto == limit) + nextBuffer(); + return b; + } + + public void nextBuffer() { + RAMCell c = head.next; + pos += limit; + if (doFreeBuffers) + recycle(head); + head = c; + upto = 0; + if (head != null) { + buffer = head.buffer; + if (head.next == null) { + limit = readLimit; + assert limit > 0 && limit <= buffer.length; + } else + limit = buffer.length; + } else { + // ASSERT + finished = true; + buffer = null; + } + } + + public void seek(long toPos) { + assert toPos > (pos+upto); + while(true) { + if (pos+limit > toPos) { + // Seek within current buffer + upto = (int) (toPos-pos); + break; + } else + nextBuffer(); + } + } + + public long getFilePointer() { + return pos+upto; + } + + public void readBytes(byte[] b, int offset, int len) {throw new RuntimeException("not implemented");} + public void close() {} + public long length() {throw new RuntimeException("not implemented");} + } + + static final byte defaultNorm = Similarity.encodeNorm(1.0f); + + private class 
BufferedNorms { + + RAMWriter out = new RAMWriter(); + int upto; + + void add(float norm) { + byte b = Similarity.encodeNorm(norm); + out.writeByte(b); + upto++; + } + + void fill(int docID) { + // System.out.println(" now fill: " + upto + " vs " + docID); + while(upto < docID) { + // fill in docs that didn't have this field: + out.writeByte(defaultNorm); + upto++; + } + } + } + + static long[] realloc(long[] array, int newSize) { + long[] newArray = new long[newSize]; + System.arraycopy(array, 0, newArray, 0, array.length); + return newArray; + } + + static int[] realloc(int[] array, int newSize) { + int[] newArray = new int[newSize]; + System.arraycopy(array, 0, newArray, 0, array.length); + return newArray; + } +} Property changes on: src/java/org/apache/lucene/index/MultiDocumentWriter.java ___________________________________________________________________ Name: svn:eol-style + native Index: src/java/org/apache/lucene/index/SegmentMergeInfo.java =================================================================== --- src/java/org/apache/lucene/index/SegmentMergeInfo.java (revision 523296) +++ src/java/org/apache/lucene/index/SegmentMergeInfo.java (working copy) @@ -73,9 +73,8 @@ final void close() throws IOException { termEnum.close(); - if (postings != null) { - postings.close(); + if (postings != null) + postings.close(); } } -} Index: src/java/org/apache/lucene/store/IndexOutput.java =================================================================== --- src/java/org/apache/lucene/store/IndexOutput.java (revision 523296) +++ src/java/org/apache/lucene/store/IndexOutput.java (working copy) @@ -125,6 +125,24 @@ } } + public void writeChars(char[] s, int start, int length) + throws IOException { + final int end = start + length; + for (int i = start; i < end; i++) { + final int code = (int)s[i]; + if (code >= 0x01 && code <= 0x7F) + writeByte((byte)code); + else if (((code >= 0x80) && (code <= 0x7FF)) || code == 0) { + writeByte((byte)(0xC0 | (code >> 6))); + writeByte((byte)(0x80 | (code & 0x3F))); + } else { + writeByte((byte)(0xE0 | (code >>> 12))); + writeByte((byte)(0x80 | ((code >> 6) & 0x3F))); + writeByte((byte)(0x80 | (code & 0x3F))); + } + } + } + /** Forces any buffered output to be written. */ public abstract void flush() throws IOException; Index: src/demo/org/apache/lucene/demo/IndexLineFiles.java =================================================================== --- src/demo/org/apache/lucene/demo/IndexLineFiles.java (revision 0) +++ src/demo/org/apache/lucene/demo/IndexLineFiles.java (revision 0) @@ -0,0 +1,193 @@ +package org.apache.lucene.demo; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.analysis.SimpleSpaceAnalyzer; +import org.apache.lucene.analysis.WhitespaceAnalyzer; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.document.DateTools; + +import java.io.File; +import java.io.FileReader; +import java.io.BufferedReader; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Date; + +import java.util.concurrent.atomic.AtomicInteger; + +/** Index all text files under a directory. */ +public class IndexLineFiles { + + private IndexLineFiles() {} + + static final File INDEX_DIR = new File("index"); + + static final AtomicInteger allCount = new AtomicInteger(); + + static int bufferSize; + static String fileName; + + private static class Indexer extends Thread { + + public void run() { + int iter = 0; + Document doc = new Document(); + while (true) { + + try { + BufferedReader input = new BufferedReader(new FileReader(fileName)); + String line = null; + + try { + while ((line = input.readLine()) != null) { + + if (doStoredFields && 0 == iter) { + // Add the path of the file as a field named "path". Use a field that is + // indexed (i.e. searchable), but don't tokenize the field into words. + doc.add(new Field("path", fileName, Field.Store.YES, Field.Index.NO)); + + // Add the last modified date of the file a field named "modified". Use + // a field that is indexed (i.e. searchable), but don't tokenize the field + // into words. + doc.add(new Field("modified", + "200703161637", + Field.Store.YES, Field.Index.NO)); + } + + doc.add(new Field("contents", line, Field.Store.NO, Field.Index.TOKENIZED, tvMode)); + + if (++iter == mult) { + writer.addDocument(doc); + doc.getFields().clear(); + iter = 0; + if (allCount.getAndIncrement() >= (numDoc-1)) { + System.out.println("THREAD DONE"); + return; + } + } + } + } finally { + input.close(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + + static Field.TermVector tvMode; + static boolean doStoredFields; + static int mult; + static IndexWriter writer; + static int numDoc; + + /** Index all text files under a directory. 
*/ + public static void main(String[] args) throws IOException { + String usage = "java org.apache.lucene.demo.IndexFiles "; + + if (args.length == 0) { + System.err.println("Usage: " + usage); + System.exit(1); + } + + fileName = args[0]; + boolean autoCommit = args[1].equals("yes"); + bufferSize = Integer.parseInt(args[2]); + numDoc = Integer.parseInt(args[3]); + int maxBufferedDocs = Integer.parseInt(args[4]); + boolean optimize = args[5].equals("yes"); + + if (args[6].equals("no")) + tvMode = Field.TermVector.NO; + else if (args[6].equals("yes")) + tvMode = Field.TermVector.YES; + else if (args[6].equals("pos")) + tvMode = Field.TermVector.WITH_POSITIONS; + else if (args[6].equals("posoffs")) + tvMode = Field.TermVector.WITH_POSITIONS_OFFSETS; + else + throw new RuntimeException("bad term vector mode: " + args[6]); + + doStoredFields = args[7].equals("yes"); + mult = Integer.parseInt(args[8]); + int numThread = Integer.parseInt(args[9]); + int mergeFactor = Integer.parseInt(args[10]); + + System.out.println("\nFAST BLOCK: autoCommit=" + autoCommit + " bufferSize=" + bufferSize + "MB docLimit=" + numDoc + " optimize=" + optimize + " termVectors=" + args[6] + " storedFields=" + doStoredFields + " multiplier=" + mult + " numThread=" + numThread + " mergeFactor=" + mergeFactor); + System.out.println(" NO MERGING"); + + if (INDEX_DIR.exists()) { + System.out.println("Cannot save index to '" +INDEX_DIR+ "' directory, please delete it first"); + System.exit(1); + } + + Date start = new Date(); + try { + // IndexWriter writer = new IndexWriter(INDEX_DIR, new StandardAnalyzer(), true); + writer = new IndexWriter(FSDirectory.getDirectory(INDEX_DIR), autoCommit, new SimpleSpaceAnalyzer(), true); + //writer = new IndexWriter(FSDirectory.getDirectory(INDEX_DIR), autoCommit, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(maxBufferedDocs); + writer.setMaxFieldLength(100000000); + writer.setRAMBufferSizeMB(bufferSize); + writer.setUseCompoundFile(false); + writer.setInfoStream(System.out); + writer.setMergeFactor(mergeFactor); + // writer.setMaxFieldLength(10000000); + //writer.setMaxFieldLength(1000); + + Indexer[] indexers = new Indexer[numThread]; + for(int i=0;i