Index: src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java (revision 523296) +++ src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java (working copy) @@ -40,7 +40,7 @@ for (int i = 0; i < 100; i++) { addDoc(writer); checkInvariants(writer); - if (writer.getRamSegmentCount() + writer.getSegmentCount() >= 18) { + if (writer.getSegmentCount() + writer.getSegmentCount() >= 18) { noOverMerge = true; } } @@ -178,7 +178,7 @@ int mergeFactor = writer.getMergeFactor(); int maxMergeDocs = writer.getMaxMergeDocs(); - int ramSegmentCount = writer.getRamSegmentCount(); + int ramSegmentCount = writer.getNumBufferedDocuments(); assertTrue(ramSegmentCount < maxBufferedDocs); int lowerBound = -1; Index: src/test/org/apache/lucene/index/TestIndexWriterDelete.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexWriterDelete.java (revision 523296) +++ src/test/org/apache/lucene/index/TestIndexWriterDelete.java (working copy) @@ -93,7 +93,7 @@ } modifier.flush(); - assertEquals(0, modifier.getRamSegmentCount()); + assertEquals(0, modifier.getNumBufferedDocuments()); assertTrue(0 < modifier.getSegmentCount()); if (!autoCommit) { @@ -435,7 +435,7 @@ String[] startFiles = dir.list(); SegmentInfos infos = new SegmentInfos(); infos.read(dir); - IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null); + IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null); String[] endFiles = dir.list(); Arrays.sort(startFiles); Index: src/test/org/apache/lucene/index/TestIndexReader.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexReader.java (revision 523296) +++ src/test/org/apache/lucene/index/TestIndexReader.java (working copy) @@ -803,7 +803,7 @@ String[] startFiles = dir.list(); SegmentInfos infos = new SegmentInfos(); infos.read(dir); - IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null); + IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null); String[] endFiles = dir.list(); Arrays.sort(startFiles); Index: src/test/org/apache/lucene/index/TestIndexWriter.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexWriter.java (revision 523296) +++ src/test/org/apache/lucene/index/TestIndexWriter.java (working copy) @@ -461,7 +461,7 @@ String[] startFiles = dir.list(); SegmentInfos infos = new SegmentInfos(); infos.read(dir); - IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null); + IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null); String[] endFiles = dir.list(); Arrays.sort(startFiles); @@ -842,6 +842,7 @@ public void testCommitOnCloseAbort() throws IOException { Directory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(10); for (int i = 0; i < 14; i++) { addDoc(writer); } @@ -854,6 +855,7 @@ searcher.close(); writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), false); + writer.setMaxBufferedDocs(10); for(int j=0;j<17;j++) { addDoc(writer); } @@ -878,6 +880,7 @@ // Now make sure we can 
re-open the index, add docs, // and all is good: writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), false); + writer.setMaxBufferedDocs(10); for(int i=0;i<12;i++) { for(int j=0;j<17;j++) { addDoc(writer); @@ -945,6 +948,7 @@ public void testCommitOnCloseOptimize() throws IOException { RAMDirectory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(10); for(int j=0;j<17;j++) { addDocWithIndex(writer, j); } Index: src/test/org/apache/lucene/index/TestStressIndexing.java =================================================================== --- src/test/org/apache/lucene/index/TestStressIndexing.java (revision 523296) +++ src/test/org/apache/lucene/index/TestStressIndexing.java (working copy) @@ -74,8 +74,6 @@ count++; } - modifier.close(); - } catch (Exception e) { System.out.println(e.toString()); e.printStackTrace(); @@ -125,6 +123,9 @@ IndexerThread indexerThread = new IndexerThread(modifier); indexerThread.start(); + IndexerThread indexerThread2 = new IndexerThread(modifier); + indexerThread2.start(); + // Two searchers that constantly just re-instantiate the searcher: SearcherThread searcherThread1 = new SearcherThread(directory); searcherThread1.start(); @@ -133,9 +134,14 @@ searcherThread2.start(); indexerThread.join(); + indexerThread2.join(); searcherThread1.join(); searcherThread2.join(); + + modifier.close(); + assertTrue("hit unexpected exception in indexer", !indexerThread.failed); + assertTrue("hit unexpected exception in indexer 2", !indexerThread2.failed); assertTrue("hit unexpected exception in search1", !searcherThread1.failed); assertTrue("hit unexpected exception in search2", !searcherThread2.failed); //System.out.println(" Writer: " + indexerThread.count + " iterations"); Index: src/test/org/apache/lucene/index/TestIndexFileDeleter.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexFileDeleter.java (revision 523296) +++ src/test/org/apache/lucene/index/TestIndexFileDeleter.java (working copy) @@ -34,6 +34,7 @@ Directory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(10); int i; for(i=0;i<35;i++) { addDoc(writer, i); Index: src/test/org/apache/lucene/index/TestDeletionPolicy.java =================================================================== --- src/test/org/apache/lucene/index/TestDeletionPolicy.java (revision 523296) +++ src/test/org/apache/lucene/index/TestDeletionPolicy.java (working copy) @@ -254,6 +254,7 @@ Directory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true, policy); + writer.setMaxBufferedDocs(10); writer.setUseCompoundFile(useCompoundFile); for(int i=0;i<107;i++) { addDoc(writer); @@ -271,7 +272,7 @@ } else { // If we are not auto committing then there should // be exactly 2 commits (one per close above): - assertEquals(2, policy.numOnCommit); + assertEquals(autoCommit?2:1, policy.numOnCommit); } // Simplistic check: just verify all segments_N's still @@ -316,6 +317,7 @@ Directory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true, policy); + writer.setMaxBufferedDocs(10); writer.setUseCompoundFile(useCompoundFile); for(int i=0;i<107;i++) { addDoc(writer); @@ -333,13 +335,15 @@ } else { // If we are not auto committing then there should // be exactly 2 commits (one per close above): - 
assertEquals(2, policy.numOnCommit); + assertEquals(autoCommit?2:1, policy.numOnCommit); } - // Simplistic check: just verify the index is in fact - // readable: - IndexReader reader = IndexReader.open(dir); - reader.close(); + if (autoCommit) { + // Simplistic check: just verify the index is in fact + // readable: + IndexReader reader = IndexReader.open(dir); + reader.close(); + } dir.close(); } @@ -363,6 +367,7 @@ for(int j=0;j>> 1; // shift off low bit if ((docCode & 1) != 0) // if low bit is set freq = 1; // freq is one else freq = freqStream.readVInt(); // else read freq count++; + // //System.out.println(" read freq " + freq); if (deletedDocs == null || !deletedDocs.get(doc)) { + //System.out.println(" add " + doc + "; freq=" + freq); docs[i] = doc; freqs[i] = freq; ++i; @@ -156,7 +166,9 @@ /** Optimized implementation. */ public boolean skipTo(int target) throws IOException { + //System.out.println("std skip to " + target); if (df >= skipInterval) { // optimized case + //System.out.println(" is frequent enough"); if (skipStream == null) skipStream = (IndexInput) freqStream.clone(); // lazily clone @@ -172,6 +184,7 @@ long lastFreqPointer = freqStream.getFilePointer(); long lastProxPointer = -1; int numSkipped = -1 - (count % skipInterval); + //System.out.println(" target " + target + "; skipDoc " + skipDoc); while (target > skipDoc) { lastSkipDoc = skipDoc; @@ -203,11 +216,13 @@ freqPointer += skipStream.readVInt(); proxPointer += skipStream.readVInt(); + //System.out.println(" now freq " + freqPointer + " prox " + proxPointer); skipCount++; } // if we found something to skip, then skip it if (lastFreqPointer > freqStream.getFilePointer()) { + //System.out.println(" do skip! " + lastFreqPointer); freqStream.seek(lastFreqPointer); skipProx(lastProxPointer, lastPayloadLength); @@ -219,6 +234,7 @@ // done skipping, now just scan do { + //System.out.println(" now scan " + target + " " + doc); if (!next()) return false; } while (target > doc); Index: src/java/org/apache/lucene/index/FieldsWriter.java =================================================================== --- src/java/org/apache/lucene/index/FieldsWriter.java (revision 523296) +++ src/java/org/apache/lucene/index/FieldsWriter.java (working copy) @@ -38,17 +38,90 @@ private IndexOutput indexStream; + private boolean doClose; + FieldsWriter(Directory d, String segment, FieldInfos fn) throws IOException { fieldInfos = fn; fieldsStream = d.createOutput(segment + ".fdt"); indexStream = d.createOutput(segment + ".fdx"); + doClose = true; } + FieldsWriter(IndexOutput fdx, IndexOutput fdt, FieldInfos fn) throws IOException { + fieldInfos = fn; + fieldsStream = fdt; + indexStream = fdx; + doClose = false; + } + IndexOutput getIndexStream() { + return indexStream; + } + IndexOutput getFieldsStream() { + return fieldsStream; + } + final void close() throws IOException { + if (doClose) { fieldsStream.close(); indexStream.close(); + } } + final void writeField(FieldInfo fi, Fieldable field) throws IOException { + // if the field as an instanceof FieldsReader.FieldForMerge, we're in merge mode + // and field.binaryValue() already returns the compressed value for a field + // with isCompressed()==true, so we disable compression in that case + boolean disableCompression = (field instanceof FieldsReader.FieldForMerge); + fieldsStream.writeVInt(fi.number); + // System.out.println(" write field number " + fieldInfos.fieldNumber(field.name()) + " name " + field.name() + " to " + fieldsStream + " at " + fieldsStream.getFilePointer()); + byte bits 
= 0; + if (field.isTokenized()) + bits |= FieldsWriter.FIELD_IS_TOKENIZED; + if (field.isBinary()) + bits |= FieldsWriter.FIELD_IS_BINARY; + if (field.isCompressed()) + bits |= FieldsWriter.FIELD_IS_COMPRESSED; + + fieldsStream.writeByte(bits); + + if (field.isCompressed()) { + // compression is enabled for the current field + byte[] data = null; + + if (disableCompression) { + // optimized case for merging, the data + // is already compressed + data = field.binaryValue(); + } else { + // check if it is a binary field + if (field.isBinary()) { + data = compress(field.binaryValue()); + } + else { + data = compress(field.stringValue().getBytes("UTF-8")); + } + } + final int len = data.length; + // System.out.println(" compressed: " + len); + fieldsStream.writeVInt(len); + fieldsStream.writeBytes(data, len); + } + else { + // compression is disabled for the current field + if (field.isBinary()) { + byte[] data = field.binaryValue(); + final int len = data.length; + // System.out.println(" not compressed: " + len); + fieldsStream.writeVInt(len); + fieldsStream.writeBytes(data, len); + } + else { + fieldsStream.writeString(field.stringValue()); + } + } + // System.out.println(" fieldsStream now at " + fieldsStream.getFilePointer()); + } + final void addDocument(Document doc) throws IOException { indexStream.writeLong(fieldsStream.getFilePointer()); @@ -59,62 +132,14 @@ if (field.isStored()) storedCount++; } + // System.out.println("write " + storedCount + " fields to " + fieldsStream + " at " + fieldsStream.getFilePointer()); fieldsStream.writeVInt(storedCount); fieldIterator = doc.getFields().iterator(); while (fieldIterator.hasNext()) { Fieldable field = (Fieldable) fieldIterator.next(); - // if the field as an instanceof FieldsReader.FieldForMerge, we're in merge mode - // and field.binaryValue() already returns the compressed value for a field - // with isCompressed()==true, so we disable compression in that case - boolean disableCompression = (field instanceof FieldsReader.FieldForMerge); - if (field.isStored()) { - fieldsStream.writeVInt(fieldInfos.fieldNumber(field.name())); - - byte bits = 0; - if (field.isTokenized()) - bits |= FieldsWriter.FIELD_IS_TOKENIZED; - if (field.isBinary()) - bits |= FieldsWriter.FIELD_IS_BINARY; - if (field.isCompressed()) - bits |= FieldsWriter.FIELD_IS_COMPRESSED; - - fieldsStream.writeByte(bits); - - if (field.isCompressed()) { - // compression is enabled for the current field - byte[] data = null; - - if (disableCompression) { - // optimized case for merging, the data - // is already compressed - data = field.binaryValue(); - } else { - // check if it is a binary field - if (field.isBinary()) { - data = compress(field.binaryValue()); - } - else { - data = compress(field.stringValue().getBytes("UTF-8")); - } - } - final int len = data.length; - fieldsStream.writeVInt(len); - fieldsStream.writeBytes(data, len); - } - else { - // compression is disabled for the current field - if (field.isBinary()) { - byte[] data = field.binaryValue(); - final int len = data.length; - fieldsStream.writeVInt(len); - fieldsStream.writeBytes(data, len); - } - else { - fieldsStream.writeString(field.stringValue()); - } - } - } + if (field.isStored()) + writeField(fieldInfos.fieldInfo(field.name()), field); } } Index: src/java/org/apache/lucene/index/IndexWriter.java =================================================================== --- src/java/org/apache/lucene/index/IndexWriter.java (revision 523296) +++ src/java/org/apache/lucene/index/IndexWriter.java (working copy) @@ 
-60,10 +60,11 @@ (which just deletes and then adds). When finished adding, deleting and updating documents, close should be called.
<p>
These changes are buffered in memory and periodically - flushed to the {@link Directory} (during the above method calls). A flush is triggered when there are - enough buffered deletes (see {@link - #setMaxBufferedDeleteTerms}) or enough added documents - (see {@link #setMaxBufferedDocs}) since the last flush, + flushed to the {@link Directory} (during the above method + calls). A flush is triggered when there are enough + buffered deletes (see {@link #setMaxBufferedDeleteTerms}) + or enough added documents (see {@link #setMaxBufferedDocs} + and {@link #setRAMBufferSizeMB}) since the last flush, whichever is sooner. When a flush occurs, both pending deletes and added documents are flushed to the index. A flush may also trigger one or more segment merges.

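A rough usage sketch of the flush triggers described in the javadoc above, using the setRAMBufferSizeMB / setMaxBufferedDocs(0) combination this patch introduces. This is an illustration under assumed defaults, not part of the patch itself:

    import org.apache.lucene.analysis.WhitespaceAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.RAMDirectory;

    public class FlushByRamSketch {
      public static void main(String[] args) throws Exception {
        Directory dir = new RAMDirectory();
        IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);

        writer.setMaxBufferedDocs(0);           // 0 = do not flush by document count (patch default)
        writer.setRAMBufferSizeMB(32.0f);       // flush once ~32 MB of buffered documents accumulate
        writer.setMaxBufferedDeleteTerms(1000); // buffered delete terms still trigger a flush

        Document doc = new Document();
        doc.add(new Field("content", "hello world", Field.Store.YES, Field.Index.TOKENIZED));
        writer.addDocument(doc);                // buffered in RAM until one of the triggers fires

        writer.close();                         // flushes whatever is still buffered
      }
    }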
@@ -171,11 +172,17 @@ public final static int DEFAULT_MERGE_FACTOR = 10; /** - * Default value is 10. Change using {@link #setMaxBufferedDocs(int)}. + * Default value is 0 (meaning flush is based on RAM usage + * by default). Change using {@link #setMaxBufferedDocs}. */ - public final static int DEFAULT_MAX_BUFFERED_DOCS = 10; + public final static int DEFAULT_MAX_BUFFERED_DOCS = 0; /** + * Default value is 16 MB. Change using {@link #setRAMBufferSizeMB}. + */ + public final static float DEFAULT_RAM_BUFFER_SIZE_MB = 16F; + + /** * Default value is 1000. Change using {@link #setMaxBufferedDeleteTerms(int)}. */ public final static int DEFAULT_MAX_BUFFERED_DELETE_TERMS = 1000; @@ -208,8 +215,7 @@ private boolean autoCommit = true; // false if we should commit only on close SegmentInfos segmentInfos = new SegmentInfos(); // the segments - SegmentInfos ramSegmentInfos = new SegmentInfos(); // the segments in ramDirectory - private final RAMDirectory ramDirectory = new RAMDirectory(); // for temp segs + private MultiDocumentWriter docWriter; private IndexFileDeleter deleter; private Lock writeLock; @@ -563,6 +569,10 @@ } } + IndexFileDeleter getDeleter() { + return deleter; + } + private void init(Directory d, Analyzer a, final boolean create, boolean closeDir, IndexDeletionPolicy deletionPolicy, boolean autoCommit) throws CorruptIndexException, LockObtainFailedException, IOException { this.closeDir = closeDir; @@ -602,11 +612,14 @@ rollbackSegmentInfos = (SegmentInfos) segmentInfos.clone(); } + docWriter = new MultiDocumentWriter(newSegmentName(), directory, this, !autoCommit); + docWriter.setInfoStream(infoStream); + // Default deleter (for backwards compatibility) is // KeepOnlyLastCommitDeleter: deleter = new IndexFileDeleter(directory, deletionPolicy == null ? new KeepOnlyLastCommitDeletionPolicy() : deletionPolicy, - segmentInfos, infoStream); + segmentInfos, infoStream, docWriter); } catch (IOException e) { this.writeLock.release(); @@ -661,19 +674,26 @@ } /** Determines the minimal number of documents required before the buffered - * in-memory documents are merged and a new Segment is created. + * in-memory documents are flushed as a new Segment. * Since Documents are merged in a {@link org.apache.lucene.store.RAMDirectory}, * large value gives faster indexing. At the same time, mergeFactor limits * the number of files open in a FSDirectory. * - *
<p>
The default value is 10. + *
<p>
If this is 0, then the RAM buffer is flushed instead + * by overall RAM usage (see {@link + * #setRAMBufferSizeMB}). If this is non-zero, then + * flushing is triggered by maxBufferedDocs and not by + * overall RAM usage.

* - * @throws IllegalArgumentException if maxBufferedDocs is smaller than 2 + *
<p>
The default value is 0.

+ * + * @throws IllegalArgumentException if maxBufferedDocs is + * non-zero and smaller than 2 */ public void setMaxBufferedDocs(int maxBufferedDocs) { ensureOpen(); - if (maxBufferedDocs < 2) - throw new IllegalArgumentException("maxBufferedDocs must at least be 2"); + if (maxBufferedDocs != 0 && maxBufferedDocs < 2) + throw new IllegalArgumentException("maxBufferedDocs must at least be 2 or 0 to disable"); this.minMergeDocs = maxBufferedDocs; } @@ -685,7 +705,28 @@ return minMergeDocs; } + /** Determines the amount of RAM that may be used for + * buffering before the in-memory documents are flushed as + * a new Segment. This only applies when maxBufferedDocs + * is set to 0. Generally for faster indexing performance + * it's best to flush by RAM usage instead of document + * count. + */ + public void setRAMBufferSizeMB(float mb) { + if (mb < 1) + throw new IllegalArgumentException("ramBufferSize must at least be 1 MB"); + ramBufferSize = mb*1024F*1024F; + docWriter.setRAMBufferSizeMB(mb); + } + /** + * @see #setRAMBufferSizeMB + */ + public float getRAMBufferSizeMB() { + return ramBufferSize/1024F/1024F; + } + + /** *
<p>
Determines the minimal number of delete terms required before the buffered * in-memory delete terms are applied and flushed. If there are documents * buffered in memory at the time, they are merged and a new segment is @@ -756,7 +797,9 @@ public void setInfoStream(PrintStream infoStream) { ensureOpen(); this.infoStream = infoStream; - deleter.setInfoStream(infoStream); + docWriter.setInfoStream(infoStream); + // nocommit + //deleter.setInfoStream(infoStream); } /** @@ -835,7 +878,7 @@ */ public synchronized void close() throws CorruptIndexException, IOException { if (!closed) { - flushRamSegments(); + flush(); if (commitPending) { segmentInfos.write(directory); // now commit changes @@ -844,7 +887,6 @@ rollbackSegmentInfos = null; } - ramDirectory.close(); if (writeLock != null) { writeLock.release(); // release write lock writeLock = null; @@ -884,7 +926,7 @@ /** Returns the number of documents currently in this index. */ public synchronized int docCount() { ensureOpen(); - int count = ramSegmentInfos.size(); + int count = docWriter.docID; for (int i = 0; i < segmentInfos.size(); i++) { SegmentInfo si = segmentInfos.info(i); count += si.docCount; @@ -962,24 +1004,13 @@ */ public void addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException { ensureOpen(); - SegmentInfo newSegmentInfo = buildSingleDocSegment(doc, analyzer); - synchronized (this) { - ramSegmentInfos.addElement(newSegmentInfo); - maybeFlushRamSegments(); - } + docWriter.addDocument(doc, analyzer); + // For the non-autoCommit case, MultiDocumentWriter + // takes care of flushing its pending state to disk + if (autoCommit) + maybeFlush(); } - SegmentInfo buildSingleDocSegment(Document doc, Analyzer analyzer) - throws CorruptIndexException, IOException { - DocumentWriter dw = new DocumentWriter(ramDirectory, analyzer, this); - dw.setInfoStream(infoStream); - String segmentName = newRamSegmentName(); - dw.addDocument(segmentName, doc); - SegmentInfo si = new SegmentInfo(segmentName, 1, ramDirectory, false, false); - si.setNumFields(dw.getNumFields()); - return si; - } - /** * Deletes the document(s) containing term. * @param term the term to identify the documents to be deleted @@ -989,7 +1020,7 @@ public synchronized void deleteDocuments(Term term) throws CorruptIndexException, IOException { ensureOpen(); bufferDeleteTerm(term); - maybeFlushRamSegments(); + maybeFlush(); } /** @@ -1005,7 +1036,7 @@ for (int i = 0; i < terms.length; i++) { bufferDeleteTerm(terms[i]); } - maybeFlushRamSegments(); + maybeFlush(); } /** @@ -1041,26 +1072,23 @@ public void updateDocument(Term term, Document doc, Analyzer analyzer) throws CorruptIndexException, IOException { ensureOpen(); - SegmentInfo newSegmentInfo = buildSingleDocSegment(doc, analyzer); - synchronized (this) { - bufferDeleteTerm(term); - ramSegmentInfos.addElement(newSegmentInfo); - maybeFlushRamSegments(); - } + bufferDeleteTerm(term); + docWriter.addDocument(doc, analyzer); + // nocommit: what if we need to trigger on max delete terms? 
+ // For the non-autoCommit case, MultiDocumentWriter + // takes care of flushing its pending state to disk + if (autoCommit) + maybeFlush(); } - final synchronized String newRamSegmentName() { - return "_ram_" + Integer.toString(ramSegmentInfos.counter++, Character.MAX_RADIX); - } - // for test purpose final synchronized int getSegmentCount(){ return segmentInfos.size(); } // for test purpose - final synchronized int getRamSegmentCount(){ - return ramSegmentInfos.size(); + final synchronized int getNumBufferedDocuments(){ + return docWriter.docID; } // for test purpose @@ -1089,6 +1117,7 @@ */ private int mergeFactor = DEFAULT_MERGE_FACTOR; + // nocommit fix javadocs /** Determines the minimal number of documents required before the buffered * in-memory documents are merging and a new Segment is created. * Since Documents are merged in a {@link org.apache.lucene.store.RAMDirectory}, @@ -1096,10 +1125,11 @@ * the number of files open in a FSDirectory. * *
<p>
The default value is {@link #DEFAULT_MAX_BUFFERED_DOCS}. - */ private int minMergeDocs = DEFAULT_MAX_BUFFERED_DOCS; + // nocommit javadoc + private float ramBufferSize = DEFAULT_RAM_BUFFER_SIZE_MB*1024F*1024F; /** Determines the largest number of documents ever merged by addDocument(). * Small values (e.g., less than 10,000) are best for interactive indexing, @@ -1177,7 +1207,7 @@ */ public synchronized void optimize() throws CorruptIndexException, IOException { ensureOpen(); - flushRamSegments(); + flush(); while (segmentInfos.size() > 1 || (segmentInfos.size() == 1 && (SegmentReader.hasDeletions(segmentInfos.info(0)) || @@ -1186,7 +1216,7 @@ (useCompoundFile && (!SegmentReader.usesCompoundFile(segmentInfos.info(0))))))) { int minSegment = segmentInfos.size() - mergeFactor; - mergeSegments(segmentInfos, minSegment < 0 ? 0 : minSegment, segmentInfos.size()); + mergeSegments(minSegment < 0 ? 0 : minSegment, segmentInfos.size()); } } @@ -1203,7 +1233,7 @@ localRollbackSegmentInfos = (SegmentInfos) segmentInfos.clone(); localAutoCommit = autoCommit; if (localAutoCommit) { - flushRamSegments(); + flush(); // Turn off auto-commit during our local transaction: autoCommit = false; } else @@ -1293,16 +1323,18 @@ segmentInfos.clear(); segmentInfos.addAll(rollbackSegmentInfos); + docWriter.abort(); + // Ask deleter to locate unreferenced files & remove // them: deleter.checkpoint(segmentInfos, false); deleter.refresh(); - ramSegmentInfos = new SegmentInfos(); bufferedDeleteTerms.clear(); numBufferedDeleteTerms = 0; commitPending = false; + docWriter.abort(); close(); } else { @@ -1397,7 +1429,7 @@ for (int base = start; base < segmentInfos.size(); base++) { int end = Math.min(segmentInfos.size(), base+mergeFactor); if (end-base > 1) { - mergeSegments(segmentInfos, base, end); + mergeSegments(base, end); } } } @@ -1437,7 +1469,7 @@ // segments in S may not since they could come from multiple indexes. // Here is the merge algorithm for addIndexesNoOptimize(): // - // 1 Flush ram segments. + // 1 Flush ram. // 2 Consider a combined sequence with segments from T followed // by segments from S (same as current addIndexes(Directory[])). // 3 Assume the highest level for segments in S is h. Call @@ -1458,14 +1490,18 @@ // copy a segment, which may cause doc count to change because deleted // docs are garbage collected. - // 1 flush ram segments + // 1 flush ram ensureOpen(); - flushRamSegments(); + flush(); // 2 copy segment infos and find the highest level from dirs int startUpperBound = minMergeDocs; + // nocommit: what to do? 
+ if (startUpperBound == 0) + startUpperBound = 10; + boolean success = false; startTransaction(); @@ -1524,7 +1560,7 @@ // copy those segments from S for (int i = segmentCount - numSegmentsToCopy; i < segmentCount; i++) { - mergeSegments(segmentInfos, i, i + 1); + mergeSegments(i, i + 1); } if (checkNonDecreasingLevels(segmentCount - numSegmentsToCopy)) { success = true; @@ -1533,7 +1569,7 @@ } // invariants do not hold, simply merge those segments - mergeSegments(segmentInfos, segmentCount - numTailSegments, segmentCount); + mergeSegments(segmentCount - numTailSegments, segmentCount); // maybe merge segments again if necessary if (segmentInfos.info(segmentInfos.size() - 1).docCount > startUpperBound) { @@ -1673,22 +1709,18 @@ throws IOException { } - protected final void maybeFlushRamSegments() throws CorruptIndexException, IOException { + protected final synchronized void maybeFlush() throws CorruptIndexException, IOException { // A flush is triggered if enough new documents are buffered or - // if enough delete terms are buffered - if (ramSegmentInfos.size() >= minMergeDocs || numBufferedDeleteTerms >= maxBufferedDeleteTerms) { - flushRamSegments(); + // if enough delete terms are buffered or enough RAM is + // being consumed + // nocommit + if (numBufferedDeleteTerms >= maxBufferedDeleteTerms || + (autoCommit && ((minMergeDocs != 0 && docWriter.docID >= minMergeDocs) || + ((true || minMergeDocs == 0) && autoCommit && docWriter.getRAMUsed() > ramBufferSize)))) { + flush(); } } - /** Expert: Flushes all RAM-resident segments (buffered documents), then may merge segments. */ - private final synchronized void flushRamSegments() throws CorruptIndexException, IOException { - if (ramSegmentInfos.size() > 0 || bufferedDeleteTerms.size() > 0) { - mergeSegments(ramSegmentInfos, 0, ramSegmentInfos.size()); - maybeMergeSegments(minMergeDocs); - } - } - /** * Flush all in-memory buffered updates (adds and deletes) * to the Directory. 
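Before the reworked flush() implementation in the next hunk, a small sketch of the contract it provides, mirroring the assertions in TestIndexWriterDelete above. Illustration only; getNumBufferedDocuments() and getSegmentCount() are the package-private test helpers touched by this patch, so the snippet assumes it lives in org.apache.lucene.index:

    Directory dir = new RAMDirectory();
    IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
    Document doc = new Document();
    doc.add(new Field("content", "buffered then flushed", Field.Store.YES, Field.Index.TOKENIZED));
    writer.addDocument(doc);                       // held in the new MultiDocumentWriter
    writer.flush();                                // buffered adds and deletes reach the Directory
    assert writer.getNumBufferedDocuments() == 0;  // RAM buffer is empty again
    assert writer.getSegmentCount() > 0;           // and a new on-disk segment exists
    writer.close();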
@@ -1699,7 +1731,86 @@ */ public final synchronized void flush() throws CorruptIndexException, IOException { ensureOpen(); - flushRamSegments(); + + SegmentInfo newSegment = null; + boolean anything = false; + + boolean flushDocs = docWriter.docID > 0; + boolean flushDeletes = bufferedDeleteTerms.size() > 0; + final int numDocs = docWriter.docID; + + if (flushDocs || flushDeletes) { + + SegmentInfos rollback = null; + + if (flushDeletes) + rollback = (SegmentInfos) segmentInfos.clone(); + + boolean success = false; + + try { + if (flushDocs) { + int mergedDocCount = docWriter.docID; + String segment = docWriter.segment; + docWriter.flush(newSegmentName()); + newSegment = new SegmentInfo(segment, + mergedDocCount, + directory, false, true); + segmentInfos.addElement(newSegment); + } + + if (flushDeletes) { + maybeApplyDeletes(flushDocs); + doAfterFlush(); + } + + checkpoint(); + success = true; + } finally { + if (!success) { + if (flushDeletes) { + // Fully replace the segmentInfos since flushed + // deletes could have changed any of the + // SegmentInfo instances: + segmentInfos.clear(); + segmentInfos.addAll(rollback); + } else { + // Remove segment we added, if any: + if (newSegment != null && + segmentInfos.size() > 0 && + segmentInfos.info(segmentInfos.size()-1) == newSegment) + segmentInfos.remove(segmentInfos.size()-1); + docWriter.abort(); + } + deleter.checkpoint(segmentInfos, false); + deleter.refresh(); + } + } + + deleter.checkpoint(segmentInfos, autoCommit); + + if (flushDocs && useCompoundFile) { + success = false; + try { + docWriter.createCompoundFile(newSegment.name); + newSegment.setUseCompoundFile(true); + checkpoint(); + success = true; + } finally { + if (!success) { + newSegment.setUseCompoundFile(false); + deleter.refresh(); + } + } + + deleter.checkpoint(segmentInfos, autoCommit); + } + + // nocommit + // maybeMergeSegments(mergeFactor * numDocs / 2); + + maybeMergeSegments(minMergeDocs); + } } /** Expert: Return the total size of all index files currently cached in memory. @@ -1707,15 +1818,15 @@ */ public final long ramSizeInBytes() { ensureOpen(); - return ramDirectory.sizeInBytes(); + return docWriter.getRAMUsed(); } /** Expert: Return the number of documents whose segments are currently cached in memory. - * Useful when calling flushRamSegments() + * Useful when calling flush() */ public final synchronized int numRamDocs() { ensureOpen(); - return ramSegmentInfos.size(); + return docWriter.docID; } /** Incremental segment merger. */ @@ -1723,6 +1834,9 @@ long lowerBound = -1; long upperBound = startUpperBound; + // nocommit + if (upperBound == 0) upperBound = 10; + while (upperBound < maxMergeDocs) { int minSegment = segmentInfos.size(); int maxSegment = -1; @@ -1754,7 +1868,7 @@ while (numSegments >= mergeFactor) { // merge the leftmost* mergeFactor segments - int docCount = mergeSegments(segmentInfos, minSegment, minSegment + mergeFactor); + int docCount = mergeSegments(minSegment, minSegment + mergeFactor); numSegments -= mergeFactor; if (docCount > upperBound) { @@ -1783,39 +1897,29 @@ * Merges the named range of segments, replacing them in the stack with a * single segment. 
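To make the level walk in maybeMergeSegments() above concrete, an illustrative calculation (not patch code), assuming mergeFactor=10 and a doc-count bound of 10 (the stand-in used when maxBufferedDocs is 0):

    int mergeFactor = 10;
    long upperBound = 10;                 // startUpperBound; 0 is bumped to 10 above
    for (int level = 0; level < 4; level++) {
      System.out.println("level " + level + ": segments with <= " + upperBound + " docs");
      upperBound *= mergeFactor;          // each level spans mergeFactor times more docs
    }
    // Once mergeFactor segments collect inside one level, mergeSegments() below collapses
    // them into a single segment that then belongs to the next level up.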
*/ - private final int mergeSegments(SegmentInfos sourceSegments, int minSegment, int end) + private final int mergeSegments(int minSegment, int end) throws CorruptIndexException, IOException { - // We may be called solely because there are deletes - // pending, in which case doMerge is false: - boolean doMerge = end > 0; final String mergedName = newSegmentName(); + SegmentMerger merger = null; - - final List ramSegmentsToDelete = new ArrayList(); - SegmentInfo newSegment = null; int mergedDocCount = 0; - boolean anyDeletes = (bufferedDeleteTerms.size() != 0); // This is try/finally to make sure merger's readers are closed: try { - if (doMerge) { - if (infoStream != null) infoStream.print("merging segments"); - merger = new SegmentMerger(this, mergedName); + if (infoStream != null) infoStream.print("merging segments"); - for (int i = minSegment; i < end; i++) { - SegmentInfo si = sourceSegments.info(i); - if (infoStream != null) - infoStream.print(" " + si.name + " (" + si.docCount + " docs)"); - IndexReader reader = SegmentReader.get(si); // no need to set deleter (yet) - merger.add(reader); - if (reader.directory() == this.ramDirectory) { - ramSegmentsToDelete.add(si); - } - } + merger = new SegmentMerger(this, mergedName); + + for (int i = minSegment; i < end; i++) { + SegmentInfo si = segmentInfos.info(i); + if (infoStream != null) + infoStream.print(" " + si.name + " (" + si.docCount + " docs)"); + IndexReader reader = SegmentReader.get(si); // no need to set deleter (yet) + merger.add(reader); } SegmentInfos rollback = null; @@ -1825,99 +1929,57 @@ // if we hit exception when doing the merge: try { - if (doMerge) { - mergedDocCount = merger.merge(); + mergedDocCount = merger.merge(); - if (infoStream != null) { - infoStream.println(" into "+mergedName+" ("+mergedDocCount+" docs)"); - } + if (infoStream != null) { + infoStream.println(" into "+mergedName+" ("+mergedDocCount+" docs)"); + } - newSegment = new SegmentInfo(mergedName, mergedDocCount, - directory, false, true); - } + newSegment = new SegmentInfo(mergedName, mergedDocCount, + directory, false, true); - if (sourceSegments != ramSegmentInfos || anyDeletes) { - // Now save the SegmentInfo instances that - // we are replacing: - rollback = (SegmentInfos) segmentInfos.clone(); - } + rollback = (SegmentInfos) segmentInfos.clone(); - if (doMerge) { - if (sourceSegments == ramSegmentInfos) { - segmentInfos.addElement(newSegment); - } else { - for (int i = end-1; i > minSegment; i--) // remove old infos & add new - sourceSegments.remove(i); + for (int i = end-1; i > minSegment; i--) // remove old infos & add new + segmentInfos.remove(i); - segmentInfos.set(minSegment, newSegment); - } - } + segmentInfos.set(minSegment, newSegment); - if (sourceSegments == ramSegmentInfos) { - maybeApplyDeletes(doMerge); - doAfterFlush(); - } - checkpoint(); success = true; } finally { - if (success) { - // The non-ram-segments case is already committed - // (above), so all the remains for ram segments case - // is to clear the ram segments: - if (sourceSegments == ramSegmentInfos) { - ramSegmentInfos.removeAllElements(); - } - } else { + if (!success && rollback != null) { + // Rollback the individual SegmentInfo + // instances, but keep original SegmentInfos + // instance (so we don't try to write again the + // same segments_N file -- write once): + segmentInfos.clear(); + segmentInfos.addAll(rollback); - // Must rollback so our state matches index: - if (sourceSegments == ramSegmentInfos && !anyDeletes) { - // Simple case: newSegment may or may 
not have - // been added to the end of our segment infos, - // so just check & remove if so: - if (newSegment != null && - segmentInfos.size() > 0 && - segmentInfos.info(segmentInfos.size()-1) == newSegment) { - segmentInfos.remove(segmentInfos.size()-1); - } - } else if (rollback != null) { - // Rollback the individual SegmentInfo - // instances, but keep original SegmentInfos - // instance (so we don't try to write again the - // same segments_N file -- write once): - segmentInfos.clear(); - segmentInfos.addAll(rollback); - } - // Delete any partially created and now unreferenced files: deleter.refresh(); } } } finally { // close readers before we attempt to delete now-obsolete segments - if (doMerge) merger.closeReaders(); + merger.closeReaders(); } - // Delete the RAM segments - deleter.deleteDirect(ramDirectory, ramSegmentsToDelete); - // Give deleter a chance to remove files now. deleter.checkpoint(segmentInfos, autoCommit); - if (useCompoundFile && doMerge) { + if (useCompoundFile) { boolean success = false; try { - merger.createCompoundFile(mergedName + ".cfs"); newSegment.setUseCompoundFile(true); checkpoint(); success = true; - } finally { if (!success) { // Must rollback: @@ -1936,14 +1998,14 @@ // Called during flush to apply any buffered deletes. If // doMerge is true then a new segment was just created and // flushed from the ram segments. - private final void maybeApplyDeletes(boolean doMerge) throws CorruptIndexException, IOException { + private final void maybeApplyDeletes(boolean flushedNewSegment) throws CorruptIndexException, IOException { if (bufferedDeleteTerms.size() > 0) { if (infoStream != null) infoStream.println("flush " + numBufferedDeleteTerms + " buffered deleted terms on " + segmentInfos.size() + " segments."); - if (doMerge) { + if (flushedNewSegment) { IndexReader reader = null; try { reader = SegmentReader.get(segmentInfos.info(segmentInfos.size() - 1)); @@ -1964,7 +2026,7 @@ } int infosEnd = segmentInfos.size(); - if (doMerge) { + if (flushedNewSegment) { infosEnd--; } @@ -1996,6 +2058,8 @@ private final boolean checkNonDecreasingLevels(int start) { int lowerBound = -1; int upperBound = minMergeDocs; + if (upperBound == 0) + upperBound = 10; for (int i = segmentInfos.size() - 1; i >= start; i--) { int docCount = segmentInfos.info(i).docCount; @@ -2044,10 +2108,11 @@ // well as the disk segments. private void bufferDeleteTerm(Term term) { Num num = (Num) bufferedDeleteTerms.get(term); + int numDoc = docWriter.docID; if (num == null) { - bufferedDeleteTerms.put(term, new Num(ramSegmentInfos.size())); + bufferedDeleteTerms.put(term, new Num(numDoc)); } else { - num.setNum(ramSegmentInfos.size()); + num.setNum(numDoc); } numBufferedDeleteTerms++; } @@ -2057,17 +2122,20 @@ // the documents buffered before it, not those buffered after it. 
private final void applyDeletesSelectively(HashMap deleteTerms, IndexReader reader) throws CorruptIndexException, IOException { + //System.out.println("now apply selective deletes"); Iterator iter = deleteTerms.entrySet().iterator(); while (iter.hasNext()) { Entry entry = (Entry) iter.next(); Term term = (Term) entry.getKey(); - + //System.out.println(" term " + term); + TermDocs docs = reader.termDocs(term); if (docs != null) { int num = ((Num) entry.getValue()).getNum(); try { while (docs.next()) { int doc = docs.doc(); + //System.out.println(" doc " + doc + " vs " + num); if (doc >= num) { break; } Index: src/java/org/apache/lucene/index/IndexFileDeleter.java =================================================================== --- src/java/org/apache/lucene/index/IndexFileDeleter.java (revision 523296) +++ src/java/org/apache/lucene/index/IndexFileDeleter.java (working copy) @@ -97,6 +97,7 @@ private PrintStream infoStream; private Directory directory; private IndexDeletionPolicy policy; + private MultiDocumentWriter docWriter; void setInfoStream(PrintStream infoStream) { this.infoStream = infoStream; @@ -116,10 +117,12 @@ * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public IndexFileDeleter(Directory directory, IndexDeletionPolicy policy, SegmentInfos segmentInfos, PrintStream infoStream) + public IndexFileDeleter(Directory directory, IndexDeletionPolicy policy, SegmentInfos segmentInfos, PrintStream infoStream, MultiDocumentWriter docWriter) throws CorruptIndexException, IOException { + this.docWriter = docWriter; this.infoStream = infoStream; + this.policy = policy; this.directory = directory; @@ -310,6 +313,8 @@ // Incref the files: incRef(segmentInfos, isCommit); + if (docWriter != null) + incRef(docWriter.files()); if (isCommit) { // Append to our commits list: @@ -325,9 +330,8 @@ // DecRef old files from the last checkpoint, if any: int size = lastFiles.size(); if (size > 0) { - for(int i=0;i 0) + state.clearHash(); + */ + } + + if (fieldsWriter != null) { + fieldsWriter.close(); + fieldsWriter = null; + } + + flushedNorms = false; + flushedVectors = false; + + files = null; + docID = 0; + nextWriteDocID = 0; + } + + // flush all changes to a real segment + int netFlushCount = 0; + int netFlushTimes = 0; + int lastFlushDocID; + + synchronized void flush(String newSegmentName) throws IOException { + //System.out.println("FLUSH @ " + docID); + //System.out.println(" mem now: " + bean.getHeapMemoryUsage().getUsed()); + + // Must wait for all in-flight documents to finish: + if (numThreadState > freeThreadStates.size()) { + // System.out.println(" mark pending & wait..." + numThreadState + " vs " + freeThreadStates.size()); + flushPending = true; + while (numThreadState > freeThreadStates.size()) { + // System.out.println("flush wait: " + numThreadState + " vs " + freeThreadStates.size()); + try { + wait(); + } catch (InterruptedException e) { + } + } + // System.out.println(" wait done..."); + } + + // nocommit: what if we hit exception before notifyAll? 
+ netFlushCount += docID; + netFlushTimes++; + // System.out.println(" FLUSH avg # docs=" + (((float) netFlushCount)/netFlushTimes)); + + fieldInfos.write(directory, segment + ".fnm"); + + flushTerms(); + + // write norms of indexed fields + writeNorms(); + + assert fieldInfos.hasVectors() == (tvx != null); + + if (tvx != null) { + flushedVectors = true; + tvx.close(); + tvf.close(); + tvd.close(); + tvx = null; + } else { + flushedVectors = false; + } + + if (fieldsWriter != null) { + fieldsWriter.close(); + fieldsWriter = null; + } + + final int size = freeThreadStates.size(); + for(int i=0;i 0) + state.clearHash(); + */ + } + + flushedNorms = hasNorms; + files = null; + + if (newSegmentName != null) + reset(newSegmentName); + flushPending = false; + notifyAll(); + } + + // start a new segment + void reset(String segment) throws IOException { + this.segment = segment; + docID = 0; + nextWriteDocID = 0; + hasNorms = false; + fieldInfos = new FieldInfos(); + flushedCount = 0; + Arrays.fill(levelCounts, 0); + Arrays.fill(levelSizes, 0); + Arrays.fill(flushedLevelCounts, 0); + Arrays.fill(flushedLevelSizes, 0); + files = null; + } + + private BufferedNorms[] norms = new BufferedNorms[0]; + + // Per-thread items + private class ThreadState { + int docID; + + // We write term vectors to this, privately, and then + // flush the data to the real term vectors file later + RAMWriter tvfLocal = new RAMWriter(); + + RAMWriter fdxLocal = new RAMWriter(); + RAMWriter fdtLocal = new RAMWriter(); + + // Used whenever this thread makes a new RAM segment + RAMWriter termsOut = new RAMWriter(); + RAMWriter proxOut = new RAMWriter(); + RAMWriter freqOut = new RAMWriter(); + + private long[] vectorFieldPointers = new long[0]; + private int[] vectorFieldNumbers = new int[0]; + + private FieldData[] docFieldData = new FieldData[10]; + private int numStoredFields; + + FieldData[] fieldDataArray; + int numFieldData; + FieldData[] fieldDataHash; + + // FieldsWriter that's temporarily private to this + // thread. We write fields here and then copy the + // output to the real FieldsWriter. + private FieldsWriter localFieldsWriter; + + public ThreadState() { + fieldDataArray = new FieldData[21]; + fieldDataHash = new FieldData[28]; + vectorFieldPointers = new long[21]; + vectorFieldNumbers = new int[21]; + initPostingArrays(); + } + + void initPostingArrays() { + postingsArrayLimit = 16; + postingsArray = new Posting[postingsArrayLimit]; + // NOTE: must be a power of two for hash collision + // strategy to work correctly + postingsHashSize = 32; + postingsHashMask = postingsHashSize-1; + postingsHash = new Posting[postingsHashSize]; + } + + private int gen; + long lastFreq; + long lastProx; + + int docCode; + int docCodeFreq1; + + final byte[] docCodeBytes = new byte[5]; + final byte[] docCodeFreq1Bytes = new byte[5]; + + int numBytesDocCode; + int numBytesDocCodeFreq1; + String lastTermText; + + private boolean docHasReader; + private int netIndexedFieldLength; + + float docBoost; + + // Initializes shared state for this new document. 
+ void init(Document doc, int docID) throws IOException { + + this.docID = docID; + docBoost = doc.getBoost(); + + List docFields = doc.getFields(); + numStoredFields = 0; + final int numDocFields = docFields.size(); + + // Maybe grow our docFieldData + if (docFieldData.length < numDocFields) { + int newSize = (int) (numDocFields*1.25); + docFieldData = new FieldData[newSize]; + } + + boolean docHasVectors = false; + + docHasReader = false; + netIndexedFieldLength = 0; + + gen++; + + numFieldData = 0; + + // Absorb any new fields first seen in this document. + // Also absorb any changes to fields we had already + // seen before (eg suddenly turning on norms or + // vectors, etc.): + for(int i=0;i= hi) + return; + + int mid = (lo + hi) / 2; + + if (postings[lo].text.compareTo(postings[mid].text) > 0) { + Posting tmp = postings[lo]; + postings[lo] = postings[mid]; + postings[mid] = tmp; + } + + if (postings[mid].text.compareTo(postings[hi].text) > 0) { + Posting tmp = postings[mid]; + postings[mid] = postings[hi]; + postings[hi] = tmp; + + if (postings[lo].text.compareTo(postings[mid].text) > 0) { + Posting tmp2 = postings[lo]; + postings[lo] = postings[mid]; + postings[mid] = tmp2; + } + } + + int left = lo + 1; + int right = hi - 1; + + if (left >= right) + return; + + String partition = postings[mid].text; + + for (; ;) { + while (postings[right].text.compareTo(partition) > 0) + --right; + + while (left < right && postings[left].text.compareTo(partition) <= 0) + ++left; + + if (left < right) { + Posting tmp = postings[left]; + postings[left] = postings[right]; + postings[right] = tmp; + --right; + } else { + break; + } + } + + quickSort(postings, lo, left); + quickSort(postings, left + 1, hi); + } + + private Posting[] postingsArray; + private int postingsArrayLimit; + private int numPostings; + + private Posting[] postingsHash; + private int postingsHashSize; + private int postingsHashMask; + + int length; + int position; + int offset; + float boost; + FieldInfo fieldInfo; + + // Tokenizes the fields of a document into Postings. + final void processDocument(Analyzer analyzer) + throws IOException { + + final int maxFieldLength = writer.getMaxFieldLength(); + final int numFields = numFieldData; + + docCode = docID << 1; + docCodeFreq1 = docCode+1; + lastTermText = ""; + + numBytesDocCode = writeVInt(docCodeBytes, docCode); + numBytesDocCodeFreq1 = writeVInt(docCodeFreq1Bytes, docCodeFreq1); + lastFreq = 0; + lastProx = 0; + numVectorFields = 0; + + assert termsOut.buffer == null; + assert freqOut.buffer == null; + assert proxOut.buffer == null; + + // Make coarse guess on how large initial buffers + // should be. This helps most in the multithreaded + // case since allocating a new RAM buffer is + // synchronized. If we guess to large it's not really + // a problem because the level 1 merge will compress + // it. + final int startLevel; + if (docHasReader) netIndexedFieldLength += 4096; + if (netIndexedFieldLength < 2048) + startLevel = 0; + else if (netIndexedFieldLength < 4096) + startLevel = 1; + else if (netIndexedFieldLength < 8192) + startLevel = 2; + else if (netIndexedFieldLength < 16384) + startLevel = 3; + else + startLevel = 4; + + termsOut.setStartLevel(startLevel); + freqOut.setStartLevel(startLevel); + proxOut.setStartLevel(startLevel); + + // Sort by field name + // TODO: only if it changed since last doc: how? 
+ Arrays.sort(fieldDataArray, 0, numFields); + + fdtLocal.writeVInt(numStoredFields); + + for(int i=0;i0) position += analyzer.getPositionIncrementGap(fieldInfo.name); + + final boolean storeOffsets = field.isStoreOffsetWithTermVector(); + + if (!field.isTokenized()) { // un-tokenized field + payload = null; + String stringValue = field.stringValue(); + if (storeOffsets) + addPosition(stringValue, position++, offset, offset + stringValue.length()); + else + addPosition(stringValue, position++, -1, -1); + offset += stringValue.length(); + length++; + } else { + Reader reader; // find or make Reader + if (field.readerValue() != null) + reader = field.readerValue(); + else + reader = new StringReader(field.stringValue()); + + // Tokenize field and add to postingTable + TokenStream stream = analyzer.tokenStream(fieldInfo.name, reader); + try { + Token lastToken = null; + for (Token t = stream.next(); t != null; t = stream.next()) { + position += (t.getPositionIncrement() - 1); + payload = t.getPayload(); + + // TODO: factor this if out of this loop? + if (storeOffsets) + addPosition(t.termText(), position++, offset + t.startOffset(), offset + t.endOffset()); + else + addPosition(t.termText(), position++, -1, -1); + + lastToken = t; + if (++length >= maxFieldLength) { + if (infoStream != null) + infoStream.println("maxFieldLength " +maxFieldLength+ " reached, ignoring following tokens"); + break; + } + } + + if(lastToken != null) + offset += lastToken.endOffset() + 1; + + } finally { + stream.close(); + } + } + + boost *= field.getBoost(); + } + + final class Posting { // info about a Term in a doc + String text; + int freq; // its frequency in doc + int[] positionsAndOffsets; // positions + optionally offsets [inlined] + Payload[] payloads; + } + + private final void addPosition(String text, int position, int offsetStart, int offsetEnd) { + + // System.out.println("offs: " + offsetStart + " " + + // offsetEnd); + + final int code = text.hashCode(); + int hashPos = code & postingsHashMask; + assert hashPos >= 0; + + // System.out.println(" text=" + text + " pos=" + hashPos); + + Posting p = postingsHash[hashPos]; + if (p != null && !p.text.equals(text)) { + int code2 = code|1; + do { + hashPos = code2 & postingsHashMask; + code2 += code|1; + // System.out.println(" carry pos=" + hashPos + " code2=" + code2 + " code=" + code); + p = postingsHash[hashPos]; + } while (p != null && !p.text.equals(text)); + } + + if (p != null) { // word seen before + final int freq = p.freq; + + // Logic below for handling a recycled (from last + // doc) Posting entry. This does not seem to help + // performance, though. 
+ + /* + if (0 == freq) { // recycled from last doc + p.positions = newIntArray(); + p.positions[0] = position; + p.freq = 1; + if (offsetStart != -1) { + p.offsetsStart = newIntArray(); + p.offsetsEnd = newIntArray(); + p.offsetsStart[0] = offsetStart; + p.offsetsEnd[0] = offsetEnd; + } else + p.offsetsStart = null; + + if (payload != null) { + p.payloads = new Payload[1]; + p.payloads[0] = payload; + fieldInfo.storePayloads = true; + } else + p.payloads = null; + + } else { + */ + + final int spot; + + if (offsetStart != -1) + spot = 3*freq; + else + spot = freq; + + if (p.positionsAndOffsets.length == spot) { + p.positionsAndOffsets = swapUp(p.positionsAndOffsets); + + if (p.payloads != null) { + // the current field stores payloads + Payload[] newPayloads = new Payload[freq * 2]; // grow payloads array + System.arraycopy(p.payloads, 0, newPayloads, 0, p.payloads.length); + p.payloads = newPayloads; + } + } + p.positionsAndOffsets[spot] = position; // add new position + + if (offsetStart != -1) { + p.positionsAndOffsets[1+spot] = offsetStart; + p.positionsAndOffsets[2+spot] = offsetEnd; + } + + if (payload != null) { + if (p.payloads == null) { + // lazily allocate payload array + final int size; + if (offsetStart != -1) { + assert 0 == (p.positionsAndOffsets.length%3); + size = p.positionsAndOffsets.length/3; + } else + size = p.positionsAndOffsets.length; + p.payloads = new Payload[size]; + } + p.payloads[freq] = payload; + fieldInfo.storePayloads = true; + } + + p.freq = freq + 1; // update frequency + + // } + + } else { // word not seen before + + p = postingsArray[numPostings]; + if (p == null) + p = postingsArray[numPostings] = new Posting(); + numPostings++; + p.text = text; + postingsHash[hashPos] = p; + + if (numPostings == postingsArrayLimit) { + // Resize postings array + int newSize = postingsArrayLimit*2; + Posting[] newPostings = new Posting[newSize]; + System.arraycopy(postingsArray, 0, newPostings, 0, postingsArrayLimit); + postingsArray = newPostings; + postingsArrayLimit = newSize; + + // Resize hash + newSize = postingsHashSize*2; + // System.out.println(" REHASH to " + newSize); + postingsHashMask = newSize-1; + Posting[] newHash = new Posting[newSize]; + for(int i=0;i= 0; + if (newHash[hashPosx] != null) { + int codex2 = codex|1; + do { + hashPosx = codex2 & postingsHashMask; + codex2 += codex|1; + } while (newHash[hashPosx] != null); + } + newHash[hashPosx] = p0; + } + } + + postingsHash = newHash; + postingsHashSize = newSize; + } + + p.freq = 1; + if (offsetStart != -1) { + p.positionsAndOffsets = newIntArray(3); + p.positionsAndOffsets[1] = offsetStart; + p.positionsAndOffsets[2] = offsetEnd; + } else + p.positionsAndOffsets = newIntArray(1); + + p.positionsAndOffsets[0] = position; + + if (payload != null) { + p.payloads = new Payload[1]; + p.payloads[0] = payload; + fieldInfo.storePayloads = true; + } + } + } + + private RAMSegment ramSegment; + private int numVectorFields; + + public int writeVInt(byte[] b, int i) { + int upto = 0; + while ((i & ~0x7F) != 0) { + b[upto++] = ((byte)((i & 0x7f) | 0x80)); + i >>>= 7; + } + b[upto++] = ((byte)i); + return upto; + } + + private void addPostingsAndVectors(FieldData fp) + throws CorruptIndexException, IOException { + + final FieldInfo currentField = fp.fieldInfo; + + final Posting[] postings = postingsArray; + final int numTerms = numPostings; + final int fieldNumber = fp.fieldInfo.number; + final int postingsHashSize = postingsHash.length; + + final boolean doVectors = fp.doVectors; + final boolean doPositions = 
fp.doVectorPositions; + final boolean doOffsets = fp.doVectorOffsets; + + if (doVectors) { + vectorFieldNumbers[numVectorFields] = fieldNumber; + vectorFieldPointers[numVectorFields++] = tvfLocal.getFilePointer(); + tvfLocal.writeVInt(numTerms); + byte bits = 0x0; + if (doPositions) + bits |= TermVectorsWriter.STORE_POSITIONS_WITH_TERMVECTOR; + if (doOffsets) + bits |= TermVectorsWriter.STORE_OFFSET_WITH_TERMVECTOR; + tvfLocal.writeByte(bits); + } + + // TODO: should we first partition array to remove + // terms that have freq 0 (ie were recycled and then + // did not re-appear in this doc)? + + // int newNumPostings = 0; + // System.out.println(" add postings"); + + for(int i=0;i 256 || freeList[newSize] == null)) { + // alloc a new array + newArray = new int[newSize]; + cell = null; + // if (newSize <= 256) + // System.out.println("I: " + newSize); + } else { + // reuse existing array + cell = freeList[newSize]; + freeList[newSize] = cell.next; + newArray = cell.array; + } + + // Optimize copy for small arrays + switch(oldSize) { + case 8: + newArray[7] = array[7]; + newArray[6] = array[6]; + newArray[5] = array[5]; + newArray[4] = array[4]; + case 4: + newArray[3] = array[3]; + newArray[2] = array[2]; + case 2: + newArray[1] = array[1]; + case 1: + newArray[0] = array[0]; + break; + default: + System.arraycopy(array, 0, newArray, 0, oldSize); + } + + if (oldSize <= 256) { + // save for reuse later + if (cell == null) { + if (freeCells != null) { + cell = freeCells; + freeCells = cell.next; + } else { + cell = new AllocCell(); + } + } + cell.array = array; + cell.next = freeList[oldSize]; + freeList[oldSize] = cell; + } + + return newArray; + } + + public int[] newIntArray(int size) { + final int[] r; + if (size <= 256 && REUSE_INT_ARRAYS) { + if (freeList[size] != null) { + AllocCell cell = freeList[size]; + r = cell.array; + freeList[size] = cell.next; + cell.next = freeCells; + freeCells = cell; + } else + r = new int[size]; + } else + r = new int[size]; + return r; + } + + // Free this array, recycling if possible + public void recycle(int[] array) { + assert REUSE_INT_ARRAYS; + if (array.length <= 256) { + AllocCell cell; + if (freeCells != null) { + cell = freeCells; + freeCells = cell.next; + } else { + cell = new AllocCell(); + } + cell.array = array; + cell.next = freeList[array.length]; + freeList[array.length] = cell; + } + } + } + + // nocommit: recycle + RAMReader getReader(RAMCell head, int finalLimit) { + return new RAMReader(head, finalLimit); + } + + synchronized ThreadState getThreadState(Document doc) throws IOException { + ThreadState state = null; + while(true) { + final int size = freeThreadStates.size(); + if (flushPending) { + try { + wait(); + } catch (InterruptedException e) { + } + } else if (0 == size) { + + // There are no free thread states, or, a flush is + // trying to happen + if (numWaiting >= MAX_WAIT_QUEUE) { + // System.out.println("do wait"); + + // There are too many thread states in line write + // to the index so we now pause to give them a + // chance to get scheduled by the JVM and finish + // their documents. Once we wake up again, a + // recycled ThreadState should be available else + // we wait again. 
+ // System.out.println("w " + Thread.currentThread().getName()); + try { + wait(); + } catch (InterruptedException e) { + } + // System.out.println(" wd " + Thread.currentThread().getName()); + + } else { + // OK, just create a new thread state + state = new ThreadState(); + numThreadState++; + break; + } + } else { + // Use recycled thread state + state = (ThreadState) freeThreadStates.get(size-1); + freeThreadStates.remove(size-1); + break; + } + } + + boolean success = false; + try { + state.init(doc, docID++); + success = true; + } finally { + if (!success) + freeThreadStates.add(state); + } + + return state; + } + + void addDocument(Document doc, Analyzer analyzer) + throws CorruptIndexException, IOException { + + // First pass: go through all fields in doc, updating + // shared FieldInfos and writing any stored fields: + final ThreadState state = getThreadState(doc); + boolean success = false; + try { + state.processDocument(analyzer); + success = true; + } finally { + if (success) + finishDocument(state); + else { + // nocommit: need to do some cleanup of the thread state? + freeThreadStates.add(state); + } + } + } + + private synchronized void finishDocument(ThreadState state) throws IOException { + + // Now write the indexed document to the real files. + if (nextWriteDocID == state.docID) { + // It's my turn, so write everything now: + try { + writeDocument(state); + } finally { + nextWriteDocID++; + // Recycle our thread state back in the free pool + freeThreadStates.add(state); + } + + // If any states were waiting on me, sweep through and + // flush those that are enabled by my write. + boolean doNotify = numWaiting >= MAX_WAIT_QUEUE || flushPending; + if (numWaiting > 0) { + while(true) { + int upto = 0; + for(int i=0;i 0) { + tvd.writeVInt(state.numVectorFields); + for(int i=0;i ramBufferSize/20) { + mergeRAMSegments(state, 0); + if (levelSizes[1] > ramBufferSize/10 && level0Compression < 0.7) + mergeRAMSegments(state, 1); + } + + if (doSelfFlush && totalSize > ramBufferSize) + flushRAMSegments(state); + } + + long getRAMUsed() { + return totalSize; + } + + public void printAlloc(String prefix, RAMCell head, int limit) { + RAMCell c = head; + System.out.print(prefix + ":"); + if (c == null) + System.out.println(" null"); + else + while(c != null) { + if (c.next == null) { + System.out.println(" " + c.buffer.length + "(" + limit + ")"); + break; + } else { + System.out.print(" " + c.buffer.length); + c = c.next; + } + } + } + + private final TermInfo termInfo = new TermInfo(); // minimize consing + private IndexOutput freqOutput; + private IndexOutput proxOutput; + private int skipInterval; + private int lastDoc; + private int lastPayloadLength; + private int df; + private boolean currentFieldStorePayloads; + private boolean didPack; + + // Write out the postings & dictionary to real output + // files, in the "real" lucene file format. This is to + // finalize a segment. 
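Before the flushTerms()/appendPostings() code that follows, a sketch of the .frq doc/freq encoding it emits, which is the same encoding the SegmentTermDocs change earlier in this patch reads back. Illustration only, not patch code:

    // DocDelta is shifted left by one; the low bit flags "frequency == 1".
    int delta = doc - lastDoc;
    if (termDocFreq == 1) {
      freqOutput.writeVInt((delta << 1) | 1);   // low bit set: freq is implicitly 1
    } else {
      freqOutput.writeVInt(delta << 1);         // low bit clear: an explicit freq follows
      freqOutput.writeVInt(termDocFreq);
    }
    lastDoc = doc;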
+ void flushTerms() throws IOException { + + if (infoStream != null) + infoStream.println("flush postings as segment " + segment + " docID=" + MultiDocumentWriter.this.docID); + + TermInfosWriter termInfosWriter = null; + + final int numRAMSegments = ramSegments.size(); + final int numFlushedSegments = flushedSegments.size(); + final int numSegmentsIn = numRAMSegments + numFlushedSegments; + resizeMergeInputs(numSegmentsIn); + int numDoc = 0; + long oldSize = 0; + long newSize = 0; + + try { + freqOutput = directory.createOutput(segment + ".frq"); + proxOutput = directory.createOutput(segment + ".prx"); + termInfosWriter = new TermInfosWriter(directory, segment, fieldInfos, + writer.getTermIndexInterval()); + skipInterval = termInfosWriter.skipInterval; + + RAMSegmentMergeQueue queue = null; + + queue = new RAMSegmentMergeQueue(numSegmentsIn); + int i=0; + for (;i 0) { + + SegmentMergeInfo smi = (SegmentMergeInfo) queue.pop(); + + long freqPointer = freqOutput.getFilePointer(); + long proxPointer = proxOutput.getFilePointer(); + + if (currentField != smi.fieldNumber) { + currentField = smi.fieldNumber; + currentFieldName = fieldInfos.fieldName(smi.fieldNumber); + currentFieldStorePayloads = fieldInfos.fieldInfo(smi.fieldNumber).storePayloads; + } + + // TODO: can we avoid cons'ing here? + Term term = new Term(currentFieldName, new String(smi.textBuffer, 0, smi.textLength)); + + df = 0; + lastDoc = 0; + lastPayloadLength = -1; + //System.out.println(" " + term + ": smi start idx " + smi.idx + " df=" + smi.df); + resetSkip(); + + appendPostings(smi); + + final int fieldNumber = smi.fieldNumber; + + if (smi.next()) + queue.put(smi); + + SegmentMergeInfo top = (SegmentMergeInfo) queue.top(); + + while (top != null && top.equalTerm(fieldNumber, term)) { + smi = (SegmentMergeInfo) queue.pop(); + //System.out.println(" idx " + smi.idx + ": df=" + smi.df + " totDF=" + df); + appendPostings(smi); + if (smi.next()) + queue.put(smi); + top = (SegmentMergeInfo) queue.top(); + } + + long skipPointer = writeSkip(); + + termInfo.set(df, freqPointer, proxPointer, (int) (skipPointer - freqPointer)); + termInfosWriter.add(term, termInfo); + } + } finally { + newSize = freqOutput.length() + proxOutput.length() + directory.fileLength(segment + ".tis") + directory.fileLength(segment + ".tii"); + close(freqOutput, proxOutput, termInfosWriter); + for (int i=0;i>> 1; + + // System.out.println("doc=" + doc + " lastDoc=" + lastDoc + " df=" + df); + + //if (!(doc > lastDoc || df == 1)) + //System.out.println("doc=" + doc + " lastDoc=" + lastDoc + " df=" + df); + + assert doc > lastDoc || df == 1; + + final int termDocFreq; + final int newDocCode = (doc-lastDoc)<<1; + lastDoc = doc; + + if ((docCode & 1) != 0) { + freqOutput.writeVInt(newDocCode|1); + termDocFreq = 1; + //System.out.println(" doc " + doc + " freq 1"); + //System.out.println(" write " + (newDocCode|1)); + } else { + freqOutput.writeVInt(newDocCode); + termDocFreq = freq.readVInt(); + //System.out.println(" doc " + doc + " freq " + termDocFreq); + //System.out.println(" write " + newDocCode + " then " + termDocFreq); + + freqOutput.writeVInt(termDocFreq); + } + + /** See {@link DocumentWriter#writePostings(Posting[], String) for + * documentation about the encoding of positions and payloads + */ + for(int j=0;j 0) + copyBytes(prox, proxOutput, payloadLength); + } else { + assert 0 == (deltaCode&1); + proxOutput.writeVInt(deltaCode>>1); + } + } + } + } + + private RAMWriter skipBuffer = new RAMWriter(); + private int lastSkipDoc; + private int 
lastSkipPayloadLength; + private long lastSkipFreqPointer; + private long lastSkipProxPointer; + + private void resetSkip() { + lastSkipDoc = 0; + lastSkipPayloadLength = -1; // we don't have to write the first length in the skip list + lastSkipFreqPointer = freqOutput.getFilePointer(); + lastSkipProxPointer = proxOutput.getFilePointer(); + } + + private void bufferSkip(int doc, int payloadLength) throws IOException { + //System.out.println(" buffer skip: freq ptr " + freqPointer + " prox " + proxPointer); + //System.out.println(" vs last freq ptr " + lastSkipFreqPointer + " prox " + lastSkipProxPointer); + + // To efficiently store payloads in the posting lists we do not store the length of + // every payload. Instead we omit the length for a payload if the previous payload had + // the same length. + // However, in order to support skipping the payload length at every skip point must be known. + // So we use the same length encoding that we use for the posting lists for the skip data as well: + // Case 1: current field does not store payloads + // SkipDatum --> DocSkip, FreqSkip, ProxSkip + // DocSkip,FreqSkip,ProxSkip --> VInt + // DocSkip records the document number before every SkipInterval th document in TermFreqs. + // Document numbers are represented as differences from the previous value in the sequence. + // Case 2: current field stores payloads + // SkipDatum --> DocSkip, PayloadLength?, FreqSkip,ProxSkip + // DocSkip,FreqSkip,ProxSkip --> VInt + // PayloadLength --> VInt + // In this case DocSkip/2 is the difference between + // the current and the previous value. If DocSkip + // is odd, then a PayloadLength encoded as VInt follows, + // if DocSkip is even, then it is assumed that the + // current payload length equals the length at the previous + // skip point + + final int delta = doc - lastSkipDoc; + if (currentFieldStorePayloads) { + if (payloadLength == lastSkipPayloadLength) + // the current payload length equals the length at the previous skip point, + // so we don't store the length again + skipBuffer.writeVInt(delta << 1); + else { + // the payload length is different from the previous one. We shift the DocSkip, + // set the lowest bit and store the current payload length as VInt. 
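The comment above describes the skip-entry encoding that the writeVInt calls just below implement: the doc delta is shifted left one bit, and the low bit marks whether a new payload length follows. A small sketch of just that bit trick, with plain ints standing in for VInts and the freq/prox file-pointer deltas omitted; names are illustrative:

import java.util.ArrayList;
import java.util.List;

/** Sketch of the skip-entry encoding described above: doc deltas are
 *  shifted left one bit and the low bit flags "a new payload length follows". */
public class SkipEncodingSketch {

  static List<Integer> encode(int[] docs, int[] payloadLengths, boolean storePayloads) {
    List<Integer> out = new ArrayList<Integer>();
    int lastDoc = 0, lastLen = -1;
    for (int i = 0; i < docs.length; i++) {
      int delta = docs[i] - lastDoc;
      if (!storePayloads) {
        out.add(delta);                       // Case 1: plain doc delta
      } else if (payloadLengths[i] == lastLen) {
        out.add(delta << 1);                  // even: reuse previous payload length
      } else {
        out.add((delta << 1) | 1);            // odd: a payload length follows
        out.add(payloadLengths[i]);
        lastLen = payloadLengths[i];
      }
      lastDoc = docs[i];
    }
    return out;
  }

  public static void main(String[] args) {
    int[] docs = {5, 9, 17};
    int[] lens = {4, 4, 7};
    // Prints [11, 4, 8, 17, 7]: 5<<1|1 then len 4, 4<<1 (same len), 8<<1|1 then len 7.
    System.out.println(encode(docs, lens, true));
  }
}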
+ skipBuffer.writeVInt((delta << 1) + 1); + skipBuffer.writeVInt(payloadLength); + lastSkipPayloadLength = payloadLength; + } + } else + // current field does not store payloads + skipBuffer.writeVInt(delta); + + long freqPointer = freqOutput.getFilePointer(); + long proxPointer = proxOutput.getFilePointer(); + skipBuffer.writeVInt((int) (freqPointer - lastSkipFreqPointer)); + skipBuffer.writeVInt((int) (proxPointer - lastSkipProxPointer)); + lastSkipFreqPointer = freqPointer; + lastSkipProxPointer = proxPointer; + + lastSkipDoc = doc; + } + + private long writeSkip() throws IOException { + long skipPointer = freqOutput.getFilePointer(); + skipBuffer.writeTo(freqOutput); + return skipPointer; + } + + // Called when RAM buffer is full; we now merge all RAM + // segments to a single flushed segment: + final synchronized void flushRAMSegments(ThreadState state) throws IOException { + + if (infoStream != null) { + String name = tempFileName(".tis", flushedCount); + infoStream.println("\n" + getElapsedTime() + ": flush ram segments at docID " + docID + ", to " + name.substring(0, name.length()-4) + ": totalRam=" + (totalSize/1024/1024) + " MB"); + } + didPack = false; + System.out.println("FLUSH TEMP @ " + (docID-lastFlushDocID)); + lastFlushDocID = docID; + + IndexOutput termsOut = directory.createOutput(tempFileName(".tis", flushedCount)); + IndexOutput freqOut = directory.createOutput(tempFileName(".frq", flushedCount)); + IndexOutput proxOut = directory.createOutput(tempFileName(".prx", flushedCount)); + + final int numSegmentsIn = ramSegments.size(); + long newSize; + long oldSize = totalSize; + + resizeMergeInputs(numSegmentsIn); + + int numDoc = 0; + for(int i=0;i=level;i--) { + start = end; + end += flushedLevelCounts[i]; + } + + if (infoStream != null) { + String name = tempFileName(".tis", flushedCount); + infoStream.println("merge flushed segments to " + name.substring(0, name.length()-4) + ": level " + level + ": start " + start + " to end " + end); + } + + // maybe reallocate + resizeMergeInputs(end-start); + + long newSize; + int numDoc = 0; + + IndexOutput termsOut = directory.createOutput(tempFileName(".tis", flushedCount)); + IndexOutput freqOut = directory.createOutput(tempFileName(".frq", flushedCount)); + IndexOutput proxOut = directory.createOutput(tempFileName(".prx", flushedCount)); + + long oldSize = 0; + + try { + int upto = 0; + for (int i=start;i start; i--) // remove old infos & add new + flushedSegments.remove(i); + + FlushedSegment newFlushedSegment = new FlushedSegment(numDoc, flushedCount++, newSize); + flushedSegments.set(start, newFlushedSegment); + + if (flushedLevelSizes.length == level+1) { + flushedLevelSizes = realloc(flushedLevelSizes, 1+flushedLevelSizes.length); + flushedLevelCounts = realloc(flushedLevelCounts, 1+flushedLevelCounts.length); + } + + flushedLevelSizes[level] -= oldSize; + flushedLevelSizes[1+level] += newSize; + + flushedLevelCounts[level] -= (end-start); + flushedLevelCounts[1+level]++; + + totalFlushedSize += newSize - oldSize; + + if (infoStream != null) + infoStream.println("merge flushed segments done: oldSize=" + oldSize + " newSize=" + newSize + " new/old=" + ((int)(100.0*newSize/oldSize)) + "% totalFlushed=" + (totalFlushedSize/1024/1024) + " MB"); + + files = null; + + // nocommit: should I just give IFD list of files to delete? 
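mergeFlushedSegments() above keeps per-level counts and sizes and collapses the newest level into a single segment at the next level, which keeps the number of live flushed buffers logarithmic; mergeRAMSegments() later in the file does the same for the in-memory buffers. A toy sketch of that bookkeeping; the patch triggers these merges by accumulated byte size against ramBufferSize, whereas this sketch uses a simple segment count, and the names are illustrative:

import java.util.ArrayList;
import java.util.List;

/** Toy sketch of level-based merging: new segments enter level 0; once a
 *  level holds MERGE_FACTOR segments they are collapsed into one segment at
 *  the next level. The patch drives this by byte size, not by count. */
public class LevelMergeSketch {
  static final int MERGE_FACTOR = 10;

  // counts.get(level) = how many segments currently live at that level
  final List<Integer> counts = new ArrayList<Integer>();

  void addSegment() {
    bump(0);
  }

  private void bump(int level) {
    while (counts.size() <= level) counts.add(0);
    counts.set(level, counts.get(level) + 1);
    if (counts.get(level) == MERGE_FACTOR) {
      counts.set(level, 0);        // merge the whole level away...
      bump(level + 1);             // ...into a single segment one level up
    }
  }

  public static void main(String[] args) {
    LevelMergeSketch m = new LevelMergeSketch();
    for (int i = 0; i < 1000; i++) m.addSegment();
    System.out.println(m.counts);  // [0, 0, 0, 1]: 1000 = 10^3 segments merged up
  }
}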
+    // Have deleter remove our now unreferenced files:
+    synchronized(writer) {
+      writer.getDeleter().checkpoint(writer.segmentInfos, false);
+    }
+  }
+
+  static void close(IndexOutput f0, IndexOutput f1, IndexOutput f2) throws IOException {
+    IOException keep = null;
+    try {
+      if (f0 != null) f0.close();
+    } catch (IOException e) {
+      keep = e;
+    } finally {
+      try {
+        if (f1 != null) f1.close();
+      } catch (IOException e) {
+        if (keep == null) keep = e;
+      } finally {
+        try {
+          if (f2 != null) f2.close();
+        } catch (IOException e) {
+          if (keep == null) keep = e;
+        } finally {
+          if (keep != null) throw keep;
+        }
+      }
+    }
+  }
+
+  static void close(IndexInput f0, IndexInput f1, IndexInput f2) throws IOException {
+    IOException keep = null;
+    try {
+      if (f0 != null) f0.close();
+    } catch (IOException e) {
+      keep = e;
+    } finally {
+      try {
+        if (f1 != null) f1.close();
+      } catch (IOException e) {
+        if (keep == null) keep = e;
+      } finally {
+        try {
+          if (f2 != null) f2.close();
+        } catch (IOException e) {
+          if (keep == null) keep = e;
+        } finally {
+          if (keep != null) throw keep;
+        }
+      }
+    }
+  }
+
+  static void close(IndexOutput freq, IndexOutput prox, TermInfosWriter terms) throws IOException {
+    IOException keep = null;
+    try {
+      if (freq != null) freq.close();
+    } catch (IOException e) {
+      keep = e;
+    } finally {
+      try {
+        if (prox != null) prox.close();
+      } catch (IOException e) {
+        if (keep == null) keep = e;
+      } finally {
+        try {
+          if (terms != null) terms.close();
+        } catch (IOException e) {
+          if (keep == null) keep = e;
+        } finally {
+          if (keep != null) throw keep;
+        }
+      }
+    }
+  }
+
+  float level0Compression;
+
+  NumberFormat nf = NumberFormat.getInstance();
+  String getElapsedTime() {
+    long t = System.currentTimeMillis();
+    nf.setMaximumFractionDigits(1);
+    nf.setMinimumFractionDigits(1);
+    return nf.format((t-startTime)/1000.0) + " sec";
+  }
+
+  // In-memory merge: reads multiple ram segments (in the
+  // modified format) and replaces with a single ram segment.
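The three close(...) helpers above close each stream inside nested try/finally blocks so that a failure on one stream never prevents closing the others, and only the first IOException is rethrown. A varargs sketch of the same policy; java.io.Closeable is used here only for brevity, while the patch keeps separate overloads for the concrete IndexInput/IndexOutput/TermInfosWriter types:

import java.io.Closeable;
import java.io.IOException;

/** Sketch of the close(...) helpers: close everything, remember only the
 *  first failure, and rethrow it after all streams have been attempted. */
public final class CloseAllSketch {

  static void closeAll(Closeable... streams) throws IOException {
    IOException keep = null;
    for (Closeable c : streams) {
      if (c == null) continue;
      try {
        c.close();
      } catch (IOException e) {
        if (keep == null) keep = e;   // remember only the first exception
      }
    }
    if (keep != null) throw keep;
  }

  public static void main(String[] args) {
    try {
      closeAll(
          () -> System.out.println("closed ok"),
          () -> { throw new IOException("first failure"); },
          () -> { throw new IOException("second failure"); });
    } catch (IOException e) {
      System.out.println("rethrown: " + e.getMessage());   // prints "first failure"
    }
  }
}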
+ final void mergeRAMSegments(ThreadState state, int level) throws IOException { + + int start = 0; + int end = 0; + for(int i=levelCounts.length-1;i>=level;i--) { + start = end; + end += levelCounts[i]; + } + + if (infoStream != null) { + infoStream.println("\n" + getElapsedTime() + ": merge ram segments: level " + level + ": start idx " + start + " to end idx " + end + " docID=" + docID); + System.out.println(" RAM: " + totalSize); + } + long oldSize; + //long oldTermsSize; + //long oldFreqSize; + //long oldProxSize; + long newSize; + + int numDoc; + RAMSegment newRAMSegment; + + if (end == start+1) { + // Degenerate case, if suddenly an immense document + // comes through + newRAMSegment = (RAMSegment) ramSegments.get(start); + //oldTermsSize = newRAMSegment.terms.size; + //oldFreqSize = newRAMSegment.freq.size; + //oldProxSize = newRAMSegment.prox.size; + newSize = oldSize = newRAMSegment.size; + numDoc = newRAMSegment.numDoc; + } else { + + resizeMergeInputs(end-start); + final int numSegmentsIn = end-start; + + oldSize = 0; + //oldTermsSize = 0; + //oldFreqSize = 0; + //oldProxSize = 0; + int upto = 0; + numDoc = 0; + for(int i=start;i start; i--) { // remove old infos & add new + RAMSegment rs = (RAMSegment) ramSegments.get(i); + ramSegments.remove(i); + } + + RAMSegment rs = (RAMSegment) ramSegments.get(start); + + newRAMSegment = new RAMSegment(numDoc, state.termsOut, state.freqOut, state.proxOut); + newSize = newRAMSegment.size; + ramSegments.set(start, newRAMSegment); + } + + if (levelSizes.length == level+1) { + levelSizes = realloc(levelSizes, 1+levelSizes.length); + levelCounts = realloc(levelCounts, 1+levelCounts.length); + } + + levelSizes[level] -= oldSize; + levelSizes[1+level] += newSize; + + levelCounts[level] -= (end-start); + levelCounts[1+level]++; + + totalSize += newSize - oldSize; + if (0 == level) + level0Compression = ((float) newSize)/oldSize; + + if (infoStream != null) { + infoStream.println(" oldSize=" + oldSize + " newSize=" + newSize + " new/old=" + ((int)(100.0*newSize/oldSize)) + "% totalRAM=" + (totalSize/1024/1024) + " MB"); + //infoStream.println(" termsSize=" + termsOut.size + " freqSize=" + freqOut.size + " proxSize=" + proxOut.size); + //infoStream.println(" oldTermsSize=" + oldTermsSize + " oldFreqSize=" + oldFreqSize + " oldProxSize=" + oldProxSize); + } + } + + SegmentMergeInfo mergeInputs[] = new SegmentMergeInfo[0]; + + final void resizeMergeInputs(final int minSize) { + if (mergeInputs.length < minSize) { + int size = (int) (minSize*1.25); + SegmentMergeInfo[] newMergeInputs = new SegmentMergeInfo[size]; + System.arraycopy(mergeInputs, 0, newMergeInputs, 0, mergeInputs.length); + for(int i=mergeInputs.length;i 0) { + + SegmentMergeInfo smi = (SegmentMergeInfo) queue.pop(); + int df = smi.df; + + if (debug) { + System.out.println(" term " + fieldInfos.fieldName(smi.fieldNumber) + ":" + new String(smi.textBuffer, 0, smi.textLength)); + System.out.println(" idx " + smi.idx + " freqSize=" + smi.freqSize + " proxSize=" + smi.proxSize + " df=" + smi.df); + } + + // bulk copy + smi.flush(); + + // write term (prefix compressed), docFreq, freq ptr, prox ptr + int len = lastCharsLength < smi.textLength ? 
lastCharsLength : smi.textLength; + int prefix = len; + for(int i=0;i 0; + + if (srcIn instanceof RAMReader) { + RAMReader src = (RAMReader) srcIn; + while(true) { + final int chunk = src.limit - src.upto; + if (chunk < numBytes) { + // Src is the limit + destIn.writeBytes(src.buffer, src.upto, chunk); + src.nextBuffer(); + numBytes -= chunk; + } else if (chunk == numBytes) { + // Matched + destIn.writeBytes(src.buffer, src.upto, chunk); + src.nextBuffer(); + break; + } else { + // numBytes is the limit + destIn.writeBytes(src.buffer, src.upto, (int) numBytes); + src.upto += numBytes; + break; + } + } + } else { + // Use intermediate buffer + while(numBytes > 0) { + final int chunk; + if (numBytes > 1024) { + chunk = 1024; + } else { + chunk = (int) numBytes; + } + srcIn.readBytes(byteBuffer, 0, chunk); + destIn.writeBytes(byteBuffer, chunk); + numBytes -= chunk; + } + } + } + + /** If non-null, a message will be printed to this if maxFieldLength is reached. + */ + void setInfoStream(PrintStream infoStream) { + this.infoStream = infoStream; + // nocommit + // this.infoStream = System.out; + } + + private class RAMSegment { + int numDoc; + RAMCell terms; + RAMCell freq; + RAMCell prox; + int termsLimit; + int freqLimit; + int proxLimit; + long size; + public RAMSegment(int numDoc, RAMWriter terms, RAMWriter freq, RAMWriter prox) { + this.numDoc = numDoc; + size = terms.size + freq.size + prox.size; + + this.terms = terms.head; + this.termsLimit = terms.upto; + terms.reset(); + + this.freq = freq.head; + this.freqLimit = freq.upto; + freq.reset(); + + this.prox = prox.head; + this.proxLimit = prox.upto; + prox.reset(); + } + } + + private class FlushedSegment { + int numDoc; + int segment; + long size; + public FlushedSegment(int numDoc, int segment, long size) { + this.numDoc = numDoc; + this.segment = segment; + this.size = size; + } + } + + final class SegmentMergeInfo { + int idx; + + char textBuffer[] = new char[10]; + int textLength; + int fieldNumber; + + IndexInput terms; + IndexInput freq; + IndexInput prox; + + IndexOutput freqOut; + IndexOutput proxOut; + + private long freqSize; + private long proxSize; + + long size; + long pos; + int df; + + SegmentMergeInfo(int idx) { + this.idx = idx; + } + + public void setInputs(IndexInput terms, IndexInput freq, IndexInput prox) { + this.terms = terms; + this.freq = freq; + this.prox = prox; + } + public void setOutputs(IndexOutput freqOut, IndexOutput proxOut) { + this.freqOut = freqOut; + this.proxOut = proxOut; + } + + public boolean next() throws IOException { + final int start = terms.readVInt(); + if (start == Integer.MAX_VALUE) + return false; + + assert start <= textLength; + final int length = terms.readVInt(); + textLength = start + length; + if (textLength > textBuffer.length) { + char[] newTextBuffer = new char[(int) (textLength*1.5)]; + //System.out.println("start=" + start + " length=" + length + " = textLength " + textLength + " vs " + newTextBuffer.length); + // System.out.println("here: " + terms); + System.arraycopy(textBuffer, 0, newTextBuffer, 0, start); + textBuffer = newTextBuffer; + } + terms.readChars(textBuffer, start, length); + fieldNumber = terms.readVInt(); + df = terms.readVInt(); + freqSize = terms.readVLong(); + proxSize = terms.readVLong(); + return true; + } + + public void flush() throws IOException { + copyBytes(freq, freqOut, freqSize); + copyBytes(prox, proxOut, proxSize); + } + + public void close() throws IOException { + MultiDocumentWriter.close(terms, freq, prox); + } + + protected final boolean 
equalTerm(int otherFieldNumber, char[] otherTextBuffer, int otherTextLength) { + if (otherFieldNumber == fieldNumber) { + final char[] textA = textBuffer; + final char[] textB = otherTextBuffer; + if (textLength != otherTextLength) + return false; + for(int i=0;i charB) + return false; + } + + if (stiA.textLength < stiB.textLength) + return true; + else if (stiA.textLength > stiB.textLength) + return false; + + // finally by index + return stiA.idx < stiB.idx; + + } else { + // fields differ: + String fieldA = fieldInfos.fieldName(stiA.fieldNumber); + String fieldB = fieldInfos.fieldName(stiB.fieldNumber); + return fieldA.compareTo(fieldB) < 0; + } + } + } + + final static int MAX_RAM_CELL_LEVEL = 4; + RAMCell freeCells[] = new RAMCell[1+MAX_RAM_CELL_LEVEL]; + + synchronized void recycle(RAMCell cell) { + cell.next = freeCells[cell.level]; + freeCells[cell.level] = cell; + } + + public RAMCell alloc(final int level, final int subLevel) { + RAMCell r; + synchronized(this) { + r = freeCells[level]; + if (r != null) + freeCells[level] = r.next; + else + r = null; + } + + if (r == null) + r = new RAMCell(level, subLevel); + else { + r.next = null; + r.subLevel = (byte) subLevel; + } + return r; + } + + private static final class RAMCell { + + byte[] buffer; + RAMCell next; + byte level; + byte subLevel; + + public RAMCell(final int level, final int subLevel) { + this.level = (byte) level; + this.subLevel = (byte) subLevel; + int size = 0; + switch(this.level) { + case 0: + size = 64; + break; + case 1: + size = 256; + break; + case 2: + size = 1024; + break; + case 3: + size = 4096; + break; + case 4: + size = 16384; + break; + } + buffer = new byte[size]; + } + } + + private final class RAMWriter extends IndexOutput { + + RAMCell head; + RAMCell tail; + int upto; + int limit; + byte[] buffer; + long size; + + boolean isFree = false; + + void setStartLevel(int level) { + assert head == null; + head = tail = alloc(level, 0); + buffer = head.buffer; + upto = 0; + limit = head.buffer.length; + size = limit; + } + + public void writeByte(byte b) { + assert !isFree; + if (upto == limit) + nextBuffer(); + buffer[upto++] = b; + } + + // Move all of our bytes to out and reset + public void writeTo(IndexOutput out) throws IOException { + assert !isFree; + while(head != null) { + final int numBytes; + if (head.next == null) + numBytes = upto; + else + numBytes = head.buffer.length; + // System.out.println("writeTo: " + numBytes); + out.writeBytes(head.buffer, numBytes); + RAMCell next = head.next; + recycle(head); + head = next; + } + reset(); + } + + private void reset() { + assert !isFree; + head = tail = null; + buffer = null; + limit = upto = 0; + size = 0; + } + + private void free() { + assert !isFree; + while(head != null) { + RAMCell c = head.next; + recycle(head); + head = c; + } + reset(); + } + + public void writeBytes(byte[] b, int offset, int numBytes) { + assert !isFree; + assert numBytes > 0; + switch(numBytes) { + case 4: + writeByte(b[offset++]); + case 3: + writeByte(b[offset++]); + case 2: + writeByte(b[offset++]); + case 1: + writeByte(b[offset++]); + break; + default: + if (upto == limit) + nextBuffer(); + // System.out.println(" writeBytes: buffer=" + buffer + " head=" + head + " offset=" + offset + " nB=" + numBytes); + while(true) { + int chunk = limit - upto; + if (chunk >= numBytes) { + System.arraycopy(b, offset, buffer, upto, numBytes); + upto += numBytes; + break; + } else { + System.arraycopy(b, offset, buffer, upto, chunk); + offset += chunk; + numBytes -= chunk; + 
nextBuffer(); + } + } + } + } + + public void nextBuffer() { + assert !isFree; + + final int level; + final int subLevel; + if (tail == null) { + level = 0; + subLevel = 0; + } else if (tail.level < MAX_RAM_CELL_LEVEL) { + if (3 == tail.subLevel) { + level = 1+tail.level; + subLevel = 0; + } else { + level = tail.level; + subLevel = 1+tail.subLevel; + } + } else { + subLevel = 0; + level = MAX_RAM_CELL_LEVEL; + } + + RAMCell c = alloc(level, subLevel); + + if (head == null) + head = tail = c; + else { + tail.next = c; + tail = c; + } + + limit = c.buffer.length; + size += limit; + buffer = c.buffer; + + upto = 0; + } + + public long getFilePointer() { + assert !isFree; + return size - (limit-upto); + } + + public long length() { + assert !isFree; + return getFilePointer(); + } + + public void close() {} + public void flush() {throw new RuntimeException("not implemented");} + public void seek(long pos) {throw new RuntimeException("not implemented");} + } + + // Limited IndexInput for "read once". This frees each + // buffer from the head once it's been read. + private final class RAMReader extends IndexInput { + + int readLimit; + int upto; + int limit; + RAMCell head; + byte[] buffer; + + // ASSERT + boolean finished = true; + + RAMReader(RAMCell head, int limit) { + reset(head, limit); + } + + public void reset(RAMCell head, int limit) { + // Make sure we were fully read + assert finished; + finished = false; + readLimit = limit; + this.head = head; + upto = 0; + if (head == null) { + assert 0 == readLimit; + buffer = null; + } else { + buffer = head.buffer; + if (head.next == null) { + this.limit = readLimit; + assert this.limit > 0 && this.limit <= buffer.length; + } else + this.limit = buffer.length; + } + } + + public byte readByte() { + byte b = buffer[upto++]; + if (upto == limit) + nextBuffer(); + return b; + } + + public void nextBuffer() { + RAMCell c = head.next; + recycle(head); + head = c; + upto = 0; + if (head != null) { + buffer = head.buffer; + if (head.next == null) { + limit = readLimit; + assert limit > 0 && limit <= buffer.length; + } else + limit = buffer.length; + } else { + // ASSERT + finished = true; + buffer = null; + } + } + + public void readBytes(byte[] b, int offset, int len) {throw new RuntimeException("not implemented");} + public void close() {} + public long getFilePointer() {throw new RuntimeException("not implemented");} + public void seek(long pos) {throw new RuntimeException("not implemented");} + public long length() {throw new RuntimeException("not implemented");} + } + + static final byte defaultNorm = Similarity.encodeNorm(1.0f); + + private class BufferedNorms { + + RAMWriter out = new RAMWriter(); + int upto; + + void add(float norm) { + byte b = Similarity.encodeNorm(norm); + out.writeByte(b); + upto++; + } + + void fill(int docID) { + // System.out.println(" now fill: " + upto + " vs " + docID); + while(upto < docID) { + // fill in docs that didn't have this field: + out.writeByte(defaultNorm); + upto++; + } + } + } + + static long[] realloc(long[] array, int newSize) { + long[] newArray = new long[newSize]; + System.arraycopy(array, 0, newArray, 0, array.length); + return newArray; + } + + static int[] realloc(int[] array, int newSize) { + int[] newArray = new int[newSize]; + System.arraycopy(array, 0, newArray, 0, array.length); + return newArray; + } + + static float[] realloc(float[] array, int newSize) { + float[] newArray = new float[newSize]; + System.arraycopy(array, 0, newArray, 0, array.length); + return newArray; + } +} Property 
changes on: src/java/org/apache/lucene/index/MultiDocumentWriter.java ___________________________________________________________________ Name: svn:eol-style + native Index: src/java/org/apache/lucene/index/SegmentMergeInfo.java =================================================================== --- src/java/org/apache/lucene/index/SegmentMergeInfo.java (revision 523296) +++ src/java/org/apache/lucene/index/SegmentMergeInfo.java (working copy) @@ -73,9 +73,8 @@ final void close() throws IOException { termEnum.close(); - if (postings != null) { - postings.close(); + if (postings != null) + postings.close(); } } -} Index: src/java/org/apache/lucene/store/IndexOutput.java =================================================================== --- src/java/org/apache/lucene/store/IndexOutput.java (revision 523296) +++ src/java/org/apache/lucene/store/IndexOutput.java (working copy) @@ -125,6 +125,24 @@ } } + public void writeChars(char[] s, int start, int length) + throws IOException { + final int end = start + length; + for (int i = start; i < end; i++) { + final int code = (int)s[i]; + if (code >= 0x01 && code <= 0x7F) + writeByte((byte)code); + else if (((code >= 0x80) && (code <= 0x7FF)) || code == 0) { + writeByte((byte)(0xC0 | (code >> 6))); + writeByte((byte)(0x80 | (code & 0x3F))); + } else { + writeByte((byte)(0xE0 | (code >>> 12))); + writeByte((byte)(0x80 | ((code >> 6) & 0x3F))); + writeByte((byte)(0x80 | (code & 0x3F))); + } + } + } + /** Forces any buffered output to be written. */ public abstract void flush() throws IOException; Index: src/demo/org/apache/lucene/demo/IndexLineFiles.java =================================================================== --- src/demo/org/apache/lucene/demo/IndexLineFiles.java (revision 0) +++ src/demo/org/apache/lucene/demo/IndexLineFiles.java (revision 0) @@ -0,0 +1,190 @@ +package org.apache.lucene.demo; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.analysis.WhitespaceAnalyzer; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.document.DateTools; + +import java.io.File; +import java.io.FileReader; +import java.io.BufferedReader; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Date; + +import java.util.concurrent.atomic.AtomicInteger; + +/** Index all text files under a directory. 
*/ +public class IndexLineFiles { + + private IndexLineFiles() {} + + static final File INDEX_DIR = new File("index"); + + static final AtomicInteger allCount = new AtomicInteger(); + + static int bufferSize; + static String fileName; + + private static class Indexer extends Thread { + + public void run() { + int iter = 0; + Document doc = new Document(); + while (true) { + + try { + BufferedReader input = new BufferedReader(new FileReader(fileName)); + String line = null; + + try { + while ((line = input.readLine()) != null) { + + if (doStoredFields && 0 == iter) { + // Add the path of the file as a field named "path". Use a field that is + // indexed (i.e. searchable), but don't tokenize the field into words. + doc.add(new Field("path", fileName, Field.Store.YES, Field.Index.NO)); + + // Add the last modified date of the file a field named "modified". Use + // a field that is indexed (i.e. searchable), but don't tokenize the field + // into words. + doc.add(new Field("modified", + "200703161637", + Field.Store.YES, Field.Index.NO)); + } + + doc.add(new Field("contents", line, Field.Store.NO, Field.Index.TOKENIZED, tvMode)); + + if (++iter == mult) { + writer.addDocument(doc); + doc.getFields().clear(); + iter = 0; + if (allCount.getAndIncrement() >= (numDoc-1)) { + System.out.println("THREAD DONE"); + return; + } + } + } + } finally { + input.close(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + + static Field.TermVector tvMode; + static boolean doStoredFields; + static int mult; + static IndexWriter writer; + static int numDoc; + + /** Index all text files under a directory. */ + public static void main(String[] args) throws IOException { + String usage = "java org.apache.lucene.demo.IndexFiles "; + + if (args.length == 0) { + System.err.println("Usage: " + usage); + System.exit(1); + } + + fileName = args[0]; + boolean autoCommit = args[1].equals("yes"); + bufferSize = Integer.parseInt(args[2]); + numDoc = Integer.parseInt(args[3]); + int maxBufferedDocs = Integer.parseInt(args[4]); + boolean optimize = args[5].equals("yes"); + + if (args[6].equals("no")) + tvMode = Field.TermVector.NO; + else if (args[6].equals("yes")) + tvMode = Field.TermVector.YES; + else if (args[6].equals("pos")) + tvMode = Field.TermVector.WITH_POSITIONS; + else if (args[6].equals("posoffs")) + tvMode = Field.TermVector.WITH_POSITIONS_OFFSETS; + else + throw new RuntimeException("bad term vector mode: " + args[6]); + + doStoredFields = args[7].equals("yes"); + mult = Integer.parseInt(args[8]); + int numThread = Integer.parseInt(args[9]); + + System.out.println("\nFAST: autoCommit=" + autoCommit + " bufferSize=" + bufferSize + "MB docLimit=" + numDoc + " optimize=" + optimize + " termVectors=" + args[6] + " storedFields=" + doStoredFields + " multiplier=" + mult + " numThread=" + numThread); + System.out.println(" NO MERGING"); + + if (INDEX_DIR.exists()) { + System.out.println("Cannot save index to '" +INDEX_DIR+ "' directory, please delete it first"); + System.exit(1); + } + + Date start = new Date(); + try { + // IndexWriter writer = new IndexWriter(INDEX_DIR, new StandardAnalyzer(), true); + writer = new IndexWriter(FSDirectory.getDirectory(INDEX_DIR), autoCommit, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(maxBufferedDocs); + writer.setMaxFieldLength(100000000); + writer.setRAMBufferSizeMB(bufferSize); + writer.setUseCompoundFile(false); + //writer.setInfoStream(System.out); + writer.setMergeFactor(100000); + // writer.setMaxFieldLength(10000000); + 
//writer.setMaxFieldLength(1000); + + Indexer[] indexers = new Indexer[numThread]; + for(int i=0;i
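IndexLineFiles drives a single shared IndexWriter from numThread Indexer threads that all call addDocument concurrently. A minimal sketch of that threading model against the same era of the IndexWriter API; the directory, analyzer, field name, and document counts below are illustrative, not taken from the benchmark:

import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.RAMDirectory;

/** Minimal sketch of the demo's threading model: several threads share one
 *  IndexWriter and call addDocument concurrently. Field names and counts
 *  are illustrative, not taken from the benchmark. */
public class ConcurrentAddSketch {
  public static void main(String[] args) throws Exception {
    final IndexWriter writer = new IndexWriter(new RAMDirectory(), new WhitespaceAnalyzer(), true);
    writer.setMaxBufferedDocs(10);

    Thread[] threads = new Thread[4];
    for (int t = 0; t < threads.length; t++) {
      threads[t] = new Thread() {
        public void run() {
          try {
            for (int i = 0; i < 100; i++) {
              Document doc = new Document();
              doc.add(new Field("contents", "some line of text " + i,
                                Field.Store.NO, Field.Index.TOKENIZED));
              writer.addDocument(doc);          // concurrent adds to the shared writer
            }
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        }
      };
      threads[t].start();
    }
    for (int t = 0; t < threads.length; t++) threads[t].join();
    writer.close();
    System.out.println("indexed " + (4 * 100) + " documents");
  }
}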