Index: src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java (revision 522266) +++ src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java (working copy) @@ -40,7 +40,7 @@ for (int i = 0; i < 100; i++) { addDoc(writer); checkInvariants(writer); - if (writer.getRamSegmentCount() + writer.getSegmentCount() >= 18) { + if (writer.getNumBufferedDocuments() + writer.getSegmentCount() >= 18) { noOverMerge = true; } } @@ -178,7 +178,7 @@ int mergeFactor = writer.getMergeFactor(); int maxMergeDocs = writer.getMaxMergeDocs(); - int ramSegmentCount = writer.getRamSegmentCount(); + int ramSegmentCount = writer.getNumBufferedDocuments(); assertTrue(ramSegmentCount < maxBufferedDocs); int lowerBound = -1; Index: src/test/org/apache/lucene/index/TestIndexWriterDelete.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexWriterDelete.java (revision 522266) +++ src/test/org/apache/lucene/index/TestIndexWriterDelete.java (working copy) @@ -93,7 +93,7 @@ } modifier.flush(); - assertEquals(0, modifier.getRamSegmentCount()); + assertEquals(0, modifier.getNumBufferedDocuments()); assertTrue(0 < modifier.getSegmentCount()); if (!autoCommit) { @@ -435,7 +435,7 @@ String[] startFiles = dir.list(); SegmentInfos infos = new SegmentInfos(); infos.read(dir); - IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null); + IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null); String[] endFiles = dir.list(); Arrays.sort(startFiles); Index: src/test/org/apache/lucene/index/TestIndexReader.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexReader.java (revision 522266) +++ src/test/org/apache/lucene/index/TestIndexReader.java (working copy) @@ -803,7 +803,7 @@ String[] startFiles = dir.list(); SegmentInfos infos = new SegmentInfos(); infos.read(dir); - IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null); + IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null); String[] endFiles = dir.list(); Arrays.sort(startFiles); Index: src/test/org/apache/lucene/index/TestIndexWriter.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexWriter.java (revision 522266) +++ src/test/org/apache/lucene/index/TestIndexWriter.java (working copy) @@ -461,7 +461,7 @@ String[] startFiles = dir.list(); SegmentInfos infos = new SegmentInfos(); infos.read(dir); - IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null); + IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null); String[] endFiles = dir.list(); Arrays.sort(startFiles); @@ -842,6 +842,7 @@ public void testCommitOnCloseAbort() throws IOException { Directory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(10); for (int i = 0; i < 14; i++) { addDoc(writer); } @@ -854,6 +855,7 @@ searcher.close(); writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), false); + writer.setMaxBufferedDocs(10); for(int j=0;j<17;j++) { addDoc(writer); } @@ -878,6 +880,7 @@ // Now make sure we can
re-open the index, add docs, // and all is good: writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), false); + writer.setMaxBufferedDocs(10); for(int i=0;i<12;i++) { for(int j=0;j<17;j++) { addDoc(writer); @@ -945,6 +948,7 @@ public void testCommitOnCloseOptimize() throws IOException { RAMDirectory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(10); for(int j=0;j<17;j++) { addDocWithIndex(writer, j); } Index: src/test/org/apache/lucene/index/TestStressIndexing.java =================================================================== --- src/test/org/apache/lucene/index/TestStressIndexing.java (revision 522266) +++ src/test/org/apache/lucene/index/TestStressIndexing.java (working copy) @@ -74,8 +74,6 @@ count++; } - modifier.close(); - } catch (Exception e) { System.out.println(e.toString()); e.printStackTrace(); @@ -125,6 +123,9 @@ IndexerThread indexerThread = new IndexerThread(modifier); indexerThread.start(); + IndexerThread indexerThread2 = new IndexerThread(modifier); + indexerThread2.start(); + // Two searchers that constantly just re-instantiate the searcher: SearcherThread searcherThread1 = new SearcherThread(directory); searcherThread1.start(); @@ -133,9 +134,14 @@ searcherThread2.start(); indexerThread.join(); + indexerThread2.join(); searcherThread1.join(); searcherThread2.join(); + + modifier.close(); + assertTrue("hit unexpected exception in indexer", !indexerThread.failed); + assertTrue("hit unexpected exception in indexer 2", !indexerThread2.failed); assertTrue("hit unexpected exception in search1", !searcherThread1.failed); assertTrue("hit unexpected exception in search2", !searcherThread2.failed); //System.out.println(" Writer: " + indexerThread.count + " iterations"); Index: src/test/org/apache/lucene/index/TestIndexFileDeleter.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexFileDeleter.java (revision 522266) +++ src/test/org/apache/lucene/index/TestIndexFileDeleter.java (working copy) @@ -34,6 +34,7 @@ Directory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(10); int i; for(i=0;i<35;i++) { addDoc(writer, i); Index: src/test/org/apache/lucene/index/TestDeletionPolicy.java =================================================================== --- src/test/org/apache/lucene/index/TestDeletionPolicy.java (revision 522266) +++ src/test/org/apache/lucene/index/TestDeletionPolicy.java (working copy) @@ -254,6 +254,7 @@ Directory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true, policy); + writer.setMaxBufferedDocs(10); writer.setUseCompoundFile(useCompoundFile); for(int i=0;i<107;i++) { addDoc(writer); @@ -271,7 +272,7 @@ } else { // If we are not auto committing then there should // be exactly 2 commits (one per close above): - assertEquals(2, policy.numOnCommit); + assertEquals(autoCommit?2:1, policy.numOnCommit); } // Simplistic check: just verify all segments_N's still @@ -316,6 +317,7 @@ Directory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true, policy); + writer.setMaxBufferedDocs(10); writer.setUseCompoundFile(useCompoundFile); for(int i=0;i<107;i++) { addDoc(writer); @@ -333,13 +335,15 @@ } else { // If we are not auto committing then there should // be exactly 2 commits (one per close above): - 
assertEquals(2, policy.numOnCommit); + assertEquals(autoCommit?2:1, policy.numOnCommit); } - // Simplistic check: just verify the index is in fact - // readable: - IndexReader reader = IndexReader.open(dir); - reader.close(); + if (autoCommit) { + // Simplistic check: just verify the index is in fact + // readable: + IndexReader reader = IndexReader.open(dir); + reader.close(); + } dir.close(); } @@ -363,6 +367,7 @@ for(int j=0;j>> 1; // shift off low bit if ((docCode & 1) != 0) // if low bit is set freq = 1; // freq is one else freq = freqStream.readVInt(); // else read freq count++; + // //System.out.println(" read freq " + freq); if (deletedDocs == null || !deletedDocs.get(doc)) { + //System.out.println(" add " + doc + "; freq=" + freq); docs[i] = doc; freqs[i] = freq; ++i; @@ -156,7 +166,9 @@ /** Optimized implementation. */ public boolean skipTo(int target) throws IOException { + //System.out.println("std skip to " + target); if (df >= skipInterval) { // optimized case + //System.out.println(" is frequent enough"); if (skipStream == null) skipStream = (IndexInput) freqStream.clone(); // lazily clone @@ -172,6 +184,7 @@ long lastFreqPointer = freqStream.getFilePointer(); long lastProxPointer = -1; int numSkipped = -1 - (count % skipInterval); + //System.out.println(" target " + target + "; skipDoc " + skipDoc); while (target > skipDoc) { lastSkipDoc = skipDoc; @@ -203,11 +216,13 @@ freqPointer += skipStream.readVInt(); proxPointer += skipStream.readVInt(); + //System.out.println(" now freq " + freqPointer + " prox " + proxPointer); skipCount++; } // if we found something to skip, then skip it if (lastFreqPointer > freqStream.getFilePointer()) { + //System.out.println(" do skip! " + lastFreqPointer); freqStream.seek(lastFreqPointer); skipProx(lastProxPointer, lastPayloadLength); @@ -219,6 +234,7 @@ // done skipping, now just scan do { + //System.out.println(" now scan " + target + " " + doc); if (!next()) return false; } while (target > doc); Index: src/java/org/apache/lucene/index/IndexWriter.java =================================================================== --- src/java/org/apache/lucene/index/IndexWriter.java (revision 522266) +++ src/java/org/apache/lucene/index/IndexWriter.java (working copy) @@ -171,11 +171,16 @@ public final static int DEFAULT_MERGE_FACTOR = 10; /** - * Default value is 10. Change using {@link #setMaxBufferedDocs(int)}. + * Default value is 0. Change using {@link #setMaxBufferedDocs(int)}. */ - public final static int DEFAULT_MAX_BUFFERED_DOCS = 10; + public final static int DEFAULT_MAX_BUFFERED_DOCS = 0; /** + * Default value is 16 MB. Change using {@link #setRAMBufferSizeMB}. + */ + public final static float DEFAULT_RAM_BUFFER_SIZE_MB = 16F; + + /** * Default value is 1000. Change using {@link #setMaxBufferedDeleteTerms(int)}. 
*/ public final static int DEFAULT_MAX_BUFFERED_DELETE_TERMS = 1000; @@ -208,8 +213,7 @@ private boolean autoCommit = true; // false if we should commit only on close SegmentInfos segmentInfos = new SegmentInfos(); // the segments - SegmentInfos ramSegmentInfos = new SegmentInfos(); // the segments in ramDirectory - private final RAMDirectory ramDirectory = new RAMDirectory(); // for temp segs + private MultiDocumentWriter docWriter; private IndexFileDeleter deleter; private Lock writeLock; @@ -563,6 +567,10 @@ } } + IndexFileDeleter getDeleter() { + return deleter; + } + private void init(Directory d, Analyzer a, final boolean create, boolean closeDir, IndexDeletionPolicy deletionPolicy, boolean autoCommit) throws CorruptIndexException, LockObtainFailedException, IOException { this.closeDir = closeDir; @@ -602,11 +610,14 @@ rollbackSegmentInfos = (SegmentInfos) segmentInfos.clone(); } + docWriter = new MultiDocumentWriter(newSegmentName(), directory, this, !autoCommit); + docWriter.setInfoStream(infoStream); + // Default deleter (for backwards compatibility) is // KeepOnlyLastCommitDeleter: deleter = new IndexFileDeleter(directory, deletionPolicy == null ? new KeepOnlyLastCommitDeletionPolicy() : deletionPolicy, - segmentInfos, infoStream); + segmentInfos, infoStream, docWriter); } catch (IOException e) { this.writeLock.release(); @@ -672,19 +683,34 @@ */ public void setMaxBufferedDocs(int maxBufferedDocs) { ensureOpen(); - if (maxBufferedDocs < 2) - throw new IllegalArgumentException("maxBufferedDocs must at least be 2"); + if (maxBufferedDocs != 0 && maxBufferedDocs < 2) + throw new IllegalArgumentException("maxBufferedDocs must at least be 2 or 0 to disable"); this.minMergeDocs = maxBufferedDocs; } /** * @see #setMaxBufferedDocs + * @deprecated */ public int getMaxBufferedDocs() { ensureOpen(); return minMergeDocs; } + // nocommit javadoc + public void setRAMBufferSizeMB(float mb) { + if (mb < 1) + throw new IllegalArgumentException("ramBufferSize must at least be 1 MB"); + ramBufferSize = mb*1024F*1024F; + if (!autoCommit) + docWriter.setRAMBufferSizeMB(mb); + } + + // nocommit javadoc + public float getRAMBufferSizeMB() { + return ramBufferSize/1024F/1024F; + } + /** *
<p>
Determines the minimal number of delete terms required before the buffered * in-memory delete terms are applied and flushed. If there are documents @@ -756,7 +782,9 @@ public void setInfoStream(PrintStream infoStream) { ensureOpen(); this.infoStream = infoStream; - deleter.setInfoStream(infoStream); + docWriter.setInfoStream(infoStream); + // nocommit + //deleter.setInfoStream(infoStream); } /** @@ -835,7 +863,7 @@ */ public synchronized void close() throws CorruptIndexException, IOException { if (!closed) { - flushRamSegments(); + flush(); if (commitPending) { segmentInfos.write(directory); // now commit changes @@ -844,7 +872,6 @@ rollbackSegmentInfos = null; } - ramDirectory.close(); if (writeLock != null) { writeLock.release(); // release write lock writeLock = null; @@ -884,7 +911,7 @@ /** Returns the number of documents currently in this index. */ public synchronized int docCount() { ensureOpen(); - int count = ramSegmentInfos.size(); + int count = docWriter.docID; for (int i = 0; i < segmentInfos.size(); i++) { SegmentInfo si = segmentInfos.info(i); count += si.docCount; @@ -962,24 +989,13 @@ */ public void addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException { ensureOpen(); - SegmentInfo newSegmentInfo = buildSingleDocSegment(doc, analyzer); synchronized (this) { - ramSegmentInfos.addElement(newSegmentInfo); - maybeFlushRamSegments(); + // nocommit -- move this out of sync + docWriter.addDocument(doc, analyzer); + maybeFlush(); } } - SegmentInfo buildSingleDocSegment(Document doc, Analyzer analyzer) - throws CorruptIndexException, IOException { - DocumentWriter dw = new DocumentWriter(ramDirectory, analyzer, this); - dw.setInfoStream(infoStream); - String segmentName = newRamSegmentName(); - dw.addDocument(segmentName, doc); - SegmentInfo si = new SegmentInfo(segmentName, 1, ramDirectory, false, false); - si.setNumFields(dw.getNumFields()); - return si; - } - /** * Deletes the document(s) containing term. * @param term the term to identify the documents to be deleted @@ -989,7 +1005,7 @@ public synchronized void deleteDocuments(Term term) throws CorruptIndexException, IOException { ensureOpen(); bufferDeleteTerm(term); - maybeFlushRamSegments(); + maybeFlush(); } /** @@ -1005,7 +1021,7 @@ for (int i = 0; i < terms.length; i++) { bufferDeleteTerm(terms[i]); } - maybeFlushRamSegments(); + maybeFlush(); } /** @@ -1041,26 +1057,23 @@ public void updateDocument(Term term, Document doc, Analyzer analyzer) throws CorruptIndexException, IOException { ensureOpen(); - SegmentInfo newSegmentInfo = buildSingleDocSegment(doc, analyzer); + // nocommit: should this be in sync? 
+ bufferDeleteTerm(term); synchronized (this) { - bufferDeleteTerm(term); - ramSegmentInfos.addElement(newSegmentInfo); - maybeFlushRamSegments(); + // nocommit move out of sync + docWriter.addDocument(doc, analyzer); + maybeFlush(); } } - final synchronized String newRamSegmentName() { - return "_ram_" + Integer.toString(ramSegmentInfos.counter++, Character.MAX_RADIX); - } - // for test purpose final synchronized int getSegmentCount(){ return segmentInfos.size(); } // for test purpose - final synchronized int getRamSegmentCount(){ - return ramSegmentInfos.size(); + final synchronized int getNumBufferedDocuments(){ + return docWriter.docID; } // for test purpose @@ -1089,6 +1102,7 @@ */ private int mergeFactor = DEFAULT_MERGE_FACTOR; + // nocommit fix javadocs /** Determines the minimal number of documents required before the buffered * in-memory documents are merging and a new Segment is created. * Since Documents are merged in a {@link org.apache.lucene.store.RAMDirectory}, @@ -1096,10 +1110,12 @@ * the number of files open in a FSDirectory. * *
<p>
The default value is {@link #DEFAULT_MAX_BUFFERED_DOCS}. - + * @deprecated */ private int minMergeDocs = DEFAULT_MAX_BUFFERED_DOCS; + // nocommit javadoc + private float ramBufferSize = DEFAULT_RAM_BUFFER_SIZE_MB*1024F*1024F; /** Determines the largest number of documents ever merged by addDocument(). * Small values (e.g., less than 10,000) are best for interactive indexing, @@ -1177,7 +1193,7 @@ */ public synchronized void optimize() throws CorruptIndexException, IOException { ensureOpen(); - flushRamSegments(); + flush(); while (segmentInfos.size() > 1 || (segmentInfos.size() == 1 && (SegmentReader.hasDeletions(segmentInfos.info(0)) || @@ -1186,7 +1202,7 @@ (useCompoundFile && (!SegmentReader.usesCompoundFile(segmentInfos.info(0))))))) { int minSegment = segmentInfos.size() - mergeFactor; - mergeSegments(segmentInfos, minSegment < 0 ? 0 : minSegment, segmentInfos.size()); + mergeSegments(minSegment < 0 ? 0 : minSegment, segmentInfos.size()); } } @@ -1203,7 +1219,7 @@ localRollbackSegmentInfos = (SegmentInfos) segmentInfos.clone(); localAutoCommit = autoCommit; if (localAutoCommit) { - flushRamSegments(); + flush(); // Turn off auto-commit during our local transaction: autoCommit = false; } else @@ -1293,16 +1309,18 @@ segmentInfos.clear(); segmentInfos.addAll(rollbackSegmentInfos); + docWriter.abort(); + // Ask deleter to locate unreferenced files & remove // them: deleter.checkpoint(segmentInfos, false); deleter.refresh(); - ramSegmentInfos = new SegmentInfos(); bufferedDeleteTerms.clear(); numBufferedDeleteTerms = 0; commitPending = false; + docWriter.abort(); close(); } else { @@ -1397,7 +1415,7 @@ for (int base = start; base < segmentInfos.size(); base++) { int end = Math.min(segmentInfos.size(), base+mergeFactor); if (end-base > 1) { - mergeSegments(segmentInfos, base, end); + mergeSegments(base, end); } } } @@ -1437,7 +1455,7 @@ // segments in S may not since they could come from multiple indexes. // Here is the merge algorithm for addIndexesNoOptimize(): // - // 1 Flush ram segments. + // 1 Flush ram. // 2 Consider a combined sequence with segments from T followed // by segments from S (same as current addIndexes(Directory[])). // 3 Assume the highest level for segments in S is h. Call @@ -1458,14 +1476,18 @@ // copy a segment, which may cause doc count to change because deleted // docs are garbage collected. - // 1 flush ram segments + // 1 flush ram ensureOpen(); - flushRamSegments(); + flush(); // 2 copy segment infos and find the highest level from dirs int startUpperBound = minMergeDocs; + // nocommit: what to do? 
+ if (startUpperBound == 0) + startUpperBound = 10; + boolean success = false; startTransaction(); @@ -1524,7 +1546,7 @@ // copy those segments from S for (int i = segmentCount - numSegmentsToCopy; i < segmentCount; i++) { - mergeSegments(segmentInfos, i, i + 1); + mergeSegments(i, i + 1); } if (checkNonDecreasingLevels(segmentCount - numSegmentsToCopy)) { success = true; @@ -1533,7 +1555,7 @@ } // invariants do not hold, simply merge those segments - mergeSegments(segmentInfos, segmentCount - numTailSegments, segmentCount); + mergeSegments(segmentCount - numTailSegments, segmentCount); // maybe merge segments again if necessary if (segmentInfos.info(segmentInfos.size() - 1).docCount > startUpperBound) { @@ -1673,22 +1695,17 @@ throws IOException { } - protected final void maybeFlushRamSegments() throws CorruptIndexException, IOException { + protected final void maybeFlush() throws CorruptIndexException, IOException { // A flush is triggered if enough new documents are buffered or - // if enough delete terms are buffered - if (ramSegmentInfos.size() >= minMergeDocs || numBufferedDeleteTerms >= maxBufferedDeleteTerms) { - flushRamSegments(); + // if enough delete terms are buffered or enough RAM is + // being consumed + if (numBufferedDeleteTerms >= maxBufferedDeleteTerms || + (autoCommit && ((minMergeDocs != 0 && docWriter.docID >= minMergeDocs) || + (autoCommit && docWriter.getRAMUsed() > ramBufferSize)))) { + flush(); } } - /** Expert: Flushes all RAM-resident segments (buffered documents), then may merge segments. */ - private final synchronized void flushRamSegments() throws CorruptIndexException, IOException { - if (ramSegmentInfos.size() > 0 || bufferedDeleteTerms.size() > 0) { - mergeSegments(ramSegmentInfos, 0, ramSegmentInfos.size()); - maybeMergeSegments(minMergeDocs); - } - } - /** * Flush all in-memory buffered updates (adds and deletes) * to the Directory. 
@@ -1699,7 +1716,89 @@ */ public final synchronized void flush() throws CorruptIndexException, IOException { ensureOpen(); - flushRamSegments(); + + SegmentInfo newSegment = null; + boolean anything = false; + + boolean flushDocs = docWriter.docID > 0; + boolean flushDeletes = bufferedDeleteTerms.size() > 0; + final int numDocs = docWriter.docID; + + if (flushDocs || flushDeletes) { + + SegmentInfos rollback = null; + + if (flushDeletes) + rollback = (SegmentInfos) segmentInfos.clone(); + + boolean success = false; + + try { + if (flushDocs) { + int mergedDocCount = docWriter.docID; + docWriter.flush(); + newSegment = new SegmentInfo(docWriter.segment, + mergedDocCount, + directory, false, true); + segmentInfos.addElement(newSegment); + } + + if (flushDeletes) { + maybeApplyDeletes(flushDocs); + doAfterFlush(); + } + + checkpoint(); + success = true; + } finally { + if (!success) { + if (flushDeletes) { + // Fully replace the segmentInfos since flushed + // deletes could have changed any of the + // SegmentInfo instances: + segmentInfos.clear(); + segmentInfos.addAll(rollback); + } else { + // Remove segment we added, if any: + if (newSegment != null && + segmentInfos.size() > 0 && + segmentInfos.info(segmentInfos.size()-1) == newSegment) + segmentInfos.remove(segmentInfos.size()-1); + docWriter.abort(); + } + deleter.checkpoint(segmentInfos, false); + deleter.refresh(); + } + } + + deleter.checkpoint(segmentInfos, autoCommit); + + if (flushDocs && useCompoundFile) { + success = false; + try { + docWriter.createCompoundFile(newSegment.name); + newSegment.setUseCompoundFile(true); + checkpoint(); + success = true; + } finally { + if (!success) { + newSegment.setUseCompoundFile(false); + deleter.refresh(); + } + } + + deleter.checkpoint(segmentInfos, autoCommit); + } + + // nocommit + // maybeMergeSegments(mergeFactor * numDocs / 2); + + maybeMergeSegments(minMergeDocs); + + // Set up for next segment if we flushed any docs: + if (flushDocs) + docWriter.reset(newSegmentName()); + } } /** Expert: Return the total size of all index files currently cached in memory. @@ -1707,15 +1806,15 @@ */ public final long ramSizeInBytes() { ensureOpen(); - return ramDirectory.sizeInBytes(); + return docWriter.getRAMUsed(); } /** Expert: Return the number of documents whose segments are currently cached in memory. - * Useful when calling flushRamSegments() + * Useful when calling flush() */ public final synchronized int numRamDocs() { ensureOpen(); - return ramSegmentInfos.size(); + return docWriter.docID; } /** Incremental segment merger. */ @@ -1723,6 +1822,9 @@ long lowerBound = -1; long upperBound = startUpperBound; + // nocommit + if (upperBound == 0) upperBound = 10; + while (upperBound < maxMergeDocs) { int minSegment = segmentInfos.size(); int maxSegment = -1; @@ -1754,7 +1856,7 @@ while (numSegments >= mergeFactor) { // merge the leftmost* mergeFactor segments - int docCount = mergeSegments(segmentInfos, minSegment, minSegment + mergeFactor); + int docCount = mergeSegments(minSegment, minSegment + mergeFactor); numSegments -= mergeFactor; if (docCount > upperBound) { @@ -1783,39 +1885,29 @@ * Merges the named range of segments, replacing them in the stack with a * single segment. 
*/ - private final int mergeSegments(SegmentInfos sourceSegments, int minSegment, int end) + private final int mergeSegments(int minSegment, int end) throws CorruptIndexException, IOException { - // We may be called solely because there are deletes - // pending, in which case doMerge is false: - boolean doMerge = end > 0; final String mergedName = newSegmentName(); + SegmentMerger merger = null; - - final List ramSegmentsToDelete = new ArrayList(); - SegmentInfo newSegment = null; int mergedDocCount = 0; - boolean anyDeletes = (bufferedDeleteTerms.size() != 0); // This is try/finally to make sure merger's readers are closed: try { - if (doMerge) { - if (infoStream != null) infoStream.print("merging segments"); - merger = new SegmentMerger(this, mergedName); + if (infoStream != null) infoStream.print("merging segments"); - for (int i = minSegment; i < end; i++) { - SegmentInfo si = sourceSegments.info(i); - if (infoStream != null) - infoStream.print(" " + si.name + " (" + si.docCount + " docs)"); - IndexReader reader = SegmentReader.get(si); // no need to set deleter (yet) - merger.add(reader); - if (reader.directory() == this.ramDirectory) { - ramSegmentsToDelete.add(si); - } - } + merger = new SegmentMerger(this, mergedName); + + for (int i = minSegment; i < end; i++) { + SegmentInfo si = segmentInfos.info(i); + if (infoStream != null) + infoStream.print(" " + si.name + " (" + si.docCount + " docs)"); + IndexReader reader = SegmentReader.get(si); // no need to set deleter (yet) + merger.add(reader); } SegmentInfos rollback = null; @@ -1825,99 +1917,57 @@ // if we hit exception when doing the merge: try { - if (doMerge) { - mergedDocCount = merger.merge(); + mergedDocCount = merger.merge(); - if (infoStream != null) { - infoStream.println(" into "+mergedName+" ("+mergedDocCount+" docs)"); - } + if (infoStream != null) { + infoStream.println(" into "+mergedName+" ("+mergedDocCount+" docs)"); + } - newSegment = new SegmentInfo(mergedName, mergedDocCount, - directory, false, true); - } + newSegment = new SegmentInfo(mergedName, mergedDocCount, + directory, false, true); - if (sourceSegments != ramSegmentInfos || anyDeletes) { - // Now save the SegmentInfo instances that - // we are replacing: - rollback = (SegmentInfos) segmentInfos.clone(); - } + rollback = (SegmentInfos) segmentInfos.clone(); - if (doMerge) { - if (sourceSegments == ramSegmentInfos) { - segmentInfos.addElement(newSegment); - } else { - for (int i = end-1; i > minSegment; i--) // remove old infos & add new - sourceSegments.remove(i); + for (int i = end-1; i > minSegment; i--) // remove old infos & add new + segmentInfos.remove(i); - segmentInfos.set(minSegment, newSegment); - } - } + segmentInfos.set(minSegment, newSegment); - if (sourceSegments == ramSegmentInfos) { - maybeApplyDeletes(doMerge); - doAfterFlush(); - } - checkpoint(); success = true; } finally { - if (success) { - // The non-ram-segments case is already committed - // (above), so all the remains for ram segments case - // is to clear the ram segments: - if (sourceSegments == ramSegmentInfos) { - ramSegmentInfos.removeAllElements(); - } - } else { + if (!success && rollback != null) { + // Rollback the individual SegmentInfo + // instances, but keep original SegmentInfos + // instance (so we don't try to write again the + // same segments_N file -- write once): + segmentInfos.clear(); + segmentInfos.addAll(rollback); - // Must rollback so our state matches index: - if (sourceSegments == ramSegmentInfos && !anyDeletes) { - // Simple case: newSegment may or may 
not have - // been added to the end of our segment infos, - // so just check & remove if so: - if (newSegment != null && - segmentInfos.size() > 0 && - segmentInfos.info(segmentInfos.size()-1) == newSegment) { - segmentInfos.remove(segmentInfos.size()-1); - } - } else if (rollback != null) { - // Rollback the individual SegmentInfo - // instances, but keep original SegmentInfos - // instance (so we don't try to write again the - // same segments_N file -- write once): - segmentInfos.clear(); - segmentInfos.addAll(rollback); - } - // Delete any partially created and now unreferenced files: deleter.refresh(); } } } finally { // close readers before we attempt to delete now-obsolete segments - if (doMerge) merger.closeReaders(); + merger.closeReaders(); } - // Delete the RAM segments - deleter.deleteDirect(ramDirectory, ramSegmentsToDelete); - // Give deleter a chance to remove files now. deleter.checkpoint(segmentInfos, autoCommit); - if (useCompoundFile && doMerge) { + if (useCompoundFile) { boolean success = false; try { - merger.createCompoundFile(mergedName + ".cfs"); newSegment.setUseCompoundFile(true); checkpoint(); success = true; - } finally { if (!success) { // Must rollback: @@ -1936,14 +1986,14 @@ // Called during flush to apply any buffered deletes. If // doMerge is true then a new segment was just created and // flushed from the ram segments. - private final void maybeApplyDeletes(boolean doMerge) throws CorruptIndexException, IOException { + private final void maybeApplyDeletes(boolean flushedNewSegment) throws CorruptIndexException, IOException { if (bufferedDeleteTerms.size() > 0) { if (infoStream != null) infoStream.println("flush " + numBufferedDeleteTerms + " buffered deleted terms on " + segmentInfos.size() + " segments."); - if (doMerge) { + if (flushedNewSegment) { IndexReader reader = null; try { reader = SegmentReader.get(segmentInfos.info(segmentInfos.size() - 1)); @@ -1964,7 +2014,7 @@ } int infosEnd = segmentInfos.size(); - if (doMerge) { + if (flushedNewSegment) { infosEnd--; } @@ -1996,6 +2046,8 @@ private final boolean checkNonDecreasingLevels(int start) { int lowerBound = -1; int upperBound = minMergeDocs; + if (upperBound == 0) + upperBound = 10; for (int i = segmentInfos.size() - 1; i >= start; i--) { int docCount = segmentInfos.info(i).docCount; @@ -2044,10 +2096,11 @@ // well as the disk segments. private void bufferDeleteTerm(Term term) { Num num = (Num) bufferedDeleteTerms.get(term); + int numDoc = docWriter.docID; if (num == null) { - bufferedDeleteTerms.put(term, new Num(ramSegmentInfos.size())); + bufferedDeleteTerms.put(term, new Num(numDoc)); } else { - num.setNum(ramSegmentInfos.size()); + num.setNum(numDoc); } numBufferedDeleteTerms++; } @@ -2057,17 +2110,20 @@ // the documents buffered before it, not those buffered after it. 
private final void applyDeletesSelectively(HashMap deleteTerms, IndexReader reader) throws CorruptIndexException, IOException { + //System.out.println("now apply selective deletes"); Iterator iter = deleteTerms.entrySet().iterator(); while (iter.hasNext()) { Entry entry = (Entry) iter.next(); Term term = (Term) entry.getKey(); - + //System.out.println(" term " + term); + TermDocs docs = reader.termDocs(term); if (docs != null) { int num = ((Num) entry.getValue()).getNum(); try { while (docs.next()) { int doc = docs.doc(); + //System.out.println(" doc " + doc + " vs " + num); if (doc >= num) { break; } Index: src/java/org/apache/lucene/index/IndexFileDeleter.java =================================================================== --- src/java/org/apache/lucene/index/IndexFileDeleter.java (revision 522266) +++ src/java/org/apache/lucene/index/IndexFileDeleter.java (working copy) @@ -97,6 +97,7 @@ private PrintStream infoStream; private Directory directory; private IndexDeletionPolicy policy; + private MultiDocumentWriter docWriter; void setInfoStream(PrintStream infoStream) { this.infoStream = infoStream; @@ -116,10 +117,12 @@ * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public IndexFileDeleter(Directory directory, IndexDeletionPolicy policy, SegmentInfos segmentInfos, PrintStream infoStream) + public IndexFileDeleter(Directory directory, IndexDeletionPolicy policy, SegmentInfos segmentInfos, PrintStream infoStream, MultiDocumentWriter docWriter) throws CorruptIndexException, IOException { + this.docWriter = docWriter; this.infoStream = infoStream; + this.policy = policy; this.directory = directory; @@ -310,6 +313,8 @@ // Incref the files: incRef(segmentInfos, isCommit); + if (docWriter != null) + incRef(docWriter.files()); if (isCommit) { // Append to our commits list: @@ -340,6 +345,8 @@ lastFiles.add(segmentInfo.files()); } } + if (docWriter != null) + lastFiles.add(docWriter.files()); } } Index: src/java/org/apache/lucene/index/MultiDocumentWriter.java =================================================================== --- src/java/org/apache/lucene/index/MultiDocumentWriter.java (revision 0) +++ src/java/org/apache/lucene/index/MultiDocumentWriter.java (revision 0) @@ -0,0 +1,2777 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.Token; +import org.apache.lucene.analysis.TokenStream; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Fieldable; +import org.apache.lucene.search.Similarity; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.RAMOutputStream; + +import org.apache.lucene.util.PriorityQueue; +import org.apache.lucene.util.StringHelper; + +import java.util.zip.Deflater; +import java.util.zip.Inflater; +import java.util.zip.DataFormatException; +import java.io.OutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.io.Reader; +import java.io.StringReader; +import java.util.Arrays; +import java.util.List; +import java.util.ArrayList; +import java.util.Vector; +import java.util.Enumeration; +import java.util.Map; +import java.util.Iterator; +import java.text.NumberFormat; + +/** + * When a document is added, its stored fields (if any) and + * term vectors (if any) are immediately written to the + * Directory (ie these do not consume RAM). The terms + * dictionary and freq/prox posting lists are written to ram + * as a single RAMSegment. Periodically these RAMSegments + * are merged (because this compacts them), and eventually + * they are flushed to disk. When it's time to make a real + * segment, all RAMSegments and flushed segments are merged + * into the final postings lists. + */ + +final class MultiDocumentWriter { + + // Only applies when multiple threads may call + // addDocument: max number of pending documents in flight + // to be written to the real segment files. If we hit + // this max then new incoming addDocument calls will wait + // until the line shrinks below this. 
+ public static int MAX_WAIT_QUEUE = 10; + + // How much ram we are allowed to use: + private static final long DEFAULT_RAM_BUFFER_SIZE = 32*1024*1024; + + private IndexWriter writer; + private Directory directory; // dir where final segment is written + + private FieldInfos fieldInfos; // all fields we've seen + + private IndexOutput tvx, tvf, tvd; // to write term vectors + + private FieldsWriter fieldsWriter; // to write stored fields + + private PrintStream infoStream; + String segment; // current segment we are writing + int docID; // next docID + int nextWriteDocID; // next docID to be written + + private List ramSegments = new ArrayList(); + private int[] levelCounts = new int[1]; + private long[] levelSizes = new long[1]; + private long totalSize; + + private List flushedSegments = new ArrayList(); + private int flushedCount; + private int[] flushedLevelCounts = new int[1]; + private long[] flushedLevelSizes = new long[1]; + private long totalFlushedSize; + + // need getter/setter + private int flushedMergeFactor = 10; + + private List files = new ArrayList(); + + private boolean hasNorms; + private boolean flushedVectors; + private boolean flushedNorms; + private boolean doSelfFlush; + + private long ramBufferSize = DEFAULT_RAM_BUFFER_SIZE; + + private List freeThreadStates = new ArrayList(); + private ThreadState[] waitingThreadStates = new ThreadState[1]; + + private int numWaiting = 0; + + void setRAMBufferSizeMB(float mb) { + ramBufferSize = (long) (mb*1024*1024); + } + + long startTime; + + MultiDocumentWriter(String segment, Directory directory, IndexWriter writer, boolean doSelfFlush) throws IOException { + this.directory = directory; + this.writer = writer; + this.doSelfFlush = doSelfFlush; + reset(segment); + startTime = System.currentTimeMillis(); + } + + List files() { + + if (files.size() > 0) + return files; + + files = new ArrayList(); + final int numFlushed = flushedSegments.size(); + for(int i=0;i 0) + state.clearHash(); + */ + } + + if (fieldsWriter != null) { + fieldsWriter.close(); + fieldsWriter = null; + } + + flushedNorms = false; + flushedVectors = false; + + files.clear(); + docID = 0; + nextWriteDocID = 0; + } + + // flush all changes to a real segment + void flush() throws IOException { + // System.out.println("FLUSH: " + docID); + + fieldInfos.write(directory, segment + ".fnm"); + + flushTerms(); + + // write norms of indexed fields + writeNorms(); + + assert fieldInfos.hasVectors() == (tvx != null); + + if (tvx != null) { + flushedVectors = true; + tvx.close(); + tvf.close(); + tvd.close(); + tvx = null; + } else { + flushedVectors = false; + } + + if (fieldsWriter != null) { + fieldsWriter.close(); + fieldsWriter = null; + } + + final int size = freeThreadStates.size(); + for(int i=0;i 0) + state.clearHash(); + */ + } + + flushedNorms = hasNorms; + files.clear(); + } + + // start a new segment + void reset(String segment) throws IOException { + this.segment = segment; + docID = 0; + nextWriteDocID = 0; + hasNorms = false; + fieldInfos = new FieldInfos(); + flushedCount = 0; + Arrays.fill(levelCounts, 0); + Arrays.fill(levelSizes, 0); + Arrays.fill(flushedLevelCounts, 0); + Arrays.fill(flushedLevelSizes, 0); + files.clear(); + } + + private BufferedNorms[] norms = new BufferedNorms[0]; + + // Per-thread items + private class ThreadState { + int docID; + + // We write term vectors to this, privately, and then + // flush the data to the real term vectors file later + private RAMFile tvfLocal; + + RAMFile fdxLocal; + RAMFile fdtLocal; + + private long[] 
vectorFieldPointers = new long[0]; + private int[] vectorFieldNumbers = new int[0]; + + // TODO: rename + private FieldPostings[] docFieldPostings = new FieldPostings[10]; + private List docFields; + private int numDocFields; + private int numStoredFields; + + FieldPostings[] fieldPostingsArray; + int numFieldPostings; + FieldPostings[] fieldPostingsHash; + + // FieldsWriter that's temporarily private to this + // thread. We write fields here and then copy the + // output to the real FieldsWriter. + private FieldsWriter localFieldsWriter; + + public ThreadState() { + fdxLocal = getRAMFile(0); + fdtLocal = getRAMFile(0); + fieldPostingsArray = new FieldPostings[21]; + fieldPostingsHash = new FieldPostings[28]; + vectorFieldPointers = new long[21]; + vectorFieldNumbers = new int[21]; + + postingsArrayLimit = 21; + postingsArray = new Posting[postingsArrayLimit]; + postingsHashSize = 28; + postingsHash = new Posting[postingsHashSize]; + for(int i=0;i= hi) + return; + + int mid = (lo + hi) / 2; + + if (postings[lo].text.compareTo(postings[mid].text) > 0) { + Posting tmp = postings[lo]; + postings[lo] = postings[mid]; + postings[mid] = tmp; + } + + if (postings[mid].text.compareTo(postings[hi].text) > 0) { + Posting tmp = postings[mid]; + postings[mid] = postings[hi]; + postings[hi] = tmp; + + if (postings[lo].text.compareTo(postings[mid].text) > 0) { + Posting tmp2 = postings[lo]; + postings[lo] = postings[mid]; + postings[mid] = tmp2; + } + } + + int left = lo + 1; + int right = hi - 1; + + if (left >= right) + return; + + String partition = postings[mid].text; + + for (; ;) { + while (postings[right].text.compareTo(partition) > 0) + --right; + + while (left < right && postings[left].text.compareTo(partition) <= 0) + ++left; + + if (left < right) { + Posting tmp = postings[left]; + postings[left] = postings[right]; + postings[right] = tmp; + --right; + } else { + break; + } + } + + quickSort(postings, lo, left); + quickSort(postings, left + 1, hi); + } + + private Posting[] postingsArray; + private int postingsArrayLimit; + private int numPostings; + + private Posting[] postingsHash; + private int postingsHashSize; + + int length; + int position; + int offset; + float boost; + FieldInfo fieldInfo; + + // Tokenizes the fields of a document into Postings. 
+ final void processDocument(Analyzer analyzer) + throws IOException { + + final int maxFieldLength = writer.getMaxFieldLength(); + final int numFields = numFieldPostings; + + // Sort by field name + Arrays.sort(fieldPostingsArray, 0, numFields); + + fdtLocal.writeVInt(numStoredFields); + + for(int i=0;i0) position += analyzer.getPositionIncrementGap(fieldInfo.name); + + final boolean storeOffsets = field.isStoreOffsetWithTermVector(); + + if (!field.isTokenized()) { // un-tokenized field + payload = null; + String stringValue = field.stringValue(); + if (storeOffsets) + addPosition(stringValue, position++, offset, offset + stringValue.length()); + else + addPosition(stringValue, position++, -1, -1); + offset += stringValue.length(); + length++; + } else { + Reader reader; // find or make Reader + if (field.readerValue() != null) + reader = field.readerValue(); + else if (field.stringValue() != null) + reader = new StringReader(field.stringValue()); + else + throw new IllegalArgumentException + ("field must have either String or Reader value"); + + // Tokenize field and add to postingTable + TokenStream stream = analyzer.tokenStream(fieldInfo.name, reader); + try { + Token lastToken = null; + for (Token t = stream.next(); t != null; t = stream.next()) { + position += (t.getPositionIncrement() - 1); + payload = t.getPayload(); + + // TODO: factor this if out of this loop? + if (storeOffsets) + addPosition(t.termText(), position++, offset + t.startOffset(), offset + t.endOffset()); + else + addPosition(t.termText(), position++, -1, -1); + + lastToken = t; + if (++length >= maxFieldLength) { + if (infoStream != null) + infoStream.println("maxFieldLength " +maxFieldLength+ " reached, ignoring following tokens"); + break; + } + } + + if(lastToken != null) + offset += lastToken.endOffset() + 1; + + } finally { + stream.close(); + } + } + + boost *= field.getBoost(); + } + + final class Posting { // info about a Term in a doc + Posting next; + String text; + int freq; // its frequency in doc + int[] positions; // positions it occurs at + Payload[] payloads; + int[] offsetsStart; + int[] offsetsEnd; + } + + private final void addPosition(String text, int position, int offsetStart, int offsetEnd) { + + // System.out.println("offs: " + offsetStart + " " + offsetEnd); + + int hashPos = text.hashCode() % postingsHashSize; + if (hashPos < 0) hashPos += postingsHashSize; + + Posting p = postingsHash[hashPos]; + while(p != null && !p.text.equals(text)) + p = p.next; + + if (p != null) { // word seen before + final int freq = p.freq; + + // Logic below for handling a recycled (from last + // doc) Posting entry. This does not seem to help + // performance, though. 
+ + /* + if (0 == freq) { // recycled from last doc + p.positions = newIntArray(); + p.positions[0] = position; + p.freq = 1; + if (offsetStart != -1) { + p.offsetsStart = newIntArray(); + p.offsetsEnd = newIntArray(); + p.offsetsStart[0] = offsetStart; + p.offsetsEnd[0] = offsetEnd; + } else + p.offsetsStart = null; + + if (payload != null) { + p.payloads = new Payload[1]; + p.payloads[0] = payload; + fieldInfo.storePayloads = true; + } else + p.payloads = null; + + } else { + */ + + if (p.positions.length == freq) { // positions array is full + p.positions = swapUp(p.positions); + if (p.offsetsStart != null) { + p.offsetsStart = swapUp(p.offsetsStart); + p.offsetsEnd = swapUp(p.offsetsEnd); + } + if (p.payloads != null) { + // the current field stores payloads + Payload[] newPayloads = new Payload[freq * 2]; // grow payloads array + System.arraycopy(p.payloads, 0, newPayloads, 0, p.payloads.length); + p.payloads = newPayloads; + } + } + p.positions[freq] = position; // add new position + + if (offsetStart != -1) { + if (p.offsetsStart == null) { + p.offsetsStart = newIntArray(p.positions.length); + p.offsetsEnd = newIntArray(p.positions.length); + } + p.offsetsStart[freq] = offsetStart; + p.offsetsEnd[freq] = offsetEnd; + } + + if (payload != null) { + if (p.payloads == null) + // lazily allocate payload array + p.payloads = new Payload[p.positions.length]; + p.payloads[freq] = payload; + fieldInfo.storePayloads = true; + } + + p.freq = freq + 1; // update frequency + + // } + + } else { // word not seen before + + if (numPostings == postingsArrayLimit) { + // Resize postings array + int newSize = (int) (postingsArrayLimit*1.25); + Posting[] newPostings = new Posting[newSize]; + System.arraycopy(postingsArray, 0, newPostings, 0, postingsArrayLimit); + for(int i=postingsArrayLimit;i>>= 7; + } + b[upto++] = ((byte)i); + return upto; + } + + private void addPostingsAndVectors(FieldPostings fp) + throws CorruptIndexException, IOException { + + final FieldInfo currentField = fp.fieldInfo; + + final Posting[] postings = postingsArray; + final int numTerms = numPostings; + final int fieldNumber = fp.fieldInfo.number; + final int postingsHashSize = postingsHash.length; + + final boolean doVectors = currentField.storeTermVector; + final boolean doOffsets; + final boolean doPositions; + + if (doVectors) { + assert tvfLocal != null; + vectorFieldNumbers[numVectorFields] = fieldNumber; + vectorFieldPointers[numVectorFields++] = tvfLocal.getFilePointer(); + tvfLocal.writeVInt(numTerms); + byte bits = 0x0; + doOffsets = currentField.storeOffsetWithTermVector; + doPositions = currentField.storePositionWithTermVector; + if (doPositions) + bits |= TermVectorsWriter.STORE_POSITIONS_WITH_TERMVECTOR; + if (doOffsets) + bits |= TermVectorsWriter.STORE_OFFSET_WITH_TERMVECTOR; + tvfLocal.writeByte(bits); + } else { + doOffsets = false; + doPositions = false; + } + + // TODO: should we first partition array to remove + // terms that have freq 0 (ie were recycled and then + // did not re-appear in this doc)? 
+ + // int newNumPostings = 0; + // System.out.println(" add postings for field " + fp.fieldInfo.name); + + for(int i=0;i 256 || freeList[newSize] == null) { + // alloc a new array + newArray = new int[newSize]; + cell = null; + // if (newSize <= 256) + // System.out.println("I: " + newSize); + } else { + // reuse existing array + cell = freeList[newSize]; + freeList[newSize] = cell.next; + newArray = cell.array; + } + + // Optimize copy for small arrays + switch(oldSize) { + case 8: + newArray[7] = array[7]; + newArray[6] = array[6]; + newArray[5] = array[5]; + newArray[4] = array[4]; + case 4: + newArray[3] = array[3]; + newArray[2] = array[2]; + case 2: + newArray[1] = array[1]; + case 1: + newArray[0] = array[0]; + break; + default: + System.arraycopy(array, 0, newArray, 0, oldSize); + } + + if (oldSize <= 256) { + // save for reuse later + if (cell == null) { + if (freeCells != null) { + cell = freeCells; + freeCells = cell.next; + } else { + cell = new AllocCell(); + } + } + cell.array = array; + cell.next = freeList[oldSize]; + freeList[oldSize] = cell; + } + + return newArray; + } + + // Returns a length 1 int array + public int[] newIntArray() { + int[] r; + if (freeList[1] != null) { + AllocCell cell = freeList[1]; + r = cell.array; + freeList[1] = cell.next; + cell.next = freeCells; + freeCells = cell; + } else { + r = new int[1]; + } + return r; + } + + public int[] newIntArray(int size) { + int[] r; + if (size <= 256) { + if (freeList[size] != null) { + AllocCell cell = freeList[size]; + r = cell.array; + freeList[size] = cell.next; + cell.next = freeCells; + freeCells = cell; + } else { + r = new int[size]; + } + } else { + r = new int[size]; + System.out.println("I " + size); + } + return r; + } + + // Free this array, recycling if possible + public void recycle(int[] array) { + if (array.length <= 256) { + AllocCell cell; + if (freeCells != null) { + cell = freeCells; + freeCells = cell.next; + } else { + cell = new AllocCell(); + } + cell.array = array; + cell.next = freeList[array.length]; + freeList[array.length] = cell; + } + } + } + + ThreadState getThreadState() { + ThreadState state = null; + while(true) { + final int size = freeThreadStates.size(); + if (0 == size) { + + // There are no free thread states + if (numWaiting >= MAX_WAIT_QUEUE) { + + // There are too many thread states in line write + // to the index so we now pause to give them a + // chance to get scheduled by the JVM and finish + // their documents. Once we wake up again, a + // recycled ThreadState should be available else + // we wait again. + try { + wait(); + } catch (InterruptedException e) { + } + + } else { + // OK, just create a new thread state + state = new ThreadState(); + break; + } + } else { + // Use recycled thread state + state = (ThreadState) freeThreadStates.get(size-1); + freeThreadStates.remove(size-1); + break; + } + } + return state; + } + + void addDocument(Document doc, Analyzer analyzer) + throws CorruptIndexException, IOException { + + // First pass: go through all fields in doc, updating + // shared FieldInfos and writing any stored fields: + final ThreadState state; + + // nocommit: need try/finally to free up thread state? + + synchronized(this) { + // Allocate a thread state. In the single threaded + // case there will always be exactly one thread + // state. Else, multiple thread states can be "in + // flight" but get recycled/shared here. + state = getThreadState(); + + // Do synchronized initialization. 
+ state.init(doc, docID++); + } + + state.processDocument(analyzer); + + // Now write the indexed document to the real files. + synchronized(this) { + + if (nextWriteDocID == state.docID) { + // It's my turn, so write everything now: + nextWriteDocID++; + flush(state); + + // If any states are waiting, sweep through and + // flush those that are enabled by my write. + if (numWaiting > 0) { + boolean doNotify = numWaiting >= MAX_WAIT_QUEUE; + boolean any = false; + while(true) { + int upto = 0; + for(int i=0;i 0) { + tvd.writeVInt(state.numVectorFields); + for(int i=0;i ramBufferSize/20) { + mergeRAMSegments(state, 0); + if (levelSizes[1] > ramBufferSize/10) + mergeRAMSegments(state, 1); + } + + if (doSelfFlush && totalSize > ramBufferSize) + flushRAMSegments(state); + } + + long getRAMUsed() { + return totalSize; + } + + private final TermInfo termInfo = new TermInfo(); // minimize consing + private IndexOutput freqOutput; + private IndexOutput proxOutput; + private int skipInterval; + private int lastDoc; + private int lastPayloadLength; + private int df; + private boolean currentFieldStorePayloads; + private boolean didPack; + + // Write out the postings & dictionary to real output + // files, in the "real" lucene file format. This is to + // finalize a segment. + void flushTerms() throws IOException { + + if (infoStream != null) + infoStream.println("flush postings as segment " + segment + " docID=" + MultiDocumentWriter.this.docID); + + TermInfosWriter termInfosWriter = null; + + final int numRAMSegments = ramSegments.size(); + final int numFlushedSegments = flushedSegments.size(); + final int numSegmentsIn = numRAMSegments + numFlushedSegments; + resizeMergeInputs(numSegmentsIn); + int numDoc = 0; + + try { + freqOutput = directory.createOutput(segment + ".frq"); + proxOutput = directory.createOutput(segment + ".prx"); + termInfosWriter = new TermInfosWriter(directory, segment, fieldInfos, + writer.getTermIndexInterval()); + skipInterval = termInfosWriter.skipInterval; + + RAMSegmentMergeQueue queue = null; + + queue = new RAMSegmentMergeQueue(numSegmentsIn); + int i=0; + for (;i>> 1; + + // System.out.println("doc=" + doc + " lastDoc=" + lastDoc + " df=" + df); + + //if (!(doc > lastDoc || df == 1)) + //System.out.println("doc=" + doc + " lastDoc=" + lastDoc + " df=" + df); + + assert doc > lastDoc || df == 1; + + final int termDocFreq; + final int newDocCode = (doc-lastDoc)<<1; + lastDoc = doc; + + if ((docCode & 1) != 0) { + freqOutput.writeVInt(newDocCode|1); + termDocFreq = 1; + //System.out.println(" doc " + doc + " freq 1"); + //System.out.println(" write " + (newDocCode|1)); + } else { + freqOutput.writeVInt(newDocCode); + termDocFreq = freq.readVInt(); + //System.out.println(" doc " + doc + " freq " + termDocFreq); + //System.out.println(" write " + newDocCode + " then " + termDocFreq); + + freqOutput.writeVInt(termDocFreq); + } + + /** See {@link DocumentWriter#writePostings(Posting[], String) for + * documentation about the encoding of positions and payloads + */ + for(int j=0;j 0) + copyBytes(prox, proxOutput, payloadLength); + } else { + assert 0 == (deltaCode&1); + proxOutput.writeVInt(deltaCode>>1); + } + } + } + } + + private RAMFile skipBuffer = new RAMFile(); + private int lastSkipDoc; + private int lastSkipPayloadLength; + private long lastSkipFreqPointer; + private long lastSkipProxPointer; + + private void resetSkip() { + lastSkipDoc = 0; + lastSkipPayloadLength = -1; // we don't have to write the first length in the skip list + lastSkipFreqPointer = 
freqOutput.getFilePointer(); + lastSkipProxPointer = proxOutput.getFilePointer(); + } + + private void bufferSkip(int doc, int payloadLength) throws IOException { + //System.out.println(" buffer skip: freq ptr " + freqPointer + " prox " + proxPointer); + //System.out.println(" vs last freq ptr " + lastSkipFreqPointer + " prox " + lastSkipProxPointer); + + // To efficiently store payloads in the posting lists we do not store the length of + // every payload. Instead we omit the length for a payload if the previous payload had + // the same length. + // However, in order to support skipping the payload length at every skip point must be known. + // So we use the same length encoding that we use for the posting lists for the skip data as well: + // Case 1: current field does not store payloads + // SkipDatum --> DocSkip, FreqSkip, ProxSkip + // DocSkip,FreqSkip,ProxSkip --> VInt + // DocSkip records the document number before every SkipInterval th document in TermFreqs. + // Document numbers are represented as differences from the previous value in the sequence. + // Case 2: current field stores payloads + // SkipDatum --> DocSkip, PayloadLength?, FreqSkip,ProxSkip + // DocSkip,FreqSkip,ProxSkip --> VInt + // PayloadLength --> VInt + // In this case DocSkip/2 is the difference between + // the current and the previous value. If DocSkip + // is odd, then a PayloadLength encoded as VInt follows, + // if DocSkip is even, then it is assumed that the + // current payload length equals the length at the previous + // skip point + + final int delta = doc - lastSkipDoc; + if (currentFieldStorePayloads) { + if (payloadLength == lastSkipPayloadLength) + // the current payload length equals the length at the previous skip point, + // so we don't store the length again + skipBuffer.writeVInt(delta << 1); + else { + // the payload length is different from the previous one. We shift the DocSkip, + // set the lowest bit and store the current payload length as VInt. 
+ skipBuffer.writeVInt((delta << 1) + 1); + skipBuffer.writeVInt(payloadLength); + lastSkipPayloadLength = payloadLength; + } + } else + // current field does not store payloads + skipBuffer.writeVInt(delta); + + long freqPointer = freqOutput.getFilePointer(); + long proxPointer = proxOutput.getFilePointer(); + skipBuffer.writeVInt((int) (freqPointer - lastSkipFreqPointer)); + skipBuffer.writeVInt((int) (proxPointer - lastSkipProxPointer)); + lastSkipFreqPointer = freqPointer; + lastSkipProxPointer = proxPointer; + + lastSkipDoc = doc; + } + + private long writeSkip() throws IOException { + long skipPointer = freqOutput.getFilePointer(); + skipBuffer.writeTo(freqOutput); + return skipPointer; + } + + // Called when RAM buffer is full; we now merge all RAM + // segments to a single flushed segment: + final synchronized void flushRAMSegments(ThreadState state) throws IOException { + + if (infoStream != null) { + String name = tempFileName(".tis", flushedCount); + infoStream.println("\n" + getElapsedTime() + ": flush ram segments at docID " + docID + ", to " + name.substring(0, name.length()-4) + ": totalRam=" + (totalSize/1024/1024) + " MB"); + } + didPack = false; + + IndexOutput termsOut = directory.createOutput(tempFileName(".tis", flushedCount)); + IndexOutput freqOut = directory.createOutput(tempFileName(".frq", flushedCount)); + IndexOutput proxOut = directory.createOutput(tempFileName(".prx", flushedCount)); + + final int numSegmentsIn = ramSegments.size(); + long newSize; + long oldSize = totalSize; + + resizeMergeInputs(numSegmentsIn); + + int numDoc = 0; + for(int i=0;i start; i--) // remove old infos & add new + flushedSegments.remove(i); + + FlushedSegment newFlushedSegment = new FlushedSegment(numDoc, flushedCount++); + flushedSegments.set(start, newFlushedSegment); + + if (flushedLevelSizes.length == level+1) { + flushedLevelSizes = realloc(flushedLevelSizes, 1+flushedLevelSizes.length); + flushedLevelCounts = realloc(flushedLevelCounts, 1+flushedLevelCounts.length); + } + + flushedLevelSizes[level] -= oldSize; + flushedLevelSizes[1+level] += newSize; + + flushedLevelCounts[level] -= (end-start); + flushedLevelCounts[1+level]++; + + totalFlushedSize += newSize - oldSize; + + if (infoStream != null) + infoStream.println("merge flushed segments done: oldSize=" + oldSize + " newSize=" + newSize + " new/old=" + ((int)(100.0*newSize/oldSize)) + "% totalFlushed=" + (totalFlushedSize/1024/1024) + " MB"); + + files.clear(); + + // Have deleter remove our now unreferenced files: + writer.getDeleter().checkpoint(writer.segmentInfos, false); + } + + static void close(IndexOutput f0, IndexOutput f1, IndexOutput f2) throws IOException { + IOException keep = null; + try { + if (f0 != null) f0.close(); + } catch (IOException e) { + keep = e; + } finally { + try { + if (f1 != null) f1.close(); + } catch (IOException e) { + if (keep == null) keep = e; + } finally { + try { + if (f2 != null) f2.close(); + } catch (IOException e) { + if (keep == null) keep = e; + } finally { + if (keep != null) throw keep; + } + } + } + } + + static void close(IndexInput f0, IndexInput f1, IndexInput f2) throws IOException { + IOException keep = null; + try { + if (f0 != null) f0.close(); + } catch (IOException e) { + keep = e; + } finally { + try { + if (f1 != null) f1.close(); + } catch (IOException e) { + if (keep == null) keep = e; + } finally { + try { + if (f2 != null) f2.close(); + } catch (IOException e) { + if (keep == null) keep = e; + } finally { + if (keep != null) throw keep; + } + } + } + } + + 
static void close(IndexOutput freq, IndexOutput prox, TermInfosWriter terms) throws IOException { + IOException keep = null; + try { + if (freq != null) freq.close(); + } catch (IOException e) { + keep = e; + } finally { + try { + if (prox != null) prox.close(); + } catch (IOException e) { + if (keep == null) keep = e; + } finally { + try { + if (terms != null) terms.close(); + } catch (IOException e) { + if (keep == null) keep = e; + } finally { + if (keep != null) throw keep; + } + } + } + } + + NumberFormat nf = NumberFormat.getInstance(); + String getElapsedTime() { + long t = System.currentTimeMillis(); + nf.setMaximumFractionDigits(1); + nf.setMinimumFractionDigits(1); + return nf.format((t-startTime)/1000.0) + " sec"; + } + + // In-memory merge: reads multiple ram segments (in the + // modified format) and replaces with a single ram segment. + final void mergeRAMSegments(ThreadState state, int level) throws IOException { + + RAMFile termsOut = state.getRAMFile(4); + RAMFile freqOut = state.getRAMFile(4); + RAMFile proxOut = state.getRAMFile(4); + + int start = 0; + int end = 0; + for(int i=levelCounts.length-1;i>=level;i--) { + start = end; + end += levelCounts[i]; + } + + if (infoStream != null) + infoStream.println("\n" + getElapsedTime() + ": merge ram segments: level " + level + ": start idx " + start + " to end idx " + end + " docID=" + docID); + + long oldSize; + long oldTermsSize; + long oldFreqSize; + long oldProxSize; + long newSize; + + int numDoc; + RAMSegment newRAMSegment; + + if (end == start+1) { + // Degenerate case, if suddenly an immense document + // comes through + newRAMSegment = (RAMSegment) ramSegments.get(start); + oldTermsSize = newRAMSegment.terms.size; + oldFreqSize = newRAMSegment.freq.size; + oldProxSize = newRAMSegment.prox.size; + newSize = oldSize = newRAMSegment.terms.size + newRAMSegment.freq.size + newRAMSegment.prox.size; + numDoc = newRAMSegment.numDoc; + } else { + + resizeMergeInputs(end-start); + final int numSegmentsIn = end-start; + + oldSize = 0; + oldTermsSize = 0; + oldFreqSize = 0; + oldProxSize = 0; + int upto = 0; + numDoc = 0; + for(int i=start;i start; i--) { // remove old infos & add new + RAMSegment rs = (RAMSegment) ramSegments.get(i); + state.recycle(rs.prox); + state.recycle(rs.freq); + state.recycle(rs.terms); + ramSegments.remove(i); + } + + RAMSegment rs = (RAMSegment) ramSegments.get(start); + state.recycle(rs.prox); + state.recycle(rs.freq); + state.recycle(rs.terms); + + newSize = termsOut.size + freqOut.size + proxOut.size; + + newRAMSegment = new RAMSegment(numDoc, termsOut, freqOut, proxOut); + ramSegments.set(start, newRAMSegment); + } + + if (levelSizes.length == level+1) { + levelSizes = realloc(levelSizes, 1+levelSizes.length); + levelCounts = realloc(levelCounts, 1+levelCounts.length); + } + + levelSizes[level] -= oldSize; + levelSizes[1+level] += newSize; + + levelCounts[level] -= (end-start); + levelCounts[1+level]++; + + totalSize += newSize - oldSize; + + if (infoStream != null) { + infoStream.println(" oldSize=" + oldSize + " newSize=" + newSize + " new/old=" + ((int)(100.0*newSize/oldSize)) + "% totalRAM=" + (totalSize/1024/1024) + " MB"); + infoStream.println(" termsSize=" + termsOut.size + " freqSize=" + freqOut.size + " proxSize=" + proxOut.size); + infoStream.println(" oldTermsSize=" + oldTermsSize + " oldFreqSize=" + oldFreqSize + " oldProxSize=" + oldProxSize); + System.out.println(" bufferOut=" + RAMCell.numBufferOut + " byteOut=" + RAMCell.numByteOut + " cellsAlloc=" + RAMCell.numCellsAlloc); + } + } 
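[Editor's note] The three static close(...) helpers above all encode the same policy with nested try/finally blocks: attempt to close every stream, remember the first IOException, and rethrow it only after the remaining streams have been closed. On Java 5 the same policy can be written once with varargs and java.io.Closeable; the sketch below only illustrates that policy (this patch sticks to the pre-1.5 idiom, and the IndexInput/IndexOutput classes here do not implement Closeable), it is not a drop-in replacement:

import java.io.Closeable;
import java.io.IOException;

final class CloseUtil {
  private CloseUtil() {}

  // Close every non-null stream; remember the first failure and rethrow it
  // only after all streams have been given a chance to close.
  static void closeAll(Closeable... streams) throws IOException {
    IOException first = null;
    for (Closeable c : streams) {
      if (c == null) continue;
      try {
        c.close();
      } catch (IOException e) {
        if (first == null) first = e;
      }
    }
    if (first != null) throw first;
  }
}

The nested try/finally form in the patch gives the same guarantee without requiring Java 5 language features.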
+ + SegmentMergeInfo mergeInputs[] = new SegmentMergeInfo[0]; + + final void resizeMergeInputs(final int minSize) { + if (mergeInputs.length < minSize) { + int size = (int) (minSize*1.25); + SegmentMergeInfo[] newMergeInputs = new SegmentMergeInfo[size]; + System.arraycopy(mergeInputs, 0, newMergeInputs, 0, mergeInputs.length); + for(int i=mergeInputs.length;i 0) { + + SegmentMergeInfo smi = (SegmentMergeInfo) queue.pop(); + int df = smi.df; + + if (debug) { + System.out.println(" term " + fieldInfos.fieldName(smi.fieldNumber) + ":" + new String(smi.textBuffer, 0, smi.textLength)); + System.out.println(" idx " + smi.idx + " freqSize=" + smi.freqSize + " proxSize=" + smi.proxSize + " df=" + smi.df); + } + + // bulk copy + smi.flush(); + + // write term (prefix compressed), docFreq, freq ptr, prox ptr + int len = lastCharsLength < smi.textLength ? lastCharsLength : smi.textLength; + int prefix = len; + for(int i=0;i 0; + + if (srcIn instanceof RAMFileReader) { + RAMFileReader src = (RAMFileReader) srcIn; + while(true) { + final int chunk = src.limit - src.upto; + if (chunk < numBytes) { + // Src is the limit + destIn.writeBytes(src.buffer, src.upto, chunk); + src.nextBuffer(); + numBytes -= chunk; + } else if (chunk == numBytes) { + // Matched + destIn.writeBytes(src.buffer, src.upto, chunk); + src.nextBuffer(); + break; + } else { + // numBytes is the limit + destIn.writeBytes(src.buffer, src.upto, (int) numBytes); + src.upto += numBytes; + break; + } + } + } else { + // Use intermediate buffer + while(numBytes > 0) { + final int chunk; + if (numBytes > 1024) { + chunk = 1024; + } else { + chunk = (int) numBytes; + } + srcIn.readBytes(byteBuffer, 0, chunk); + destIn.writeBytes(byteBuffer, chunk); + numBytes -= chunk; + } + } + } + + /** If non-null, a message will be printed to this if maxFieldLength is reached. 
+ */ + void setInfoStream(PrintStream infoStream) { + this.infoStream = infoStream; + // nocommit + // this.infoStream = System.out; + } + + private class RAMSegment { + int numDoc; + RAMFile terms; + RAMFile freq; + RAMFile prox; + public RAMSegment(int numDoc, RAMFile terms, RAMFile freq, RAMFile prox) { + this.numDoc = numDoc; + this.terms = terms; + this.freq = freq; + this.prox = prox; + } + } + + private class FlushedSegment { + int numDoc; + int segment; + public FlushedSegment(int numDoc, int segment) { + this.numDoc = numDoc; + this.segment = segment; + } + } + + final class SegmentMergeInfo { + int idx; + + char textBuffer[] = new char[10]; + int textLength; + int fieldNumber; + + IndexInput terms; + IndexInput freq; + IndexInput prox; + + IndexOutput freqOut; + IndexOutput proxOut; + + private long freqSize; + private long proxSize; + + long size; + long pos; + int df; + + SegmentMergeInfo(int idx) { + this.idx = idx; + } + + public void setInputs(IndexInput terms, IndexInput freq, IndexInput prox) { + this.terms = terms; + this.freq = freq; + this.prox = prox; + } + public void setOutputs(IndexOutput freqOut, IndexOutput proxOut) { + this.freqOut = freqOut; + this.proxOut = proxOut; + } + + public boolean next() throws IOException { + final int start = terms.readVInt(); + if (start == Integer.MAX_VALUE) { + return false; + } + + final int length = terms.readVInt(); + textLength = start + length; + if (textLength > textBuffer.length) { + char[] newTextBuffer = new char[(int) (textLength*1.5)]; + //System.out.println("start=" + start + " length=" + length + " = textLength " + textLength + " vs " + newTextBuffer.length); + // System.out.println("here: " + terms); + System.arraycopy(textBuffer, 0, newTextBuffer, 0, start); + textBuffer = newTextBuffer; + } + terms.readChars(textBuffer, start, length); + fieldNumber = terms.readVInt(); + df = terms.readVInt(); + freqSize = terms.readVLong(); + proxSize = terms.readVLong(); + return true; + } + + public void flush() throws IOException { + copyBytes(freq, freqOut, freqSize); + copyBytes(prox, proxOut, proxSize); + } + + public void close() throws IOException { + MultiDocumentWriter.close(terms, freq, prox); + } + + protected final boolean equalTerm(int otherFieldNumber, char[] otherTextBuffer, int otherTextLength) { + if (otherFieldNumber == fieldNumber) { + final char[] textA = textBuffer; + final char[] textB = otherTextBuffer; + if (textLength != otherTextLength) + return false; + for(int i=0;i charB) + return false; + } + + if (stiA.textLength < stiB.textLength) + return true; + else if (stiA.textLength > stiB.textLength) + return false; + + // finally by index + return stiA.idx < stiB.idx; + + } else { + // fields differ: + String fieldA = fieldInfos.fieldName(stiA.fieldNumber); + String fieldB = fieldInfos.fieldName(stiB.fieldNumber); + return fieldA.compareTo(fieldB) < 0; + } + } + } + + private static final class RAMCell { + + final static int MAX_LEVEL = 4; + static RAMCell freeCells[] = new RAMCell[1+RAMCell.MAX_LEVEL]; + + static int numBufferOut; + static int numByteOut; + static int numCellsAlloc; + + byte[] buffer; + RAMCell next; + byte level; + byte subLevel; + + static void recycle(RAMCell cell) { + cell.next = freeCells[cell.level]; + freeCells[cell.level] = cell; + + // ASSERT + numBufferOut--; + numByteOut -= cell.buffer.length; + // System.out.println("F level " + cell.level + " " + cell.buffer + " bufferOut=" + RAMCell.numBufferOut + " byteOut=" + RAMCell.numByteOut); + } + + public static RAMCell alloc(final int 
level, final int subLevel) { + RAMCell r = freeCells[level]; + if (r != null) { + // reuse + freeCells[level] = r.next; + r.subLevel = (byte) subLevel; + r.next = null; + // System.out.println("R " + level + ": " + r.buffer + " bufferOut=" + RAMCell.numBufferOut + " byteOut=" + RAMCell.numByteOut); + } else { + r = new RAMCell(level, subLevel); + numCellsAlloc++; + //System.out.println(" new alloc " + level + ": " + r); + } + // ASSERT + numBufferOut++; + numByteOut += r.buffer.length; + return r; + } + + public RAMCell(final int level, final int subLevel) { + this.level = (byte) level; + this.subLevel = (byte) subLevel; + int size = 0; + switch(this.level) { + case 0: + size = 64; + break; + case 1: + size = 256; + break; + case 2: + size = 2048; + break; + case 3: + size = 8192; + break; + case 4: + size = 32768; + break; + } + buffer = new byte[size]; + // System.out.println("B " + level + " " + buffer + ": " + freeCells[level] + " bufferOut=" + RAMCell.numBufferOut + " byteOut=" + RAMCell.numByteOut); + } + } + + private static final class RAMFile extends IndexOutput { + + RAMCell head; + RAMCell tail; + RAMFileReader r; + RAMFile next; + int upto; + int limit; + byte[] buffer; + int size; + + boolean isFree = false; + + public void printAlloc(String prefix) { + RAMCell c = head; + System.out.print(prefix + ":"); + while(c != null) { + if (c.next == null) { + System.out.println(" " + c.buffer.length + "(" + upto + ")"); + break; + } else { + System.out.print(" " + c.buffer.length); + c = c.next; + } + } + System.out.println(""); + } + + void setStartLevel(int level) { + assert head == null; + head = tail = RAMCell.alloc(level, 0); + buffer = head.buffer; + upto = 0; + limit = head.buffer.length; + size = limit; + } + + // reset ourself, transfer all buffers to a new reader, and return that reader + public IndexInput getReader() { + // System.out.println(" GET READER: " + this + ": " + length()); + assert upto > 0; + if (head == null) + return null; + else if (r == null) + r = new RAMFileReader(this); + else + r.reset(this); + reset(); + return r; + } + + public void writeByte(byte b) { + assert !isFree; + if (upto == limit) + nextBuffer(); + buffer[upto++] = b; + } + + // Move all of our bytes to out and reset + public void writeTo(IndexOutput out) throws IOException { + assert !isFree; + while(head != null) { + final int numBytes; + if (head.next == null) + numBytes = upto; + else + numBytes = head.buffer.length; + // System.out.println("writeTo: " + numBytes); + out.writeBytes(head.buffer, numBytes); + RAMCell next = head.next; + RAMCell.recycle(head); + head = next; + } + reset(); + } + + private void reset() { + assert !isFree; + head = tail = null; + buffer = null; + size = limit = upto = 0; + } + + private void free() { + assert !isFree; + while(head != null) { + RAMCell c = head.next; + RAMCell.recycle(head); + head = c; + } + reset(); + } + + public void writeBytes(byte[] b, int offset, int numBytes) { + assert !isFree; + assert numBytes > 0; + switch(numBytes) { + case 4: + writeByte(b[offset++]); + case 3: + writeByte(b[offset++]); + case 2: + writeByte(b[offset++]); + case 1: + writeByte(b[offset++]); + break; + default: + if (upto == limit) + nextBuffer(); + // System.out.println(" writeBytes: buffer=" + buffer + " head=" + head + " offset=" + offset + " nB=" + numBytes); + while(true) { + int chunk = limit - upto; + if (chunk >= numBytes) { + System.arraycopy(b, offset, buffer, upto, numBytes); + upto += numBytes; + break; + } else { + System.arraycopy(b, offset, buffer, 
upto, chunk); + offset += chunk; + numBytes -= chunk; + nextBuffer(); + } + } + } + } + + public void nextBuffer() { + assert !isFree; + + final int level; + final int subLevel; + if (tail == null) { + level = 0; + subLevel = 0; + } else if (tail.level < RAMCell.MAX_LEVEL) { + if (3 == tail.subLevel) { + level = 1+tail.level; + subLevel = 0; + } else { + level = tail.level; + subLevel = 1+tail.subLevel; + } + } else { + subLevel = 0; + level = RAMCell.MAX_LEVEL; + } + + RAMCell c = RAMCell.alloc(level, subLevel); + + if (head == null) + head = tail = c; + else { + tail.next = c; + tail = c; + } + + limit = c.buffer.length; + size += limit; + buffer = c.buffer; + + upto = 0; + } + + public long getFilePointer() { + assert !isFree; + return size - (limit-upto); + } + + public long length() { + assert !isFree; + return getFilePointer(); + } + + public void close() {} + public void flush() {throw new RuntimeException("not implemented");} + public void seek(long pos) {throw new RuntimeException("not implemented");} + } + + // Limited IndexInput for "read once". This frees each + // buffer from the head once it's been read. It can only + // be created off an already written RAMFile. + private static final class RAMFileReader extends IndexInput { + + int readLimit; + int upto; + int limit; + RAMCell head; + byte[] buffer; + + // ASSERT + boolean finished = true; + + RAMFileReader(RAMFile ramFile) { + reset(ramFile); + } + + public void reset(RAMFile ramFile) { + // Make sure we were fully read + assert finished; + finished = false; + readLimit = ramFile.upto; + head = ramFile.head; + buffer = head.buffer; + if (head.next == null) + limit = readLimit; + else + limit = buffer.length; + } + + public byte readByte() { + byte b = buffer[upto++]; + if (upto == limit) + nextBuffer(); + return b; + } + + public void nextBuffer() { + RAMCell c = head.next; + RAMCell.recycle(head); + head = c; + upto = 0; + if (head != null) { + buffer = head.buffer; + if (head.next == null) + limit = readLimit; + else + limit = buffer.length; + } else { + // ASSERT + finished = true; + buffer = null; + } + } + + public void readBytes(byte[] b, int offset, int len) {throw new RuntimeException("not implemented");} + public void close() {} + public long getFilePointer() {throw new RuntimeException("not implemented");} + public void seek(long pos) {throw new RuntimeException("not implemented");} + public long length() {throw new RuntimeException("not implemented");} + } + + static final byte defaultNorm = Similarity.encodeNorm(1.0f); + + private static class BufferedNorms { + + RAMFile out = new RAMFile(); + int upto; + + void add(float norm) { + byte b = Similarity.encodeNorm(norm); + out.writeByte(b); + upto++; + } + + void fill(int docID) { + // System.out.println(" now fill: " + upto + " vs " + docID); + while(upto < docID) { + // fill in docs that didn't have this field: + out.writeByte(defaultNorm); + upto++; + } + } + } + + static long[] realloc(long[] array, int newSize) { + long[] newArray = new long[newSize]; + System.arraycopy(array, 0, newArray, 0, array.length); + return newArray; + } + + static int[] realloc(int[] array, int newSize) { + int[] newArray = new int[newSize]; + System.arraycopy(array, 0, newArray, 0, array.length); + return newArray; + } + + static float[] realloc(float[] array, int newSize) { + float[] newArray = new float[newSize]; + System.arraycopy(array, 0, newArray, 0, array.length); + return newArray; + } +} Property changes on: src/java/org/apache/lucene/index/MultiDocumentWriter.java 
___________________________________________________________________ Name: svn:eol-style + native Index: src/java/org/apache/lucene/index/SegmentMergeInfo.java =================================================================== --- src/java/org/apache/lucene/index/SegmentMergeInfo.java (revision 522266) +++ src/java/org/apache/lucene/index/SegmentMergeInfo.java (working copy) @@ -73,9 +73,8 @@ final void close() throws IOException { termEnum.close(); - if (postings != null) { - postings.close(); + if (postings != null) + postings.close(); } } -} Index: src/java/org/apache/lucene/store/IndexOutput.java =================================================================== --- src/java/org/apache/lucene/store/IndexOutput.java (revision 522266) +++ src/java/org/apache/lucene/store/IndexOutput.java (working copy) @@ -125,6 +125,24 @@ } } + public void writeChars(char[] s, int start, int length) + throws IOException { + final int end = start + length; + for (int i = start; i < end; i++) { + final int code = (int)s[i]; + if (code >= 0x01 && code <= 0x7F) + writeByte((byte)code); + else if (((code >= 0x80) && (code <= 0x7FF)) || code == 0) { + writeByte((byte)(0xC0 | (code >> 6))); + writeByte((byte)(0x80 | (code & 0x3F))); + } else { + writeByte((byte)(0xE0 | (code >>> 12))); + writeByte((byte)(0x80 | ((code >> 6) & 0x3F))); + writeByte((byte)(0x80 | (code & 0x3F))); + } + } + } + /** Forces any buffered output to be written. */ public abstract void flush() throws IOException; Index: src/demo/org/apache/lucene/demo/IndexLineFiles.java =================================================================== --- src/demo/org/apache/lucene/demo/IndexLineFiles.java (revision 0) +++ src/demo/org/apache/lucene/demo/IndexLineFiles.java (revision 0) @@ -0,0 +1,145 @@ +package org.apache.lucene.demo; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.analysis.WhitespaceAnalyzer; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.document.DateTools; + +import java.io.File; +import java.io.FileReader; +import java.io.BufferedReader; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Date; + +// nocommit +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; + +/** Index all text files under a directory. */ +public class IndexLineFiles { + + private IndexLineFiles() {} + + static final File INDEX_DIR = new File("index"); + + static int bufferSize; + + /** Index all text files under a directory. 
*/ + public static void main(String[] args) throws IOException { + String usage = "java org.apache.lucene.demo.IndexFiles "; + + if (args.length == 0) { + System.err.println("Usage: " + usage); + System.exit(1); + } + + boolean autoCommit = args[1].equals("yes"); + bufferSize = Integer.parseInt(args[2]); + int numDoc = Integer.parseInt(args[3]); + int maxBufferedDocs = Integer.parseInt(args[4]); + boolean optimize = args[5].equals("yes"); + + Field.TermVector tvMode; + if (args[6].equals("no")) + tvMode = Field.TermVector.NO; + else if (args[6].equals("yes")) + tvMode = Field.TermVector.YES; + else if (args[6].equals("pos")) + tvMode = Field.TermVector.WITH_POSITIONS; + else if (args[6].equals("posoffs")) + tvMode = Field.TermVector.WITH_POSITIONS_OFFSETS; + else + throw new RuntimeException("bad term vector mode: " + args[6]); + + System.out.println("\nFAST: autoCommit=" + autoCommit + " bufferSize=" + bufferSize + "MB docLimit=" + numDoc + " optimize=" + optimize + " termVectors=" + args[6]); + + if (INDEX_DIR.exists()) { + System.out.println("Cannot save index to '" +INDEX_DIR+ "' directory, please delete it first"); + System.exit(1); + } + + Date start = new Date(); + try { + // IndexWriter writer = new IndexWriter(INDEX_DIR, new StandardAnalyzer(), true); + IndexWriter writer = new IndexWriter(FSDirectory.getDirectory(INDEX_DIR), autoCommit, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(maxBufferedDocs); + writer.setMaxFieldLength(100000); + writer.setRAMBufferSizeMB(bufferSize); + writer.setUseCompoundFile(false); + writer.setInfoStream(System.out); + // writer.setMaxFieldLength(10000000); + //writer.setMaxFieldLength(1000); + + int count = 0; + while(count < numDoc) { + + BufferedReader input = new BufferedReader(new FileReader(args[0])); + String line = null; + + while (( line = input.readLine()) != null) { + // if (line.length() <= 2) continue; + Document doc = new Document(); + + // Add the path of the file as a field named "path". Use a field that is + // indexed (i.e. searchable), but don't tokenize the field into words. + doc.add(new Field("path", args[0], Field.Store.YES, Field.Index.UN_TOKENIZED)); + + // Add the last modified date of the file a field named "modified". Use + // a field that is indexed (i.e. searchable), but don't tokenize the field + // into words. + doc.add(new Field("modified", + "200703161637", + Field.Store.YES, Field.Index.UN_TOKENIZED)); + + // System.out.println("HERE: " + line); + doc.add(new Field("contents", line, Field.Store.NO, Field.Index.TOKENIZED, tvMode)); + writer.addDocument(doc); + netMem += bean.getHeapMemoryUsage().getUsed(); + if (++count == numDoc) + break; + } + input.close(); + } + if (optimize) { + System.out.println("Optimize..."); + writer.optimize(); + } + writer.close(); + + Date end = new Date(); + System.out.println(count + " docs; " + (end.getTime() - start.getTime()) + " total milliseconds"); + System.out.println("avg mem: " + (netMem/count/1024/1024) + " MB"); + + } catch (IOException e) { + e.printStackTrace(System.out); + System.out.println(" caught a " + e.getClass() + + "\n with message: " + e.getMessage()); + } + } + + static MemoryMXBean bean = ManagementFactory.getMemoryMXBean(); + static long netMem; + static int netCount; +} Property changes on: src/demo/org/apache/lucene/demo/IndexLineFiles.java ___________________________________________________________________ Name: svn:eol-style + native
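[Editor's note] Stripped of its timing and heap-sampling scaffolding, the IndexLineFiles demo above boils down to configuring an IndexWriter for buffered, RAM-bounded flushing and feeding it one document per input line. A minimal sketch under the same assumptions (the index path, buffer sizes, and field choices below are placeholders, not the demo's exact values):

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.FSDirectory;

public class IndexLinesMinimal {
  public static void main(String[] args) throws Exception {
    // args[0] is a text file with one document per line (placeholder convention)
    IndexWriter writer = new IndexWriter(FSDirectory.getDirectory(new File("index")),
                                         false,                      // autoCommit off
                                         new WhitespaceAnalyzer(),
                                         true);                      // create a new index
    writer.setMaxBufferedDocs(1000);   // flush after 1000 buffered docs ...
    writer.setRAMBufferSizeMB(32);     // ... or once roughly 32 MB of RAM is buffered
    writer.setUseCompoundFile(false);  // keep per-segment files separate

    BufferedReader in = new BufferedReader(new FileReader(args[0]));
    String line;
    while ((line = in.readLine()) != null) {
      Document doc = new Document();
      doc.add(new Field("contents", line, Field.Store.NO, Field.Index.TOKENIZED));
      writer.addDocument(doc);
    }
    in.close();
    writer.close();
  }
}

The full demo additionally parses autoCommit, the RAM buffer size, a document limit, maxBufferedDocs, an optimize flag, and the term-vector mode from the command line, then reports elapsed time and average heap usage.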