Index: src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
===================================================================
--- src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java	(revision 521310)
+++ src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java	(working copy)
@@ -40,7 +40,7 @@
     for (int i = 0; i < 100; i++) {
       addDoc(writer);
       checkInvariants(writer);
-      if (writer.getRamSegmentCount() + writer.getSegmentCount() >= 18) {
+      if (writer.getNumBufferedDocuments() + writer.getSegmentCount() >= 18) {
        noOverMerge = true;
       }
     }
@@ -178,7 +178,7 @@
     int mergeFactor = writer.getMergeFactor();
     int maxMergeDocs = writer.getMaxMergeDocs();
-    int ramSegmentCount = writer.getRamSegmentCount();
+    int ramSegmentCount = writer.getNumBufferedDocuments();
     assertTrue(ramSegmentCount < maxBufferedDocs);
     int lowerBound = -1;
Index: src/test/org/apache/lucene/index/TestIndexWriterDelete.java
===================================================================
--- src/test/org/apache/lucene/index/TestIndexWriterDelete.java	(revision 521173)
+++ src/test/org/apache/lucene/index/TestIndexWriterDelete.java	(working copy)
@@ -93,7 +93,7 @@
     }
     modifier.flush();
-    assertEquals(0, modifier.getRamSegmentCount());
+    assertEquals(0, modifier.getNumBufferedDocuments());
     assertTrue(0 < modifier.getSegmentCount());
     if (!autoCommit) {
@@ -435,7 +435,7 @@
     String[] startFiles = dir.list();
     SegmentInfos infos = new SegmentInfos();
     infos.read(dir);
-    IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null);
+    IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null);
     String[] endFiles = dir.list();
     Arrays.sort(startFiles);
Index: src/test/org/apache/lucene/index/TestIndexReader.java
===================================================================
--- src/test/org/apache/lucene/index/TestIndexReader.java	(revision 521173)
+++ src/test/org/apache/lucene/index/TestIndexReader.java	(working copy)
@@ -803,7 +803,7 @@
     String[] startFiles = dir.list();
     SegmentInfos infos = new SegmentInfos();
     infos.read(dir);
-    IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null);
+    IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null);
     String[] endFiles = dir.list();
     Arrays.sort(startFiles);
Index: src/test/org/apache/lucene/index/TestIndexWriter.java
===================================================================
--- src/test/org/apache/lucene/index/TestIndexWriter.java	(revision 521173)
+++ src/test/org/apache/lucene/index/TestIndexWriter.java	(working copy)
@@ -459,7 +459,7 @@
     String[] startFiles = dir.list();
     SegmentInfos infos = new SegmentInfos();
     infos.read(dir);
-    IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null);
+    IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null);
     String[] endFiles = dir.list();
     Arrays.sort(startFiles);
@@ -840,6 +840,7 @@
   public void testCommitOnCloseAbort() throws IOException {
     Directory dir = new RAMDirectory();
     IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
+    writer.setMaxBufferedDocs(10);
     for (int i = 0; i < 14; i++) {
       addDoc(writer);
     }
@@ -852,6 +853,7 @@
     searcher.close();
     writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), false);
+    writer.setMaxBufferedDocs(10);
     for(int j=0;j<17;j++) {
       addDoc(writer);
     }
@@ -876,6 +878,7 @@
     // Now make sure we can
re-open the index, add docs, // and all is good: writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), false); + writer.setMaxBufferedDocs(10); for(int i=0;i<12;i++) { for(int j=0;j<17;j++) { addDoc(writer); @@ -943,6 +946,7 @@ public void testCommitOnCloseOptimize() throws IOException { RAMDirectory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(10); for(int j=0;j<17;j++) { addDocWithIndex(writer, j); } Index: src/test/org/apache/lucene/index/TestStressIndexing.java =================================================================== --- src/test/org/apache/lucene/index/TestStressIndexing.java (revision 521173) +++ src/test/org/apache/lucene/index/TestStressIndexing.java (working copy) @@ -74,8 +74,6 @@ count++; } - modifier.close(); - } catch (Exception e) { System.out.println(e.toString()); e.printStackTrace(); @@ -125,6 +123,9 @@ IndexerThread indexerThread = new IndexerThread(modifier); indexerThread.start(); + IndexerThread indexerThread2 = new IndexerThread(modifier); + indexerThread2.start(); + // Two searchers that constantly just re-instantiate the searcher: SearcherThread searcherThread1 = new SearcherThread(directory); searcherThread1.start(); @@ -133,9 +134,14 @@ searcherThread2.start(); indexerThread.join(); + indexerThread2.join(); searcherThread1.join(); searcherThread2.join(); + + modifier.close(); + assertTrue("hit unexpected exception in indexer", !indexerThread.failed); + assertTrue("hit unexpected exception in indexer 2", !indexerThread2.failed); assertTrue("hit unexpected exception in search1", !searcherThread1.failed); assertTrue("hit unexpected exception in search2", !searcherThread2.failed); //System.out.println(" Writer: " + indexerThread.count + " iterations"); Index: src/test/org/apache/lucene/index/TestIndexFileDeleter.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexFileDeleter.java (revision 521173) +++ src/test/org/apache/lucene/index/TestIndexFileDeleter.java (working copy) @@ -34,6 +34,7 @@ Directory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(10); int i; for(i=0;i<35;i++) { addDoc(writer, i); Index: src/test/org/apache/lucene/index/TestDeletionPolicy.java =================================================================== --- src/test/org/apache/lucene/index/TestDeletionPolicy.java (revision 521173) +++ src/test/org/apache/lucene/index/TestDeletionPolicy.java (working copy) @@ -254,6 +254,7 @@ Directory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true, policy); + writer.setMaxBufferedDocs(10); writer.setUseCompoundFile(useCompoundFile); for(int i=0;i<107;i++) { addDoc(writer); @@ -271,7 +272,7 @@ } else { // If we are not auto committing then there should // be exactly 2 commits (one per close above): - assertEquals(2, policy.numOnCommit); + assertEquals(autoCommit?2:1, policy.numOnCommit); } // Simplistic check: just verify all segments_N's still @@ -316,6 +317,7 @@ Directory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true, policy); + writer.setMaxBufferedDocs(10); writer.setUseCompoundFile(useCompoundFile); for(int i=0;i<107;i++) { addDoc(writer); @@ -333,13 +335,15 @@ } else { // If we are not auto committing then there should // be exactly 2 commits (one per close above): - 
assertEquals(2, policy.numOnCommit); + assertEquals(autoCommit?2:1, policy.numOnCommit); } - // Simplistic check: just verify the index is in fact - // readable: - IndexReader reader = IndexReader.open(dir); - reader.close(); + if (autoCommit) { + // Simplistic check: just verify the index is in fact + // readable: + IndexReader reader = IndexReader.open(dir); + reader.close(); + } dir.close(); } @@ -363,6 +367,7 @@ for(int j=0;j>> 1; // shift off low bit if ((docCode & 1) != 0) // if low bit is set freq = 1; // freq is one else freq = freqStream.readVInt(); // else read freq count++; + // //System.out.println(" read freq " + freq); if (deletedDocs == null || !deletedDocs.get(doc)) { + //System.out.println(" add " + doc + "; freq=" + freq); docs[i] = doc; freqs[i] = freq; ++i; @@ -156,7 +166,9 @@ /** Optimized implementation. */ public boolean skipTo(int target) throws IOException { + //System.out.println("std skip to " + target); if (df >= skipInterval) { // optimized case + //System.out.println(" is frequent enough"); if (skipStream == null) skipStream = (IndexInput) freqStream.clone(); // lazily clone @@ -172,6 +184,7 @@ long lastFreqPointer = freqStream.getFilePointer(); long lastProxPointer = -1; int numSkipped = -1 - (count % skipInterval); + //System.out.println(" target " + target + "; skipDoc " + skipDoc); while (target > skipDoc) { lastSkipDoc = skipDoc; @@ -203,11 +216,13 @@ freqPointer += skipStream.readVInt(); proxPointer += skipStream.readVInt(); + //System.out.println(" now freq " + freqPointer + " prox " + proxPointer); skipCount++; } // if we found something to skip, then skip it if (lastFreqPointer > freqStream.getFilePointer()) { + //System.out.println(" do skip! " + lastFreqPointer); freqStream.seek(lastFreqPointer); skipProx(lastProxPointer, lastPayloadLength); @@ -219,6 +234,7 @@ // done skipping, now just scan do { + //System.out.println(" now scan " + target + " " + doc); if (!next()) return false; } while (target > doc); Index: src/java/org/apache/lucene/index/FieldsWriter.java =================================================================== --- src/java/org/apache/lucene/index/FieldsWriter.java (revision 521173) +++ src/java/org/apache/lucene/index/FieldsWriter.java (working copy) @@ -38,17 +38,90 @@ private IndexOutput indexStream; + private boolean doClose; + FieldsWriter(Directory d, String segment, FieldInfos fn) throws IOException { fieldInfos = fn; fieldsStream = d.createOutput(segment + ".fdt"); indexStream = d.createOutput(segment + ".fdx"); + doClose = true; } + FieldsWriter(IndexOutput fdx, IndexOutput fdt, FieldInfos fn) throws IOException { + fieldInfos = fn; + fieldsStream = fdt; + indexStream = fdx; + doClose = false; + } + IndexOutput getIndexStream() { + return indexStream; + } + IndexOutput getFieldsStream() { + return fieldsStream; + } + final void close() throws IOException { + if (doClose) { fieldsStream.close(); indexStream.close(); + } } + final void writeField(FieldInfo fi, Fieldable field) throws IOException { + // if the field as an instanceof FieldsReader.FieldForMerge, we're in merge mode + // and field.binaryValue() already returns the compressed value for a field + // with isCompressed()==true, so we disable compression in that case + boolean disableCompression = (field instanceof FieldsReader.FieldForMerge); + fieldsStream.writeVInt(fi.number); + // System.out.println(" write field number " + fieldInfos.fieldNumber(field.name()) + " name " + field.name() + " to " + fieldsStream + " at " + fieldsStream.getFilePointer()); + byte bits 
= 0; + if (field.isTokenized()) + bits |= FieldsWriter.FIELD_IS_TOKENIZED; + if (field.isBinary()) + bits |= FieldsWriter.FIELD_IS_BINARY; + if (field.isCompressed()) + bits |= FieldsWriter.FIELD_IS_COMPRESSED; + + fieldsStream.writeByte(bits); + + if (field.isCompressed()) { + // compression is enabled for the current field + byte[] data = null; + + if (disableCompression) { + // optimized case for merging, the data + // is already compressed + data = field.binaryValue(); + } else { + // check if it is a binary field + if (field.isBinary()) { + data = compress(field.binaryValue()); + } + else { + data = compress(field.stringValue().getBytes("UTF-8")); + } + } + final int len = data.length; + // System.out.println(" compressed: " + len); + fieldsStream.writeVInt(len); + fieldsStream.writeBytes(data, len); + } + else { + // compression is disabled for the current field + if (field.isBinary()) { + byte[] data = field.binaryValue(); + final int len = data.length; + // System.out.println(" not compressed: " + len); + fieldsStream.writeVInt(len); + fieldsStream.writeBytes(data, len); + } + else { + fieldsStream.writeString(field.stringValue()); + } + } + // System.out.println(" fieldsStream now at " + fieldsStream.getFilePointer()); + } + final void addDocument(Document doc) throws IOException { indexStream.writeLong(fieldsStream.getFilePointer()); @@ -59,62 +132,14 @@ if (field.isStored()) storedCount++; } + // System.out.println("write " + storedCount + " fields to " + fieldsStream + " at " + fieldsStream.getFilePointer()); fieldsStream.writeVInt(storedCount); fieldIterator = doc.getFields().iterator(); while (fieldIterator.hasNext()) { Fieldable field = (Fieldable) fieldIterator.next(); - // if the field as an instanceof FieldsReader.FieldForMerge, we're in merge mode - // and field.binaryValue() already returns the compressed value for a field - // with isCompressed()==true, so we disable compression in that case - boolean disableCompression = (field instanceof FieldsReader.FieldForMerge); - if (field.isStored()) { - fieldsStream.writeVInt(fieldInfos.fieldNumber(field.name())); - - byte bits = 0; - if (field.isTokenized()) - bits |= FieldsWriter.FIELD_IS_TOKENIZED; - if (field.isBinary()) - bits |= FieldsWriter.FIELD_IS_BINARY; - if (field.isCompressed()) - bits |= FieldsWriter.FIELD_IS_COMPRESSED; - - fieldsStream.writeByte(bits); - - if (field.isCompressed()) { - // compression is enabled for the current field - byte[] data = null; - - if (disableCompression) { - // optimized case for merging, the data - // is already compressed - data = field.binaryValue(); - } else { - // check if it is a binary field - if (field.isBinary()) { - data = compress(field.binaryValue()); - } - else { - data = compress(field.stringValue().getBytes("UTF-8")); - } - } - final int len = data.length; - fieldsStream.writeVInt(len); - fieldsStream.writeBytes(data, len); - } - else { - // compression is disabled for the current field - if (field.isBinary()) { - byte[] data = field.binaryValue(); - final int len = data.length; - fieldsStream.writeVInt(len); - fieldsStream.writeBytes(data, len); - } - else { - fieldsStream.writeString(field.stringValue()); - } - } - } + if (field.isStored()) + writeField(fieldInfos.fieldInfo(field.name()), field); } } Index: src/java/org/apache/lucene/index/TermInfosWriter.java =================================================================== --- src/java/org/apache/lucene/index/TermInfosWriter.java (revision 521173) +++ src/java/org/apache/lucene/index/TermInfosWriter.java (working 
copy) @@ -92,6 +92,7 @@ TermInfo pointers must be positive and greater than all previous.*/ final void add(Term term, TermInfo ti) throws CorruptIndexException, IOException { + if (!isIndex && term.compareTo(lastTerm) <= 0) throw new CorruptIndexException("term out of order (\"" + term + "\".compareTo(\"" + lastTerm + "\") <= 0)"); Index: src/java/org/apache/lucene/index/IndexWriter.java =================================================================== --- src/java/org/apache/lucene/index/IndexWriter.java (revision 521173) +++ src/java/org/apache/lucene/index/IndexWriter.java (working copy) @@ -171,11 +171,16 @@ public final static int DEFAULT_MERGE_FACTOR = 10; /** - * Default value is 10. Change using {@link #setMaxBufferedDocs(int)}. + * Default value is 0. Change using {@link #setMaxBufferedDocs(int)}. */ - public final static int DEFAULT_MAX_BUFFERED_DOCS = 10; + public final static int DEFAULT_MAX_BUFFERED_DOCS = 0; /** + * Default value is 16 MB. Change using {@link #setRAMBufferSizeMB}. + */ + public final static float DEFAULT_RAM_BUFFER_SIZE_MB = 16F; + + /** * Default value is 1000. Change using {@link #setMaxBufferedDeleteTerms(int)}. */ public final static int DEFAULT_MAX_BUFFERED_DELETE_TERMS = 1000; @@ -208,9 +213,8 @@ private boolean autoCommit = true; // false if we should commit only on close SegmentInfos segmentInfos = new SegmentInfos(); // the segments - SegmentInfos ramSegmentInfos = new SegmentInfos(); // the segments in ramDirectory - private final RAMDirectory ramDirectory = new RAMDirectory(); // for temp segs - private IndexFileDeleter deleter; + MultiDocumentWriter docWriter; + IndexFileDeleter deleter; private Lock writeLock; @@ -602,11 +606,14 @@ rollbackSegmentInfos = (SegmentInfos) segmentInfos.clone(); } + docWriter = new MultiDocumentWriter(newSegmentName(), directory, this, !autoCommit); + docWriter.setInfoStream(infoStream); + // Default deleter (for backwards compatibility) is // KeepOnlyLastCommitDeleter: deleter = new IndexFileDeleter(directory, deletionPolicy == null ? new KeepOnlyLastCommitDeletionPolicy() : deletionPolicy, - segmentInfos, infoStream); + segmentInfos, infoStream, docWriter); } catch (IOException e) { this.writeLock.release(); @@ -672,19 +679,34 @@ */ public void setMaxBufferedDocs(int maxBufferedDocs) { ensureOpen(); - if (maxBufferedDocs < 2) - throw new IllegalArgumentException("maxBufferedDocs must at least be 2"); + if (maxBufferedDocs != 0 && maxBufferedDocs < 2) + throw new IllegalArgumentException("maxBufferedDocs must at least be 2 or 0 to disable"); this.minMergeDocs = maxBufferedDocs; } /** * @see #setMaxBufferedDocs + * @deprecated */ public int getMaxBufferedDocs() { ensureOpen(); return minMergeDocs; } + // nocommit javadoc + public void setRAMBufferSizeMB(float mb) { + if (mb < 1) + throw new IllegalArgumentException("ramBufferSize must at least be 1 MB"); + ramBufferSize = mb*1024F*1024F; + if (!autoCommit) + docWriter.setRAMBufferSizeMB(mb); + } + + // nocommit javadoc + public float getRAMBufferSizeMB() { + return ramBufferSize/1024F/1024F; + } + /** *

Determines the minimal number of delete terms required before the buffered * in-memory delete terms are applied and flushed. If there are documents @@ -756,6 +778,7 @@ public void setInfoStream(PrintStream infoStream) { ensureOpen(); this.infoStream = infoStream; + docWriter.setInfoStream(infoStream); deleter.setInfoStream(infoStream); } @@ -835,7 +858,7 @@ */ public synchronized void close() throws CorruptIndexException, IOException { if (!closed) { - flushRamSegments(); + flush(); if (commitPending) { segmentInfos.write(directory); // now commit changes @@ -844,7 +867,6 @@ rollbackSegmentInfos = null; } - ramDirectory.close(); if (writeLock != null) { writeLock.release(); // release write lock writeLock = null; @@ -884,7 +906,7 @@ /** Returns the number of documents currently in this index. */ public synchronized int docCount() { ensureOpen(); - int count = ramSegmentInfos.size(); + int count = docWriter.docID; for (int i = 0; i < segmentInfos.size(); i++) { SegmentInfo si = segmentInfos.info(i); count += si.docCount; @@ -962,24 +984,13 @@ */ public void addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException { ensureOpen(); - SegmentInfo newSegmentInfo = buildSingleDocSegment(doc, analyzer); synchronized (this) { - ramSegmentInfos.addElement(newSegmentInfo); - maybeFlushRamSegments(); + // nocommit -- move this out of sync + docWriter.addDocument(doc, analyzer); + maybeFlush(); } } - SegmentInfo buildSingleDocSegment(Document doc, Analyzer analyzer) - throws CorruptIndexException, IOException { - DocumentWriter dw = new DocumentWriter(ramDirectory, analyzer, this); - dw.setInfoStream(infoStream); - String segmentName = newRamSegmentName(); - dw.addDocument(segmentName, doc); - SegmentInfo si = new SegmentInfo(segmentName, 1, ramDirectory, false, false); - si.setNumFields(dw.getNumFields()); - return si; - } - /** * Deletes the document(s) containing term. * @param term the term to identify the documents to be deleted @@ -989,7 +1000,7 @@ public synchronized void deleteDocuments(Term term) throws CorruptIndexException, IOException { ensureOpen(); bufferDeleteTerm(term); - maybeFlushRamSegments(); + maybeFlush(); } /** @@ -1005,7 +1016,7 @@ for (int i = 0; i < terms.length; i++) { bufferDeleteTerm(terms[i]); } - maybeFlushRamSegments(); + maybeFlush(); } /** @@ -1041,26 +1052,23 @@ public void updateDocument(Term term, Document doc, Analyzer analyzer) throws CorruptIndexException, IOException { ensureOpen(); - SegmentInfo newSegmentInfo = buildSingleDocSegment(doc, analyzer); + // nocommit: should this be in sync? + bufferDeleteTerm(term); synchronized (this) { - bufferDeleteTerm(term); - ramSegmentInfos.addElement(newSegmentInfo); - maybeFlushRamSegments(); + // nocommit move out of sync + docWriter.addDocument(doc, analyzer); + maybeFlush(); } } - final synchronized String newRamSegmentName() { - return "_ram_" + Integer.toString(ramSegmentInfos.counter++, Character.MAX_RADIX); - } - // for test purpose final synchronized int getSegmentCount(){ return segmentInfos.size(); } // for test purpose - final synchronized int getRamSegmentCount(){ - return ramSegmentInfos.size(); + final synchronized int getNumBufferedDocuments(){ + return docWriter.docID; } // for test purpose @@ -1089,6 +1097,7 @@ */ private int mergeFactor = DEFAULT_MERGE_FACTOR; + // nocommit fix javadocs /** Determines the minimal number of documents required before the buffered * in-memory documents are merging and a new Segment is created. 
* Since Documents are merged in a {@link org.apache.lucene.store.RAMDirectory}, @@ -1096,10 +1105,12 @@ * the number of files open in a FSDirectory. * *

The default value is {@link #DEFAULT_MAX_BUFFERED_DOCS}. - + * @deprecated */ private int minMergeDocs = DEFAULT_MAX_BUFFERED_DOCS; + // nocommit javadoc + private float ramBufferSize = DEFAULT_RAM_BUFFER_SIZE_MB*1024F*1024F; /** Determines the largest number of documents ever merged by addDocument(). * Small values (e.g., less than 10,000) are best for interactive indexing, @@ -1177,7 +1188,7 @@ */ public synchronized void optimize() throws CorruptIndexException, IOException { ensureOpen(); - flushRamSegments(); + flush(); while (segmentInfos.size() > 1 || (segmentInfos.size() == 1 && (SegmentReader.hasDeletions(segmentInfos.info(0)) || @@ -1186,7 +1197,7 @@ (useCompoundFile && (!SegmentReader.usesCompoundFile(segmentInfos.info(0))))))) { int minSegment = segmentInfos.size() - mergeFactor; - mergeSegments(segmentInfos, minSegment < 0 ? 0 : minSegment, segmentInfos.size()); + mergeSegments(minSegment < 0 ? 0 : minSegment, segmentInfos.size()); } } @@ -1203,7 +1214,7 @@ localRollbackSegmentInfos = (SegmentInfos) segmentInfos.clone(); localAutoCommit = autoCommit; if (localAutoCommit) { - flushRamSegments(); + flush(); // Turn off auto-commit during our local transaction: autoCommit = false; } @@ -1280,16 +1291,18 @@ segmentInfos.clear(); segmentInfos.addAll(rollbackSegmentInfos); + docWriter.abort(); + // Ask deleter to locate unreferenced files & remove // them: deleter.checkpoint(segmentInfos, false); deleter.refresh(); - ramSegmentInfos = new SegmentInfos(); bufferedDeleteTerms.clear(); numBufferedDeleteTerms = 0; commitPending = false; + docWriter.abort(); close(); } else { @@ -1384,7 +1397,7 @@ for (int base = start; base < segmentInfos.size(); base++) { int end = Math.min(segmentInfos.size(), base+mergeFactor); if (end-base > 1) { - mergeSegments(segmentInfos, base, end); + mergeSegments(base, end); } } } @@ -1424,7 +1437,7 @@ // segments in S may not since they could come from multiple indexes. // Here is the merge algorithm for addIndexesNoOptimize(): // - // 1 Flush ram segments. + // 1 Flush ram. // 2 Consider a combined sequence with segments from T followed // by segments from S (same as current addIndexes(Directory[])). // 3 Assume the highest level for segments in S is h. Call @@ -1445,14 +1458,18 @@ // copy a segment, which may cause doc count to change because deleted // docs are garbage collected. - // 1 flush ram segments + // 1 flush ram ensureOpen(); - flushRamSegments(); + flush(); // 2 copy segment infos and find the highest level from dirs int startUpperBound = minMergeDocs; + // nocommit: what to do? 
+ if (startUpperBound == 0) + startUpperBound = 10; + boolean success = false; startTransaction(); @@ -1511,7 +1528,7 @@ // copy those segments from S for (int i = segmentCount - numSegmentsToCopy; i < segmentCount; i++) { - mergeSegments(segmentInfos, i, i + 1); + mergeSegments(i, i + 1); } if (checkNonDecreasingLevels(segmentCount - numSegmentsToCopy)) { success = true; @@ -1520,7 +1537,7 @@ } // invariants do not hold, simply merge those segments - mergeSegments(segmentInfos, segmentCount - numTailSegments, segmentCount); + mergeSegments(segmentCount - numTailSegments, segmentCount); // maybe merge segments again if necessary if (segmentInfos.info(segmentInfos.size() - 1).docCount > startUpperBound) { @@ -1660,22 +1677,17 @@ throws IOException { } - protected final void maybeFlushRamSegments() throws CorruptIndexException, IOException { + protected final void maybeFlush() throws CorruptIndexException, IOException { // A flush is triggered if enough new documents are buffered or - // if enough delete terms are buffered - if (ramSegmentInfos.size() >= minMergeDocs || numBufferedDeleteTerms >= maxBufferedDeleteTerms) { - flushRamSegments(); + // if enough delete terms are buffered or enough RAM is + // being consumed + if (numBufferedDeleteTerms >= maxBufferedDeleteTerms || + (autoCommit && ((minMergeDocs != 0 && docWriter.docID >= minMergeDocs) || + (autoCommit && docWriter.getRAMUsed() > ramBufferSize)))) { + flush(); } } - /** Expert: Flushes all RAM-resident segments (buffered documents), then may merge segments. */ - private final synchronized void flushRamSegments() throws CorruptIndexException, IOException { - if (ramSegmentInfos.size() > 0 || bufferedDeleteTerms.size() > 0) { - mergeSegments(ramSegmentInfos, 0, ramSegmentInfos.size()); - maybeMergeSegments(minMergeDocs); - } - } - /** * Flush all in-memory buffered updates (adds and deletes) * to the Directory. 
@@ -1686,7 +1698,89 @@ */ public final synchronized void flush() throws CorruptIndexException, IOException { ensureOpen(); - flushRamSegments(); + + SegmentInfo newSegment = null; + boolean anything = false; + + boolean flushDocs = docWriter.docID > 0; + boolean flushDeletes = bufferedDeleteTerms.size() > 0; + final int numDocs = docWriter.docID; + + if (flushDocs || flushDeletes) { + + SegmentInfos rollback = null; + + if (flushDeletes) + rollback = (SegmentInfos) segmentInfos.clone(); + + boolean success = false; + + try { + if (flushDocs) { + int mergedDocCount = docWriter.docID; + docWriter.flush(); + newSegment = new SegmentInfo(docWriter.segment, + mergedDocCount, + directory, false, true); + segmentInfos.addElement(newSegment); + } + + if (flushDeletes) { + maybeApplyDeletes(flushDocs); + doAfterFlush(); + } + + checkpoint(); + success = true; + } finally { + if (!success) { + if (flushDeletes) { + // Fully replace the segmentInfos since flushed + // deletes could have changed any of the + // SegmentInfo instances: + segmentInfos.clear(); + segmentInfos.addAll(rollback); + } else { + // Remove segment we added, if any: + if (newSegment != null && + segmentInfos.size() > 0 && + segmentInfos.info(segmentInfos.size()-1) == newSegment) + segmentInfos.remove(segmentInfos.size()-1); + docWriter.abort(); + } + deleter.checkpoint(segmentInfos, false); + deleter.refresh(); + } + } + + deleter.checkpoint(segmentInfos, autoCommit); + + if (flushDocs && useCompoundFile) { + success = false; + try { + docWriter.createCompoundFile(newSegment.name); + newSegment.setUseCompoundFile(true); + checkpoint(); + success = true; + } finally { + if (!success) { + newSegment.setUseCompoundFile(false); + deleter.refresh(); + } + } + + deleter.checkpoint(segmentInfos, autoCommit); + } + + // nocommit + // maybeMergeSegments(mergeFactor * numDocs / 2); + + maybeMergeSegments(minMergeDocs); + + // Set up for next segment if we flushed any docs: + if (flushDocs) + docWriter.reset(newSegmentName()); + } } /** Expert: Return the total size of all index files currently cached in memory. @@ -1694,15 +1788,15 @@ */ public final long ramSizeInBytes() { ensureOpen(); - return ramDirectory.sizeInBytes(); + return docWriter.getRAMUsed(); } /** Expert: Return the number of documents whose segments are currently cached in memory. - * Useful when calling flushRamSegments() + * Useful when calling flush() */ public final synchronized int numRamDocs() { ensureOpen(); - return ramSegmentInfos.size(); + return docWriter.docID; } /** Incremental segment merger. */ @@ -1710,6 +1804,9 @@ long lowerBound = -1; long upperBound = startUpperBound; + // nocommit + if (upperBound == 0) upperBound = 10; + while (upperBound < maxMergeDocs) { int minSegment = segmentInfos.size(); int maxSegment = -1; @@ -1741,7 +1838,7 @@ while (numSegments >= mergeFactor) { // merge the leftmost* mergeFactor segments - int docCount = mergeSegments(segmentInfos, minSegment, minSegment + mergeFactor); + int docCount = mergeSegments(minSegment, minSegment + mergeFactor); numSegments -= mergeFactor; if (docCount > upperBound) { @@ -1770,39 +1867,29 @@ * Merges the named range of segments, replacing them in the stack with a * single segment. 
*/ - private final int mergeSegments(SegmentInfos sourceSegments, int minSegment, int end) + private final int mergeSegments(int minSegment, int end) throws CorruptIndexException, IOException { - // We may be called solely because there are deletes - // pending, in which case doMerge is false: - boolean doMerge = end > 0; final String mergedName = newSegmentName(); + SegmentMerger merger = null; - - final List ramSegmentsToDelete = new ArrayList(); - SegmentInfo newSegment = null; int mergedDocCount = 0; - boolean anyDeletes = (bufferedDeleteTerms.size() != 0); // This is try/finally to make sure merger's readers are closed: try { - if (doMerge) { - if (infoStream != null) infoStream.print("merging segments"); - merger = new SegmentMerger(this, mergedName); + if (infoStream != null) infoStream.print("merging segments"); - for (int i = minSegment; i < end; i++) { - SegmentInfo si = sourceSegments.info(i); - if (infoStream != null) - infoStream.print(" " + si.name + " (" + si.docCount + " docs)"); - IndexReader reader = SegmentReader.get(si); // no need to set deleter (yet) - merger.add(reader); - if (reader.directory() == this.ramDirectory) { - ramSegmentsToDelete.add(si); - } - } + merger = new SegmentMerger(this, mergedName); + + for (int i = minSegment; i < end; i++) { + SegmentInfo si = segmentInfos.info(i); + if (infoStream != null) + infoStream.print(" " + si.name + " (" + si.docCount + " docs)"); + IndexReader reader = SegmentReader.get(si); // no need to set deleter (yet) + merger.add(reader); } SegmentInfos rollback = null; @@ -1812,99 +1899,57 @@ // if we hit exception when doing the merge: try { - if (doMerge) { - mergedDocCount = merger.merge(); + mergedDocCount = merger.merge(); - if (infoStream != null) { - infoStream.println(" into "+mergedName+" ("+mergedDocCount+" docs)"); - } + if (infoStream != null) { + infoStream.println(" into "+mergedName+" ("+mergedDocCount+" docs)"); + } - newSegment = new SegmentInfo(mergedName, mergedDocCount, - directory, false, true); - } + newSegment = new SegmentInfo(mergedName, mergedDocCount, + directory, false, true); - if (sourceSegments != ramSegmentInfos || anyDeletes) { - // Now save the SegmentInfo instances that - // we are replacing: - rollback = (SegmentInfos) segmentInfos.clone(); - } + rollback = (SegmentInfos) segmentInfos.clone(); - if (doMerge) { - if (sourceSegments == ramSegmentInfos) { - segmentInfos.addElement(newSegment); - } else { - for (int i = end-1; i > minSegment; i--) // remove old infos & add new - sourceSegments.remove(i); + for (int i = end-1; i > minSegment; i--) // remove old infos & add new + segmentInfos.remove(i); - segmentInfos.set(minSegment, newSegment); - } - } + segmentInfos.set(minSegment, newSegment); - if (sourceSegments == ramSegmentInfos) { - maybeApplyDeletes(doMerge); - doAfterFlush(); - } - checkpoint(); success = true; } finally { - if (success) { - // The non-ram-segments case is already committed - // (above), so all the remains for ram segments case - // is to clear the ram segments: - if (sourceSegments == ramSegmentInfos) { - ramSegmentInfos.removeAllElements(); - } - } else { + if (!success && rollback != null) { + // Rollback the individual SegmentInfo + // instances, but keep original SegmentInfos + // instance (so we don't try to write again the + // same segments_N file -- write once): + segmentInfos.clear(); + segmentInfos.addAll(rollback); - // Must rollback so our state matches index: - if (sourceSegments == ramSegmentInfos && !anyDeletes) { - // Simple case: newSegment may or may 
not have - // been added to the end of our segment infos, - // so just check & remove if so: - if (newSegment != null && - segmentInfos.size() > 0 && - segmentInfos.info(segmentInfos.size()-1) == newSegment) { - segmentInfos.remove(segmentInfos.size()-1); - } - } else if (rollback != null) { - // Rollback the individual SegmentInfo - // instances, but keep original SegmentInfos - // instance (so we don't try to write again the - // same segments_N file -- write once): - segmentInfos.clear(); - segmentInfos.addAll(rollback); - } - // Delete any partially created and now unreferenced files: deleter.refresh(); } } } finally { // close readers before we attempt to delete now-obsolete segments - if (doMerge) merger.closeReaders(); + merger.closeReaders(); } - // Delete the RAM segments - deleter.deleteDirect(ramDirectory, ramSegmentsToDelete); - // Give deleter a chance to remove files now. deleter.checkpoint(segmentInfos, autoCommit); - if (useCompoundFile && doMerge) { + if (useCompoundFile) { boolean success = false; try { - merger.createCompoundFile(mergedName + ".cfs"); newSegment.setUseCompoundFile(true); checkpoint(); success = true; - } finally { if (!success) { // Must rollback: @@ -1923,14 +1968,14 @@ // Called during flush to apply any buffered deletes. If // doMerge is true then a new segment was just created and // flushed from the ram segments. - private final void maybeApplyDeletes(boolean doMerge) throws CorruptIndexException, IOException { + private final void maybeApplyDeletes(boolean flushedNewSegment) throws CorruptIndexException, IOException { if (bufferedDeleteTerms.size() > 0) { if (infoStream != null) infoStream.println("flush " + numBufferedDeleteTerms + " buffered deleted terms on " + segmentInfos.size() + " segments."); - if (doMerge) { + if (flushedNewSegment) { IndexReader reader = null; try { reader = SegmentReader.get(segmentInfos.info(segmentInfos.size() - 1)); @@ -1951,7 +1996,7 @@ } int infosEnd = segmentInfos.size(); - if (doMerge) { + if (flushedNewSegment) { infosEnd--; } @@ -1983,6 +2028,8 @@ private final boolean checkNonDecreasingLevels(int start) { int lowerBound = -1; int upperBound = minMergeDocs; + if (upperBound == 0) + upperBound = 10; for (int i = segmentInfos.size() - 1; i >= start; i--) { int docCount = segmentInfos.info(i).docCount; @@ -2031,10 +2078,11 @@ // well as the disk segments. private void bufferDeleteTerm(Term term) { Num num = (Num) bufferedDeleteTerms.get(term); + int numDoc = docWriter.docID; if (num == null) { - bufferedDeleteTerms.put(term, new Num(ramSegmentInfos.size())); + bufferedDeleteTerms.put(term, new Num(numDoc)); } else { - num.setNum(ramSegmentInfos.size()); + num.setNum(numDoc); } numBufferedDeleteTerms++; } @@ -2044,17 +2092,20 @@ // the documents buffered before it, not those buffered after it. 
private final void applyDeletesSelectively(HashMap deleteTerms, IndexReader reader) throws CorruptIndexException, IOException { + //System.out.println("now apply selective deletes"); Iterator iter = deleteTerms.entrySet().iterator(); while (iter.hasNext()) { Entry entry = (Entry) iter.next(); Term term = (Term) entry.getKey(); - + //System.out.println(" term " + term); + TermDocs docs = reader.termDocs(term); if (docs != null) { int num = ((Num) entry.getValue()).getNum(); try { while (docs.next()) { int doc = docs.doc(); + //System.out.println(" doc " + doc + " vs " + num); if (doc >= num) { break; } Index: src/java/org/apache/lucene/index/IndexFileDeleter.java =================================================================== --- src/java/org/apache/lucene/index/IndexFileDeleter.java (revision 521173) +++ src/java/org/apache/lucene/index/IndexFileDeleter.java (working copy) @@ -97,6 +97,7 @@ private PrintStream infoStream; private Directory directory; private IndexDeletionPolicy policy; + private MultiDocumentWriter docWriter; void setInfoStream(PrintStream infoStream) { this.infoStream = infoStream; @@ -116,10 +117,12 @@ * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public IndexFileDeleter(Directory directory, IndexDeletionPolicy policy, SegmentInfos segmentInfos, PrintStream infoStream) + public IndexFileDeleter(Directory directory, IndexDeletionPolicy policy, SegmentInfos segmentInfos, PrintStream infoStream, MultiDocumentWriter docWriter) throws CorruptIndexException, IOException { + this.docWriter = docWriter; this.infoStream = infoStream; + this.policy = policy; this.directory = directory; @@ -310,6 +313,8 @@ // Incref the files: incRef(segmentInfos, isCommit); + if (docWriter != null) + incRef(docWriter.files()); if (isCommit) { // Append to our commits list: @@ -340,6 +345,8 @@ lastFiles.add(segmentInfo.files()); } } + if (docWriter != null) + lastFiles.add(docWriter.files()); } } Index: src/java/org/apache/lucene/index/MultiDocumentWriter.java =================================================================== --- src/java/org/apache/lucene/index/MultiDocumentWriter.java (revision 0) +++ src/java/org/apache/lucene/index/MultiDocumentWriter.java (revision 0) @@ -0,0 +1,2568 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.Token; +import org.apache.lucene.analysis.TokenStream; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Fieldable; +import org.apache.lucene.search.Similarity; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.RAMOutputStream; + +import org.apache.lucene.util.PriorityQueue; +import org.apache.lucene.util.StringHelper; + +import java.util.zip.Deflater; +import java.util.zip.Inflater; +import java.util.zip.DataFormatException; +import java.io.OutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.io.Reader; +import java.io.StringReader; +import java.util.Arrays; +import java.util.List; +import java.util.ArrayList; +import java.util.Vector; +import java.util.Enumeration; +import java.util.Map; +import java.util.Iterator; +import java.text.NumberFormat; + +/** + * When a document is added, its stored fields (if any) and + * term vectors (if any) are immediately written to the + * Directory (ie these do not consume RAM). The terms + * dictionary and freq/prox posting lists are written to ram + * as a single RAMSegment. Periodically these RAMSegments + * are merged (because this compacts them), and eventually + * they are flushed to disk. When it's time to make a real + * segment, all RAMSegments and flushed segments are merged + * into the final postings lists. + */ + +final class MultiDocumentWriter { + + // Only applies when multiple threads may call + // addDocument: max number of pending documents in flight + // to be written to the real segment files. If we hit + // this max then new incoming addDocument calls will wait + // until the line shrinks below this. 
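
As an aside on how this new write path is meant to be driven from the application side: with this patch DEFAULT_MAX_BUFFERED_DOCS drops to 0 and flushing is triggered by RAM consumption via the new IndexWriter.setRAMBufferSizeMB, which is still marked "nocommit javadoc", so its exact semantics may change. A minimal usage sketch, assuming the setter keeps its current name; the 32 MB budget and the field contents are illustrative only:

    import org.apache.lucene.analysis.WhitespaceAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.store.RAMDirectory;

    public class RamBufferExample {
      public static void main(String[] args) throws Exception {
        RAMDirectory dir = new RAMDirectory();
        IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);

        // Leave maxBufferedDocs at its new default (0 = disabled) and give the
        // writer a memory budget instead; buffered postings are flushed to a
        // real segment once roughly this much RAM is in use.
        writer.setRAMBufferSizeMB(32);

        for (int i = 0; i < 100000; i++) {
          Document doc = new Document();
          doc.add(new Field("content", "the quick brown fox " + i,
                            Field.Store.YES, Field.Index.TOKENIZED));
          writer.addDocument(doc);
        }
        writer.close();
      }
    }

The sketch reuses the RAMDirectory/WhitespaceAnalyzer combination from the tests above; a real application would substitute an FSDirectory and its own analyzer.
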
+ public static int MAX_WAIT_QUEUE = 10; + + // How much ram we are allowed to use: + private static final long DEFAULT_RAM_BUFFER_SIZE = 32*1024*1024; + + private IndexWriter writer; + private Directory directory; // dir where final segment is written + + private FieldInfos fieldInfos; // all fields we've seen + + private IndexOutput tvx, tvf, tvd; // to write term vectors + + private FieldsWriter fieldsWriter; // to write stored fields + + private PrintStream infoStream; + String segment; // current segment we are writing + int docID; // next docID + int nextWriteDocID; // next docID to be written + + private List ramSegments = new ArrayList(); + private int[] levelCounts = new int[1]; + private long[] levelSizes = new long[1]; + private long totalSize; + + private List flushedSegments = new ArrayList(); + private int flushedCount; + private int[] flushedLevelCounts = new int[1]; + private long[] flushedLevelSizes = new long[1]; + private long totalFlushedSize; + + // need getter/setter + private int flushedMergeFactor = 10; + + private List files; + + private boolean hasNorms; + private boolean flushedVectors; + private boolean flushedNorms; + private boolean doSelfFlush; + + private long ramBufferSize = DEFAULT_RAM_BUFFER_SIZE; + + private List freeThreadStates = new ArrayList(); + private ThreadState[] waitingThreadStates = new ThreadState[1]; + + private int numWaiting = 0; + + void setRAMBufferSizeMB(float mb) { + ramBufferSize = (long) (mb*1024*1024); + } + + long startTime; + + MultiDocumentWriter(String segment, Directory directory, IndexWriter writer, boolean doSelfFlush) throws IOException { + this.directory = directory; + this.writer = writer; + this.doSelfFlush = doSelfFlush; + reset(segment); + startTime = System.currentTimeMillis(); + } + + // nocommit: needs synchronized? + List files() { + // TODO: must null out files when we make a change... + if (files != null) { + return files; + } + + files = new ArrayList(); + final int numFlushed = flushedSegments.size(); + for(int i=0;i= hi) + return; + + int mid = (lo + hi) / 2; + + if (postings[lo].compareTo(postings[mid]) > 0) { + Posting tmp = postings[lo]; + postings[lo] = postings[mid]; + postings[mid] = tmp; + } + + if (postings[mid].compareTo(postings[hi]) > 0) { + Posting tmp = postings[mid]; + postings[mid] = postings[hi]; + postings[hi] = tmp; + + if (postings[lo].compareTo(postings[mid]) > 0) { + Posting tmp2 = postings[lo]; + postings[lo] = postings[mid]; + postings[mid] = tmp2; + } + } + + int left = lo + 1; + int right = hi - 1; + + if (left >= right) + return; + + Posting partition = postings[mid]; + + for (; ;) { + while (postings[right].compareTo(partition) > 0) + --right; + + while (left < right && postings[left].compareTo(partition) <= 0) + ++left; + + if (left < right) { + Posting tmp = postings[left]; + postings[left] = postings[right]; + postings[right] = tmp; + --right; + } else { + break; + } + } + + quickSort(postings, lo, left); + quickSort(postings, left + 1, hi); + } + + private Payload payload; + private int numVectorFields; + private int fieldNumber; + private FieldInfo currentField; + private int currentFieldHashCode; + + // Tokenizes the fields of a document into Postings. 
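
The comment above names the inversion step that invertDocument and addPosition implement below: every token of every indexed field either creates or updates a Posting that accumulates the positions at which its term occurred. Stripped of the hash table, array pooling, term vectors and payloads, the bookkeeping reduces to a map from term text to a growing position list; a toy sketch of just that idea, in the raw-collections style of the surrounding code (the names here are illustrative, not the patch's):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    class ToyInverter {
      // term text -> List of Integer positions; freq is simply the list size
      private final Map postings = new HashMap();

      void addPosition(String text, int position) {
        List positions = (List) postings.get(text);
        if (positions == null) {              // term not seen before in this document
          positions = new ArrayList();
          postings.put(text, positions);
        }
        positions.add(new Integer(position)); // term seen again: record another position
      }

      void invert(String[] tokens) {
        for (int i = 0; i < tokens.length; i++)
          addPosition(tokens[i], i);
      }

      void dump() {
        Iterator it = postings.entrySet().iterator();
        while (it.hasNext()) {
          Map.Entry e = (Map.Entry) it.next();
          System.out.println(e.getKey() + " -> freq=" + ((List) e.getValue()).size()
                             + " positions=" + e.getValue());
        }
      }
    }
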
+ private final void invertDocument(Document doc, Analyzer analyzer) + throws IOException { + final int maxFieldLength = writer.getMaxFieldLength(); + + numVectorFields = 0; + numPostings = 0; + + for(int i=0;i0) position+=analyzer.getPositionIncrementGap(fieldName); + int offset = fieldOffsets[fieldNumber]; // offset field + + final boolean storeOffsets = field.isStoreOffsetWithTermVector(); + + if (field.isIndexed()) { + + if (field.isTermVectorStored() && 0 == length) + numVectorFields++; + + if (!field.isTokenized()) { // un-tokenized field + payload = null; + String stringValue = field.stringValue(); + if(field.isStoreOffsetWithTermVector()) + addPosition(stringValue, position++, offset, offset + stringValue.length()); + else + addPosition(stringValue, position++, -1, -1); + offset += stringValue.length(); + length++; + } else { + Reader reader; // find or make Reader + if (field.readerValue() != null) + reader = field.readerValue(); + else if (field.stringValue() != null) + reader = new StringReader(field.stringValue()); + else + throw new IllegalArgumentException + ("field must have either String or Reader value"); + + // Tokenize field and add to postingTable + TokenStream stream = analyzer.tokenStream(fieldName, reader); + try { + Token lastToken = null; + for (Token t = stream.next(); t != null; t = stream.next()) { + position += (t.getPositionIncrement() - 1); + payload = t.getPayload(); + + // TODO: factor this if out of this loop? + if (storeOffsets) + addPosition(t.termText(), position++, offset + t.startOffset(), offset + t.endOffset()); + else + addPosition(t.termText(), position++, -1, -1); + + lastToken = t; + if (++length >= maxFieldLength) { + if (infoStream != null) + infoStream.println("maxFieldLength " +maxFieldLength+ " reached, ignoring following tokens"); + break; + } + } + + if(lastToken != null) + offset += lastToken.endOffset() + 1; + + } finally { + stream.close(); + } + } + + fieldLengths[fieldNumber] = length; // save field length + fieldPositions[fieldNumber] = position; // save field position + fieldBoosts[fieldNumber] *= field.getBoost(); + fieldOffsets[fieldNumber] = offset; + } + } + } + + private final void addPosition(String text, int position, int offsetStart, int offsetEnd) { + final int hashCode = currentFieldHashCode + text.hashCode(); + int hashPos = hashCode % postingHashSize; + if (hashPos < 0) hashPos += postingHashSize; + + Posting p = postingHash[hashPos]; + while(p != null && (p.hashCode != hashCode || p.fieldInfo != currentField || !p.text.equals(text))) + p = p.next; + + if (p != null) { // word seen before + final int freq = p.freq; + if (p.positions.length == freq) { // positions array is full + p.positions = swapUp(p.positions); + if (p.offsetsStart != null) { + p.offsetsStart = swapUp(p.offsetsStart); + p.offsetsEnd = swapUp(p.offsetsEnd); + } + if (p.payloads != null) { + // the current field stores payloads + Payload[] newPayloads = new Payload[freq * 2]; // grow payloads array + System.arraycopy(p.payloads, 0, newPayloads, 0, p.payloads.length); + p.payloads = newPayloads; + } + } + p.positions[freq] = position; // add new position + + if (offsetStart != -1) { + if (p.offsetsStart == null) { + p.offsetsStart = newIntArray(p.positions.length); + p.offsetsEnd = newIntArray(p.positions.length); + } + p.offsetsStart[freq] = offsetStart; + p.offsetsEnd[freq] = offsetEnd; + } + + if (payload != null) { + if (p.payloads == null) + // lazily allocate payload array + p.payloads = new Payload[p.positions.length]; + p.payloads[freq] = 
payload; + currentField.storePayloads = true; + } + + p.freq = freq + 1; // update frequency + + } else { // word not seen before + + if (numPostings == postingArrayLimit) { + // Resize postings array + int newSize = (int) postingArrayLimit*2; + Posting[] newPostings = new Posting[newSize]; + System.arraycopy(postingArray, 0, newPostings, 0, postingArrayLimit); + for(int i=postingArrayLimit;i 256 || freeList[newSize] == null) { + // alloc a new array + newArray = new int[newSize]; + cell = null; + // if (newSize <= 256) + // System.out.println("I: " + newSize); + } else { + // reuse existing array + cell = freeList[newSize]; + freeList[newSize] = cell.next; + newArray = cell.array; + } + + // Optimize copy for small arrays + switch(oldSize) { + case 8: + newArray[7] = array[7]; + newArray[6] = array[6]; + newArray[5] = array[5]; + newArray[4] = array[4]; + case 4: + newArray[3] = array[3]; + newArray[2] = array[2]; + case 2: + newArray[1] = array[1]; + case 1: + newArray[0] = array[0]; + break; + default: + System.arraycopy(array, 0, newArray, 0, oldSize); + } + + if (oldSize <= 256) { + // save for reuse later + if (cell == null) { + if (freeCells != null) { + cell = freeCells; + freeCells = cell.next; + } else { + cell = new AllocCell(); + } + } + cell.array = array; + cell.next = freeList[oldSize]; + freeList[oldSize] = cell; + } + + return newArray; + } + + // Returns a length 1 int array + public int[] newIntArray() { + int[] r; + if (freeList[1] != null) { + AllocCell cell = freeList[1]; + r = cell.array; + freeList[1] = cell.next; + cell.next = freeCells; + freeCells = cell; + } else { + r = new int[1]; + } + return r; + } + + public int[] newIntArray(int size) { + int[] r; + if (size <= 256) { + if (freeList[size] != null) { + AllocCell cell = freeList[size]; + r = cell.array; + freeList[size] = cell.next; + cell.next = freeCells; + freeCells = cell; + } else { + r = new int[size]; + } + } else { + r = new int[size]; + System.out.println("I " + size); + } + return r; + } + + // Free this array, recycling if possible + public void recycle(int[] array) { + if (array.length <= 256) { + AllocCell cell; + if (freeCells != null) { + cell = freeCells; + freeCells = cell.next; + } else { + cell = new AllocCell(); + } + cell.array = array; + cell.next = freeList[array.length]; + freeList[array.length] = cell; + } + } + } + + private ThreadState getThreadState() { + ThreadState state = null; + while(true) { + final int size = freeThreadStates.size(); + if (0 == size) { + + // There are no free thread states + if (numWaiting >= MAX_WAIT_QUEUE) { + + // There are too many thread states in line write + // to the index so we now pause to give them a + // chance to get scheduled by the JVM and finish + // their documents. Once we wake up again, a + // recycled ThreadState should be available else + // we wait again. + try { + wait(); + } catch (InterruptedException e) { + } + + } else { + // OK, just create a new thread state + state = new ThreadState(); + break; + } + } else { + // Use recycled thread state + state = (ThreadState) freeThreadStates.get(size-1); + freeThreadStates.remove(size-1); + break; + } + } + return state; + } + + void addDocument(Document doc, Analyzer analyzer) + throws CorruptIndexException, IOException { + + // First pass: go through all fields in doc, updating + // shared FieldInfos and writing any stored fields: + final ThreadState state; + + // nocommit: need try/finally to free up thread state? + + synchronized(this) { + // Allocate a thread state. 
In the single threaded + // case there will always be exactly one thread + // state. Else, multiple thread states can be "in + // flight" but get recycled/shared here. + state = getThreadState(); + + // Do synchronized initialization. + state.init(doc); + } + + state.invertDocument(doc, analyzer); + + state.sortPostingTable(); + + state.writeStoredFields(); + + // At this point we must commit to a docID for this + // document because we write the docID when building the + // posting lists next + synchronized(this) { + // Must be synchronized here because the assign & ++ are not atomic: + state.docID = docID++; + } + + state.buildPostingsAndVectors(); + + // Now write the indexed document to the real files. + synchronized(this) { + + if (nextWriteDocID == state.docID) { + // It's my turn, so write everything now: + nextWriteDocID++; + flush(state); + + // If any states are waiting, sweep through and + // flush those that are enabled by my write. + if (numWaiting > 0) { + boolean doNotify = numWaiting >= MAX_WAIT_QUEUE; + boolean any = false; + while(true) { + int upto = 0; + for(int i=0;i 0) { + tvx.writeLong(tvd.getFilePointer()); + tvd.writeVInt(state.numVectorFields); + for(int i=0;i ramBufferSize/12) { + mergeRAMSegments(state, 0); + if (levelSizes[1] > ramBufferSize/4) + mergeRAMSegments(state, 1); + } + + if (doSelfFlush && totalSize > ramBufferSize) + flushRAMSegments(state); + } + + long getRAMUsed() { + return totalSize; + } + + private final TermInfo termInfo = new TermInfo(); // minimize consing + private IndexOutput freqOutput; + private IndexOutput proxOutput; + private int skipInterval; + private int lastDoc; + private int lastPayloadLength; + private int df; + private boolean currentFieldStorePayloads; + + // Write out the postings & dictionary to real output + // files, in the "real" lucene file format. This is to + // finalize a segment. 
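
The balancing code above shows the actual triggers: level 0 of the in-RAM segments is merged once it exceeds roughly a twelfth of the RAM budget, level 1 once it exceeds roughly a quarter, and, when the writer manages its own flushing (the doSelfFlush / non-autoCommit case), everything goes to disk once the total passes the budget. A stripped-down model of that tiered bookkeeping, tracking sizes only (the 10% compaction figure and all names are assumptions, not the patch's code) — the actual on-disk writing that such a flush triggers is what flushTerms below implements:

    import java.util.ArrayList;
    import java.util.List;

    class ToyRamSegmentPolicy {
      private final long budget;                        // analogous to ramBufferSize
      private final List levelSizes = new ArrayList();  // Long per level: buffered bytes at that level
      private long totalSize;

      ToyRamSegmentPolicy(long budgetBytes) {
        budget = budgetBytes;
        levelSizes.add(new Long(0));
      }

      // Called after a document's postings have been buffered as a new level-0 RAM segment.
      void addRamSegment(long sizeInBytes) {
        addToLevel(0, sizeInBytes);
        if (sizeOf(0) > budget / 12) mergeLevel(0);
        if (levelSizes.size() > 1 && sizeOf(1) > budget / 4) mergeLevel(1);
        if (totalSize > budget) flushToDisk();
      }

      private long sizeOf(int level) { return ((Long) levelSizes.get(level)).longValue(); }

      private void addToLevel(int level, long bytes) {
        while (levelSizes.size() <= level) levelSizes.add(new Long(0));
        levelSizes.set(level, new Long(sizeOf(level) + bytes));
        totalSize += bytes;
      }

      // Collapse a level into one larger segment on the next level; merging
      // compacts the postings, so assume the result is somewhat smaller.
      private void mergeLevel(int level) {
        long before = sizeOf(level);
        long after = (long) (before * 0.9);
        levelSizes.set(level, new Long(0));
        totalSize -= before;
        addToLevel(level + 1, after);
      }

      private void flushToDisk() {
        System.out.println("flush " + totalSize + " buffered bytes to an on-disk segment");
        for (int i = 0; i < levelSizes.size(); i++) levelSizes.set(i, new Long(0));
        totalSize = 0;
      }
    }
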
+ final void flushTerms() throws IOException { + + if (infoStream != null) + infoStream.println("flush postings as segment " + segment + " docID=" + MultiDocumentWriter.this.docID); + + TermInfosWriter termInfosWriter = null; + + final int numRAMSegments = ramSegments.size(); + final int numFlushedSegments = flushedSegments.size(); + final int numSegmentsIn = numRAMSegments + numFlushedSegments; + resizeMergeInputs(numSegmentsIn); + int numDoc = 0; + + try { + freqOutput = directory.createOutput(segment + ".frq"); + proxOutput = directory.createOutput(segment + ".prx"); + termInfosWriter = new TermInfosWriter(directory, segment, fieldInfos, + writer.getTermIndexInterval()); + skipInterval = termInfosWriter.skipInterval; + + RAMSegmentMergeQueue queue = null; + + queue = new RAMSegmentMergeQueue(numSegmentsIn); + int i=0; + for (;i>> 1; + + assert doc > lastDoc || df == 1; + + final int termDocFreq; + final int newDocCode = (doc-lastDoc)<<1; + lastDoc = doc; + + if ((docCode & 1) != 0) { + freqOutput.writeVInt(newDocCode|1); + termDocFreq = 1; + //System.out.println(" doc " + doc + " freq 1"); + //System.out.println(" write " + (newDocCode|1)); + } else { + freqOutput.writeVInt(newDocCode); + termDocFreq = freq.readVInt(); + //System.out.println(" doc " + doc + " freq " + termDocFreq); + //System.out.println(" write " + newDocCode + " then " + termDocFreq); + + freqOutput.writeVInt(termDocFreq); + } + + /** See {@link DocumentWriter#writePostings(Posting[], String) for + * documentation about the encoding of positions and payloads + */ + for(int j=0;j 0) + copyBytes(prox, proxOutput, payloadLength); + } else { + assert 0 == (deltaCode&1); + proxOutput.writeVInt(deltaCode>>1); + } + } + } + } + + private RAMFile skipBuffer = new RAMFile(); + private int lastSkipDoc; + private int lastSkipPayloadLength; + private long lastSkipFreqPointer; + private long lastSkipProxPointer; + + private void resetSkip() { + lastSkipDoc = 0; + lastSkipPayloadLength = -1; // we don't have to write the first length in the skip list + lastSkipFreqPointer = freqOutput.getFilePointer(); + lastSkipProxPointer = proxOutput.getFilePointer(); + } + + private void bufferSkip(int doc, int payloadLength) throws IOException { + //System.out.println(" buffer skip: freq ptr " + freqPointer + " prox " + proxPointer); + //System.out.println(" vs last freq ptr " + lastSkipFreqPointer + " prox " + lastSkipProxPointer); + + // To efficiently store payloads in the posting lists we do not store the length of + // every payload. Instead we omit the length for a payload if the previous payload had + // the same length. + // However, in order to support skipping the payload length at every skip point must be known. + // So we use the same length encoding that we use for the posting lists for the skip data as well: + // Case 1: current field does not store payloads + // SkipDatum --> DocSkip, FreqSkip, ProxSkip + // DocSkip,FreqSkip,ProxSkip --> VInt + // DocSkip records the document number before every SkipInterval th document in TermFreqs. + // Document numbers are represented as differences from the previous value in the sequence. + // Case 2: current field stores payloads + // SkipDatum --> DocSkip, PayloadLength?, FreqSkip,ProxSkip + // DocSkip,FreqSkip,ProxSkip --> VInt + // PayloadLength --> VInt + // In this case DocSkip/2 is the difference between + // the current and the previous value. 
If DocSkip + // is odd, then a PayloadLength encoded as VInt follows, + // if DocSkip is even, then it is assumed that the + // current payload length equals the length at the previous + // skip point + + final int delta = doc - lastSkipDoc; + if (currentFieldStorePayloads) { + if (payloadLength == lastSkipPayloadLength) + // the current payload length equals the length at the previous skip point, + // so we don't store the length again + skipBuffer.writeVInt(delta << 1); + else { + // the payload length is different from the previous one. We shift the DocSkip, + // set the lowest bit and store the current payload length as VInt. + skipBuffer.writeVInt((delta << 1) + 1); + skipBuffer.writeVInt(payloadLength); + lastSkipPayloadLength = payloadLength; + } + } else + // current field does not store payloads + skipBuffer.writeVInt(delta); + + long freqPointer = freqOutput.getFilePointer(); + long proxPointer = proxOutput.getFilePointer(); + skipBuffer.writeVInt((int) (freqPointer - lastSkipFreqPointer)); + skipBuffer.writeVInt((int) (proxPointer - lastSkipProxPointer)); + lastSkipFreqPointer = freqPointer; + lastSkipProxPointer = proxPointer; + + lastSkipDoc = doc; + } + + private long writeSkip() throws IOException { + long skipPointer = freqOutput.getFilePointer(); + skipBuffer.writeTo(freqOutput); + return skipPointer; + } + + // Called when RAM buffer is full; we now merge all RAM + // segments to a single flushed segment: + final synchronized void flushRAMSegments(ThreadState state) throws IOException { + + if (infoStream != null) { + String name = tempFileName(".tis", flushedCount); + infoStream.println("\n" + getElapsedTime() + ": flush ram segments at docID " + docID + ", to " + name.substring(0, name.length()-4) + ": totalRam=" + (totalSize/1024/1024) + " MB"); + } + + IndexOutput termsOut = directory.createOutput(tempFileName(".tis", flushedCount)); + IndexOutput freqOut = directory.createOutput(tempFileName(".frq", flushedCount)); + IndexOutput proxOut = directory.createOutput(tempFileName(".prx", flushedCount)); + + final int numSegmentsIn = ramSegments.size(); + long newSize; + long oldSize = totalSize; + + resizeMergeInputs(numSegmentsIn); + + int numDoc = 0; + for(int i=0;i start; i--) // remove old infos & add new + flushedSegments.remove(i); + + FlushedSegment newFlushedSegment = new FlushedSegment(numDoc, flushedCount++); + flushedSegments.set(start, newFlushedSegment); + + if (flushedLevelSizes.length == level+1) { + flushedLevelSizes = realloc(flushedLevelSizes, 1+flushedLevelSizes.length); + flushedLevelCounts = realloc(flushedLevelCounts, 1+flushedLevelCounts.length); + } + + flushedLevelSizes[level] -= oldSize; + flushedLevelSizes[1+level] += newSize; + + flushedLevelCounts[level] -= (end-start); + flushedLevelCounts[1+level]++; + + totalFlushedSize += newSize - oldSize; + + if (infoStream != null) + infoStream.println("merge flushed segments done: oldSize=" + oldSize + " newSize=" + newSize + " new/old=" + ((int)(100.0*newSize/oldSize)) + "% totalFlushed=" + (totalFlushedSize/1024/1024) + " MB"); + + files = null; + + // Have deleter remove our now unreferenced files: + writer.deleter.checkpoint(writer.segmentInfos, false); + } + + private static void close(IndexOutput f0, IndexOutput f1, IndexOutput f2) throws IOException { + IOException keep = null; + try { + if (f0 != null) f0.close(); + } catch (IOException e) { + keep = e; + } finally { + try { + if (f1 != null) f1.close(); + } catch (IOException e) { + if (keep == null) keep = e; + } finally { + try { + if (f2 != 
null) f2.close(); + } catch (IOException e) { + if (keep == null) keep = e; + } finally { + if (keep != null) throw keep; + } + } + } + } + + private static void close(IndexInput f0, IndexInput f1, IndexInput f2) throws IOException { + IOException keep = null; + try { + if (f0 != null) f0.close(); + } catch (IOException e) { + keep = e; + } finally { + try { + if (f1 != null) f1.close(); + } catch (IOException e) { + if (keep == null) keep = e; + } finally { + try { + if (f2 != null) f2.close(); + } catch (IOException e) { + if (keep == null) keep = e; + } finally { + if (keep != null) throw keep; + } + } + } + } + + private static void close(IndexOutput freq, IndexOutput prox, TermInfosWriter terms) throws IOException { + IOException keep = null; + try { + if (freq != null) freq.close(); + } catch (IOException e) { + keep = e; + } finally { + try { + if (prox != null) prox.close(); + } catch (IOException e) { + if (keep == null) keep = e; + } finally { + try { + if (terms != null) terms.close(); + } catch (IOException e) { + if (keep == null) keep = e; + } finally { + if (keep != null) throw keep; + } + } + } + } + + NumberFormat nf = NumberFormat.getInstance(); + String getElapsedTime() { + long t = System.currentTimeMillis(); + nf.setMaximumFractionDigits(1); + nf.setMinimumFractionDigits(1); + return nf.format((t-startTime)/1000.0) + " sec"; + } + + // In-memory merge: reads multiple ram segments (in the + // modified format) and replaces with a single ram segment. + final void mergeRAMSegments(ThreadState state, int level) throws IOException { + + RAMFile termsOut = state.getRAMFile(); + RAMFile freqOut = state.getRAMFile(); + RAMFile proxOut = state.getRAMFile(); + + int start = 0; + int end = 0; + for(int i=levelCounts.length-1;i>=level;i--) { + start = end; + end += levelCounts[i]; + } + + if (infoStream != null) + infoStream.println("\n" + getElapsedTime() + ": merge ram segments: level " + level + ": start idx " + start + " to end idx " + end + " docID=" + docID); + + long oldSize; + long newSize; + int numDoc; + RAMSegment newRAMSegment; + + if (end == start+1) { + // Degenerate case, if suddenly an immense document + // comes through + newRAMSegment = (RAMSegment) ramSegments.get(start); + newSize = oldSize = newRAMSegment.terms.size + newRAMSegment.freq.size + newRAMSegment.prox.size; + numDoc = newRAMSegment.numDoc; + } else { + + resizeMergeInputs(end-start); + final int numSegmentsIn = end-start; + + oldSize = 0; + int upto = 0; + numDoc = 0; + for(int i=start;i ram case: + RAMFileReader src = (RAMFileReader) srcIn; + RAMFile dest = (RAMFile) destIn; + + if (dest.buffer == null) + dest.nextBuffer(); + + while(numBytes > 0) { + final int readChunk = src.limit - src.upto; + final int writeChunk = dest.limit - dest.upto; + int chunk; + int mode; + + if (readChunk < writeChunk) { + // src is the limit + mode = 0; + chunk = readChunk; + } else if (readChunk > writeChunk) { + // dest is the limit + mode = 1; + chunk = writeChunk; + } else { + // both matched + mode = 2; + chunk = writeChunk; + } + + if (numBytes < chunk) { + // size is the limit + mode = 3; + chunk = (int) numBytes; + } + + System.arraycopy(src.buffer, src.upto, dest.buffer, dest.upto, chunk); + numBytes -= chunk; + + switch(mode) { + case 0: + // src is the limit + src.nextBuffer(); + dest.upto += chunk; + break; + case 1: + // dest is the limit + dest.nextBuffer(); + src.upto += chunk; + break; + case 2: + // src and dest matched as limit + src.nextBuffer(); + dest.nextBuffer(); + break; + case 3: + // size is
the limit + src.upto += chunk; + dest.upto += chunk; + break; + } + } + } else if (srcIn instanceof RAMFileReader) { + RAMFileReader src = (RAMFileReader) srcIn; + while(numBytes > 0) { + final int chunk = src.limit - src.upto; + if (chunk <= numBytes) { + // Src is the limit + destIn.writeBytes(src.buffer, src.upto, chunk); + src.nextBuffer(); + numBytes -= chunk; + } else { + // numBytes is the limit + destIn.writeBytes(src.buffer, src.upto, (int) numBytes); + src.upto += numBytes; + break; + } + } + } else { + while(numBytes > 0) { + final int chunk; + if (numBytes > 1024) { + chunk = 1024; + } else { + chunk = (int) numBytes; + } + srcIn.readBytes(byteBuffer, 0, chunk); + destIn.writeBytes(byteBuffer, chunk); + numBytes -= chunk; + } + } + } + + /** If non-null, a message will be printed to this if maxFieldLength is reached. + */ + void setInfoStream(PrintStream infoStream) { + this.infoStream = infoStream; + // nocommit + // this.infoStream = System.out; + } + + final static class Posting { // info about a Term in a doc + Posting next; + FieldInfo fieldInfo; + String text; + int freq; // its frequency in doc + int[] positions; // positions it occurs at + Payload[] payloads; + int[] offsetsStart; + int[] offsetsEnd; + int hashCode; + + public int compareTo(Object o) { + Posting other = (Posting) o; + if (fieldInfo == other.fieldInfo) + return text.compareTo(other.text); + else + return fieldInfo.name.compareTo(other.fieldInfo.name); + } + } + + private class RAMSegment { + int numDoc; + RAMFile terms; + RAMFile freq; + RAMFile prox; + public RAMSegment(int numDoc, RAMFile terms, RAMFile freq, RAMFile prox) { + this.numDoc = numDoc; + this.terms = terms; + this.freq = freq; + this.prox = prox; + } + } + + private class FlushedSegment { + int numDoc; + int segment; + public FlushedSegment(int numDoc, int segment) { + this.numDoc = numDoc; + this.segment = segment; + } + } + + final class SegmentMergeInfo { + int idx; + + char textBuffer[] = new char[10]; + int textLength; + int fieldNumber; + + IndexInput terms; + IndexInput freq; + IndexInput prox; + + IndexOutput freqOut; + IndexOutput proxOut; + + private long freqSize; + private long proxSize; + + long size; + long pos; + int df; + + SegmentMergeInfo(int idx) { + this.idx = idx; + } + + public void setInputs(IndexInput terms, IndexInput freq, IndexInput prox) { + this.terms = terms; + this.freq = freq; + this.prox = prox; + } + public void setOutputs(IndexOutput freqOut, IndexOutput proxOut) { + this.freqOut = freqOut; + this.proxOut = proxOut; + } + + boolean next() throws IOException { + final int start = terms.readVInt(); + if (start == Integer.MAX_VALUE) { + return false; + } + + final int length = terms.readVInt(); + textLength = start + length; + if (textLength > textBuffer.length) { + char[] newTextBuffer = new char[(int) (textLength*1.5)]; + //System.out.println("start=" + start + " length=" + length + " = textLength " + textLength + " vs " + newTextBuffer.length); + // System.out.println("here: " + terms); + System.arraycopy(textBuffer, 0, newTextBuffer, 0, start); + textBuffer = newTextBuffer; + } + terms.readChars(textBuffer, start, length); + fieldNumber = terms.readVInt(); + df = terms.readVInt(); + freqSize = terms.readVLong(); + proxSize = terms.readVLong(); + return true; + } + + private void flush() throws IOException { + copyBytes(freq, freqOut, freqSize); + copyBytes(prox, proxOut, proxSize); + } + + public void close() throws IOException { + MultiDocumentWriter.close(terms, freq, prox); + } + + protected final 
boolean equalTerm(int otherFieldNumber, char[] otherTextBuffer, int otherTextLength) { + if (otherFieldNumber == fieldNumber) { + final char[] textA = textBuffer; + final char[] textB = otherTextBuffer; + if (textLength != otherTextLength) { + return false; + } + for(int i=0;i charB) { + return false; + } + } + + if (stiA.textLength < stiB.textLength) { + return true; + } else if (stiA.textLength > stiB.textLength) { + return false; + } + + // finally by index + return stiA.idx < stiB.idx; + + } else { + // fields differ: + String fieldA = fieldInfos.fieldName(stiA.fieldNumber); + String fieldB = fieldInfos.fieldName(stiB.fieldNumber); + return fieldA.compareTo(fieldB) < 0; + } + } + } + + private static final class RAMCell { + final static int MAX_LEVEL = 2; + static RAMCell freeCells[] = new RAMCell[1+RAMCell.MAX_LEVEL]; + + static int numBufferOut; + static int numByteOut; + + byte[] buffer; + RAMCell next; + byte level; + + static void recycle(RAMCell cell) { + cell.next = freeCells[cell.level]; + freeCells[cell.level] = cell; + + // ASSERT + numBufferOut--; + numByteOut -= cell.buffer.length; + // System.out.println("F level " + cell.level + " " + cell.buffer + " bufferOut=" + RAMCell.numBufferOut + " byteOut=" + RAMCell.numByteOut); + } + + public static RAMCell alloc(final int level) { + RAMCell r = freeCells[level]; + if (r != null) { + // reuse + freeCells[level] = r.next; + r.next = null; + // System.out.println("R " + level + ": " + r.buffer + " bufferOut=" + RAMCell.numBufferOut + " byteOut=" + RAMCell.numByteOut); + } else { + r = new RAMCell(level); + //System.out.println(" new alloc " + level + ": " + r); + } + // ASSERT + numBufferOut++; + numByteOut += r.buffer.length; + return r; + } + + public RAMCell(final int level) { + this.level = (byte) level; + int size = 0; + switch(this.level) { + case 0: + size = 128; + break; + case 1: + size = 1024; + break; + case 2: + size = 8192; + break; + } + buffer = new byte[size]; + // System.out.println("B " + level + " " + buffer + ": " + freeCells[level] + " bufferOut=" + RAMCell.numBufferOut + " byteOut=" + RAMCell.numByteOut); + } + } + + private static final class RAMFile extends IndexOutput { + + RAMCell head; + RAMCell tail; + RAMFileReader r; + RAMFile next; + int upto; + int limit; + byte[] buffer; + int size; + + boolean isFree = false; + + // reset ourself, transfer all buffers to a new reader, and return that reader + public IndexInput getReader() { + // System.out.println(" GET READER: " + this + ": " + length()); + if (head == null) + return null; + else if (r == null) + r = new RAMFileReader(this); + else + r.reset(this); + reset(); + return r; + } + + // Move all of our bytes to out and reset + public void writeTo(IndexOutput out) throws IOException { + assert !isFree; + while(head != null) { + final int numBytes; + if (head.next == null) + numBytes = upto; + else + numBytes = head.buffer.length; + out.writeBytes(head.buffer, numBytes); + RAMCell next = head.next; + RAMCell.recycle(head); + head = next; + } + reset(); + } + + private void reset() { + assert !isFree; + head = tail = null; + buffer = null; + size = limit = upto = 0; + } + + private void free() { + assert !isFree; + while(head != null) { + RAMCell c = head.next; + RAMCell.recycle(head); + head = c; + } + reset(); + } + + public void writeByte(byte b) { + assert !isFree; + if (upto == limit) + nextBuffer(); + buffer[upto++] = b; + } + + public void writeBytes(byte[] b, int offset, int numBytes) { + assert !isFree; + while(numBytes > 0) { + int chunk = 
limit - upto; + if (chunk > numBytes) { + // Buffer is the limit + System.arraycopy(b, offset, buffer, upto, numBytes); + upto += numBytes; + break; + } else { + // We are the limit + System.arraycopy(b, offset, buffer, upto, chunk); + offset += chunk; + numBytes -= chunk; + nextBuffer(); + } + } + } + + public void nextBuffer() { + assert !isFree; + + final int level; + if (tail == null) + level = 0; + else if (tail.level < RAMCell.MAX_LEVEL) + level = 1+tail.level; + else + level = RAMCell.MAX_LEVEL; + + RAMCell c = RAMCell.alloc(level); + + if (head == null) + head = tail = c; + else { + tail.next = c; + tail = c; + } + + limit = c.buffer.length; + size += limit; + buffer = c.buffer; + + upto = 0; + } + + public long getFilePointer() { + assert !isFree; + return size - (limit-upto); + } + + public long length() { + assert !isFree; + return getFilePointer(); + } + + public void close() {} + + public void flush() {throw new RuntimeException("not implemented");} + public void seek(long pos) {throw new RuntimeException("not implemented");} + } + + // Limited IndexInput for "read once". This frees each + // buffer from the head once it's been read. It can only + // be created off an already written RAMFile. + private static final class RAMFileReader extends IndexInput { + + int readLimit; + int upto; + int limit; + RAMCell head; + byte[] buffer; + + // ASSERT + boolean finished = true; + + RAMFileReader(RAMFile ramFile) { + reset(ramFile); + } + + public void reset(RAMFile ramFile) { + // Make sure we were fully read + assert finished; + finished = false; + readLimit = ramFile.upto; + head = ramFile.head; + buffer = head.buffer; + if (head.next == null) + limit = readLimit; + else + limit = buffer.length; + } + + public byte readByte() { + byte b = buffer[upto++]; + if (upto == limit) + nextBuffer(); + return b; + } + + public void nextBuffer() { + RAMCell c = head.next; + RAMCell.recycle(head); + head = c; + upto = 0; + if (head != null) { + buffer = head.buffer; + if (head.next == null) + limit = readLimit; + else + limit = buffer.length; + } else { + // ASSERT + finished = true; + buffer = null; + } + } + + public void readBytes(byte[] b, int offset, int len) {} + public void close() {} + public long getFilePointer() {return 0;} + public void seek(long pos) {} + public long length() {return 0;} + } + + static final byte defaultNorm = Similarity.encodeNorm(1.0f); + + private static class BufferedNorms { + + RAMFile out = new RAMFile(); + int upto; + + void add(float norm) { + byte b = Similarity.encodeNorm(norm); + out.writeByte(b); + upto++; + } + + void fill(int docID) { + // System.out.println(" now fill: " + upto + " vs " + docID); + while(upto < docID) { + // fill in docs that didn't have this field: + out.writeByte(defaultNorm); + upto++; + } + } + } + + static long[] realloc(long[] array, int newSize) { + long[] newArray = new long[newSize]; + System.arraycopy(array, 0, newArray, 0, array.length); + return newArray; + } + + static int[] realloc(int[] array, int newSize) { + int[] newArray = new int[newSize]; + System.arraycopy(array, 0, newArray, 0, array.length); + return newArray; + } + + static float[] realloc(float[] array, int newSize) { + float[] newArray = new float[newSize]; + System.arraycopy(array, 0, newArray, 0, array.length); + return newArray; + } +} Property changes on: src/java/org/apache/lucene/index/MultiDocumentWriter.java ___________________________________________________________________ Name: svn:eol-style + native Index: 
src/java/org/apache/lucene/index/SegmentMergeInfo.java =================================================================== --- src/java/org/apache/lucene/index/SegmentMergeInfo.java (revision 521173) +++ src/java/org/apache/lucene/index/SegmentMergeInfo.java (working copy) @@ -73,9 +73,8 @@ final void close() throws IOException { termEnum.close(); - if (postings != null) { - postings.close(); + if (postings != null) + postings.close(); } } -} Index: src/java/org/apache/lucene/store/IndexOutput.java =================================================================== --- src/java/org/apache/lucene/store/IndexOutput.java (revision 521173) +++ src/java/org/apache/lucene/store/IndexOutput.java (working copy) @@ -125,6 +125,24 @@ } } + public void writeChars(char[] s, int start, int length) + throws IOException { + final int end = start + length; + for (int i = start; i < end; i++) { + final int code = (int)s[i]; + if (code >= 0x01 && code <= 0x7F) + writeByte((byte)code); + else if (((code >= 0x80) && (code <= 0x7FF)) || code == 0) { + writeByte((byte)(0xC0 | (code >> 6))); + writeByte((byte)(0x80 | (code & 0x3F))); + } else { + writeByte((byte)(0xE0 | (code >>> 12))); + writeByte((byte)(0x80 | ((code >> 6) & 0x3F))); + writeByte((byte)(0x80 | (code & 0x3F))); + } + } + } + /** Forces any buffered output to be written. */ public abstract void flush() throws IOException; Index: src/demo/org/apache/lucene/demo/IndexLineFiles.java =================================================================== --- src/demo/org/apache/lucene/demo/IndexLineFiles.java (revision 0) +++ src/demo/org/apache/lucene/demo/IndexLineFiles.java (revision 0) @@ -0,0 +1,127 @@ +package org.apache.lucene.demo; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.analysis.WhitespaceAnalyzer; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.document.DateTools; + +import java.io.File; +import java.io.FileReader; +import java.io.BufferedReader; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Date; + +// nocommit +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; + +/** Index all text files under a directory. */ +public class IndexLineFiles { + + private IndexLineFiles() {} + + static final File INDEX_DIR = new File("index"); + + static int bufferSize; + + /** Index all text files under a directory. 
*/ + public static void main(String[] args) throws IOException { + String usage = "java org.apache.lucene.demo.IndexLineFiles "; + + if (args.length < 6) { + System.err.println("Usage: " + usage); + System.exit(1); + } + + boolean autoCommit = args[1].equals("yes"); + bufferSize = Integer.parseInt(args[2]); + int numDoc = Integer.parseInt(args[3]); + int maxBufferedDocs = Integer.parseInt(args[4]); + boolean optimize = args[5].equals("yes"); + + System.out.println("\nFAST: autoCommit=" + autoCommit + " bufferSize=" + bufferSize + "MB docLimit=" + numDoc + " optimize=" + optimize); + + if (INDEX_DIR.exists()) { + System.out.println("Cannot save index to '" +INDEX_DIR+ "' directory, please delete it first"); + System.exit(1); + } + + BufferedReader input = new BufferedReader(new FileReader(args[0])); + String line = null; + + Date start = new Date(); + try { + // IndexWriter writer = new IndexWriter(INDEX_DIR, new StandardAnalyzer(), true); + IndexWriter writer = new IndexWriter(FSDirectory.getDirectory(INDEX_DIR), autoCommit, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(maxBufferedDocs); + writer.setMaxFieldLength(100000); + writer.setRAMBufferSizeMB(bufferSize); + writer.setUseCompoundFile(false); + // writer.setMaxFieldLength(10000000); + //writer.setMaxFieldLength(1000); + + int count = 0; + while (( line = input.readLine()) != null) { + Document doc = new Document(); + + // Add the path of the file as a field named "path". Use a field that is + // indexed (i.e. searchable), but don't tokenize the field into words. + doc.add(new Field("path", args[0], Field.Store.YES, Field.Index.UN_TOKENIZED)); + + // Add the last modified date of the file as a field named "modified". Use + // a field that is indexed (i.e. searchable), but don't tokenize the field + // into words. + doc.add(new Field("modified", + "200703161637", + Field.Store.YES, Field.Index.UN_TOKENIZED)); + + doc.add(new Field("contents", line, Field.Store.NO, Field.Index.TOKENIZED, Field.TermVector.WITH_OFFSETS)); + writer.addDocument(doc); + netMem += bean.getHeapMemoryUsage().getUsed(); + if (++count == numDoc) + break; + } + input.close(); + if (optimize) { + System.out.println("Optimize..."); + writer.optimize(); + } + writer.close(); + + Date end = new Date(); + System.out.println(count + " docs; " + (end.getTime() - start.getTime()) + " total milliseconds"); + System.out.println("avg mem: " + (netMem/count/1024/1024) + " MB"); + + } catch (IOException e) { + e.printStackTrace(System.out); + System.out.println(" caught a " + e.getClass() + + "\n with message: " + e.getMessage()); + } + } + + static MemoryMXBean bean = ManagementFactory.getMemoryMXBean(); + static long netMem; + static int netCount; +} Property changes on: src/demo/org/apache/lucene/demo/IndexLineFiles.java ___________________________________________________________________ Name: svn:eol-style + native
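Note on the skip-entry encoding used by the new skipBuffer code in MultiDocumentWriter above: the sketch below is not part of the patch. It is only a minimal, self-contained illustration, under assumed names (SkipEntrySketch, bufferSkip, writeVInt), of the layout the patch documents: the doc delta is left-shifted by one bit, the low bit flags a changed payload length (the new length then follows as a VInt), and the freq/prox file-pointer deltas are appended as VInts. A plain ByteArrayOutputStream stands in for Lucene's IndexOutput so the example compiles and runs on its own; the VInt wire format matches IndexOutput.writeVInt (7 data bits per byte, high bit set while more bytes follow).

// Illustrative only -- not part of the patch.
import java.io.ByteArrayOutputStream;

class SkipEntrySketch {

  private int lastSkipDoc;
  private int lastSkipPayloadLength;
  private long lastSkipFreqPointer;
  private long lastSkipProxPointer;
  private final ByteArrayOutputStream skipBuffer = new ByteArrayOutputStream();

  void bufferSkip(int doc, boolean storePayloads, int payloadLength,
                  long freqPointer, long proxPointer) {
    final int delta = doc - lastSkipDoc;
    if (storePayloads) {
      if (payloadLength == lastSkipPayloadLength) {
        // unchanged payload length: even value, no length written
        writeVInt(delta << 1);
      } else {
        // changed payload length: odd value, then the new length
        writeVInt((delta << 1) + 1);
        writeVInt(payloadLength);
        lastSkipPayloadLength = payloadLength;
      }
    } else {
      // field without payloads: plain doc delta
      writeVInt(delta);
    }
    // pointer deltas into the freq and prox streams
    writeVInt((int) (freqPointer - lastSkipFreqPointer));
    writeVInt((int) (proxPointer - lastSkipProxPointer));
    lastSkipFreqPointer = freqPointer;
    lastSkipProxPointer = proxPointer;
    lastSkipDoc = doc;
  }

  // Same wire format as Lucene's writeVInt: 7 bits per byte, high bit = more.
  private void writeVInt(int i) {
    while ((i & ~0x7F) != 0) {
      skipBuffer.write((i & 0x7F) | 0x80);
      i >>>= 7;
    }
    skipBuffer.write(i);
  }

  byte[] toBytes() {
    return skipBuffer.toByteArray();
  }

  public static void main(String[] args) {
    SkipEntrySketch s = new SkipEntrySketch();
    s.bufferSkip(16, true, 4, 120L, 340L);  // payload length changes 0 -> 4: odd doc delta
    s.bufferSkip(32, true, 4, 250L, 700L);  // same payload length: even doc delta, no length
    System.out.println("skip buffer bytes: " + s.toBytes().length);
  }
}

Running main writes two skip points; the second entry stays even because the payload length is unchanged, so no length byte is repeated, which is the space saving the patch's comment describes.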