Index: contrib/xml-query-parser/src/test/org/apache/lucene/xmlparser/TestParser.java
===================================================================
--- contrib/xml-query-parser/src/test/org/apache/lucene/xmlparser/TestParser.java (revision 547668)
+++ contrib/xml-query-parser/src/test/org/apache/lucene/xmlparser/TestParser.java (working copy)
@@ -77,6 +77,7 @@
line=d.readLine();
}
d.close();
+ writer.close();
}
reader=IndexReader.open(dir);
searcher=new IndexSearcher(reader);
Index: src/test/org/apache/lucene/search/TestTermVectors.java
===================================================================
--- src/test/org/apache/lucene/search/TestTermVectors.java (revision 547668)
+++ src/test/org/apache/lucene/search/TestTermVectors.java (working copy)
@@ -291,6 +291,80 @@
Field.Index.TOKENIZED, Field.TermVector.YES));
//System.out.println("Document: " + doc);
}
-
-
+
+ // Test only a few docs having vectors
+ public void testRareVectors() throws IOException {
+ IndexWriter writer = new IndexWriter(directory, new SimpleAnalyzer(), true);
+ for(int i=0;i<100;i++) {
+ Document doc = new Document();
+ doc.add(new Field("field", English.intToEnglish(i),
+ Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.NO));
+ writer.addDocument(doc);
+ }
+ for(int i=0;i<10;i++) {
+ Document doc = new Document();
+ doc.add(new Field("field", English.intToEnglish(100+i),
+ Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+ writer.addDocument(doc);
+ }
+
+ writer.close();
+ searcher = new IndexSearcher(directory);
+
+ Query query = new TermQuery(new Term("field", "hundred"));
+ Hits hits = searcher.search(query);
+ assertEquals(10, hits.length());
+ for (int i = 0; i < hits.length(); i++) {
+ TermFreqVector [] vector = searcher.reader.getTermFreqVectors(hits.id(i));
+ assertTrue(vector != null);
+ assertTrue(vector.length == 1);
+ }
+ }
+
+
+ // In a single doc, for the same field, mix the term
+ // vectors up
+ public void testMixedVectrosVectors() throws IOException {
+ IndexWriter writer = new IndexWriter(directory, new SimpleAnalyzer(), true);
+ Document doc = new Document();
+ doc.add(new Field("field", "one",
+ Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.NO));
+ doc.add(new Field("field", "one",
+ Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.YES));
+ doc.add(new Field("field", "one",
+ Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS));
+ doc.add(new Field("field", "one",
+ Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_OFFSETS));
+ doc.add(new Field("field", "one",
+ Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+ writer.addDocument(doc);
+ writer.close();
+
+ searcher = new IndexSearcher(directory);
+
+ Query query = new TermQuery(new Term("field", "one"));
+ Hits hits = searcher.search(query);
+ assertEquals(1, hits.length());
+
+ TermFreqVector [] vector = searcher.reader.getTermFreqVectors(hits.id(0));
+ assertTrue(vector != null);
+ assertTrue(vector.length == 1);
+ TermPositionVector tfv = (TermPositionVector) vector[0];
+ assertTrue(tfv.getField().equals("field"));
+ String[] terms = tfv.getTerms();
+ assertEquals(1, terms.length);
+ assertEquals(terms[0], "one");
+ assertEquals(5, tfv.getTermFrequencies()[0]);
+
+ int[] positions = tfv.getTermPositions(0);
+ assertEquals(5, positions.length);
+ for(int i=0;i<5;i++)
+ assertEquals(i, positions[i]);
+ TermVectorOffsetInfo[] offsets = tfv.getOffsets(0);
+ assertEquals(5, offsets.length);
+ for(int i=0;i<5;i++) {
+ assertEquals(4*i, offsets[i].getStartOffset());
+ assertEquals(4*i+3, offsets[i].getEndOffset());
+ }
+ }
}
Index: src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
===================================================================
--- src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java (revision 547668)
+++ src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java (working copy)
@@ -57,7 +57,7 @@
for (int i = 0; i < 100; i++) {
addDoc(writer);
checkInvariants(writer);
- if (writer.getRamSegmentCount() + writer.getSegmentCount() >= 18) {
+ if (writer.getNumBufferedDocuments() + writer.getSegmentCount() >= 18) {
noOverMerge = true;
}
}
@@ -195,7 +195,7 @@
int mergeFactor = writer.getMergeFactor();
int maxMergeDocs = writer.getMaxMergeDocs();
- int ramSegmentCount = writer.getRamSegmentCount();
+ int ramSegmentCount = writer.getNumBufferedDocuments();
assertTrue(ramSegmentCount < maxBufferedDocs);
int lowerBound = -1;
Index: src/test/org/apache/lucene/index/index.presharedstores.nocfs.zip
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: src/test/org/apache/lucene/index/index.presharedstores.nocfs.zip
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
Index: src/test/org/apache/lucene/index/index.presharedstores.cfs.zip
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: src/test/org/apache/lucene/index/index.presharedstores.cfs.zip
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
Index: src/test/org/apache/lucene/index/TestIndexWriterDelete.java
===================================================================
--- src/test/org/apache/lucene/index/TestIndexWriterDelete.java (revision 547668)
+++ src/test/org/apache/lucene/index/TestIndexWriterDelete.java (working copy)
@@ -110,7 +110,7 @@
}
modifier.flush();
- assertEquals(0, modifier.getRamSegmentCount());
+ assertEquals(0, modifier.getNumBufferedDocuments());
assertTrue(0 < modifier.getSegmentCount());
if (!autoCommit) {
@@ -156,7 +156,6 @@
int id = 0;
int value = 100;
-
addDoc(modifier, ++id, value);
modifier.deleteDocuments(new Term("value", String.valueOf(value)));
addDoc(modifier, ++id, value);
@@ -452,7 +451,7 @@
String[] startFiles = dir.list();
SegmentInfos infos = new SegmentInfos();
infos.read(dir);
- IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null);
+ IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null);
String[] endFiles = dir.list();
Arrays.sort(startFiles);
Index: src/test/org/apache/lucene/index/TestIndexReader.java
===================================================================
--- src/test/org/apache/lucene/index/TestIndexReader.java (revision 547668)
+++ src/test/org/apache/lucene/index/TestIndexReader.java (working copy)
@@ -803,7 +803,7 @@
String[] startFiles = dir.list();
SegmentInfos infos = new SegmentInfos();
infos.read(dir);
- IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null);
+ IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null);
String[] endFiles = dir.list();
Arrays.sort(startFiles);
Index: src/test/org/apache/lucene/index/TestIndexWriter.java
===================================================================
--- src/test/org/apache/lucene/index/TestIndexWriter.java (revision 547668)
+++ src/test/org/apache/lucene/index/TestIndexWriter.java (working copy)
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.File;
import java.util.Arrays;
+import java.util.Random;
import junit.framework.TestCase;
@@ -199,6 +200,7 @@
for(int iter=0;iter<6;iter++) {
// Start with 100 bytes more than we are currently using:
+
long diskFree = diskUsage+100;
boolean autoCommit = iter % 2 == 0;
@@ -478,7 +480,7 @@
String[] startFiles = dir.list();
SegmentInfos infos = new SegmentInfos();
infos.read(dir);
- IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null);
+ IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null);
String[] endFiles = dir.list();
Arrays.sort(startFiles);
@@ -859,6 +861,7 @@
public void testCommitOnCloseAbort() throws IOException {
Directory dir = new RAMDirectory();
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
+ writer.setMaxBufferedDocs(10);
for (int i = 0; i < 14; i++) {
addDoc(writer);
}
@@ -871,6 +874,7 @@
searcher.close();
writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), false);
+ writer.setMaxBufferedDocs(10);
for(int j=0;j<17;j++) {
addDoc(writer);
}
@@ -895,6 +899,8 @@
// Now make sure we can re-open the index, add docs,
// and all is good:
writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), false);
+ writer.setMaxBufferedDocs(10);
+
for(int i=0;i<12;i++) {
for(int j=0;j<17;j++) {
addDoc(writer);
@@ -962,6 +968,7 @@
public void testCommitOnCloseOptimize() throws IOException {
RAMDirectory dir = new RAMDirectory();
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
+ writer.setMaxBufferedDocs(10);
for(int j=0;j<17;j++) {
addDocWithIndex(writer, j);
}
@@ -1002,6 +1009,255 @@
reader.close();
}
+ public void testIndexNoDocuments() throws IOException {
+ RAMDirectory dir = new RAMDirectory();
+ IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
+ writer.flush();
+ writer.close();
+
+ IndexReader reader = IndexReader.open(dir);
+ assertEquals(0, reader.maxDoc());
+ assertEquals(0, reader.numDocs());
+ reader.close();
+
+ writer = new IndexWriter(dir, new WhitespaceAnalyzer(), false);
+ writer.flush();
+ writer.close();
+
+ reader = IndexReader.open(dir);
+ assertEquals(0, reader.maxDoc());
+ assertEquals(0, reader.numDocs());
+ reader.close();
+ }
+
+ public void testManyFields() throws IOException {
+ RAMDirectory dir = new RAMDirectory();
+ IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
+ writer.setMaxBufferedDocs(10);
+ for(int j=0;j<100;j++) {
+ Document doc = new Document();
+ doc.add(new Field("a"+j, "aaa" + j, Field.Store.YES, Field.Index.TOKENIZED));
+ doc.add(new Field("b"+j, "aaa" + j, Field.Store.YES, Field.Index.TOKENIZED));
+ doc.add(new Field("c"+j, "aaa" + j, Field.Store.YES, Field.Index.TOKENIZED));
+ doc.add(new Field("d"+j, "aaa", Field.Store.YES, Field.Index.TOKENIZED));
+ doc.add(new Field("e"+j, "aaa", Field.Store.YES, Field.Index.TOKENIZED));
+ doc.add(new Field("f"+j, "aaa", Field.Store.YES, Field.Index.TOKENIZED));
+ writer.addDocument(doc);
+ }
+ writer.close();
+
+ IndexReader reader = IndexReader.open(dir);
+ assertEquals(100, reader.maxDoc());
+ assertEquals(100, reader.numDocs());
+ for(int j=0;j<100;j++) {
+ assertEquals(1, reader.docFreq(new Term("a"+j, "aaa"+j)));
+ assertEquals(1, reader.docFreq(new Term("b"+j, "aaa"+j)));
+ assertEquals(1, reader.docFreq(new Term("c"+j, "aaa"+j)));
+ assertEquals(1, reader.docFreq(new Term("d"+j, "aaa")));
+ assertEquals(1, reader.docFreq(new Term("e"+j, "aaa")));
+ assertEquals(1, reader.docFreq(new Term("f"+j, "aaa")));
+ }
+ reader.close();
+ dir.close();
+ }
+
+ public void testSmallRAMBuffer() throws IOException {
+ RAMDirectory dir = new RAMDirectory();
+ IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
+ writer.setRAMBufferSizeMB(0.000001);
+ int lastNumFile = dir.list().length;
+ for(int j=0;j<9;j++) {
+ Document doc = new Document();
+ doc.add(new Field("field", "aaa" + j, Field.Store.YES, Field.Index.TOKENIZED));
+ writer.addDocument(doc);
+ int numFile = dir.list().length;
+ // Verify that with a tiny RAM buffer we see a new
+ // segment after every doc
+ assertTrue(numFile > lastNumFile);
+ lastNumFile = numFile;
+ }
+ writer.close();
+ dir.close();
+ }
+
+ // Make sure it's OK to change RAM buffer size and
+ // maxBufferedDocs in a write session
+ public void testChangingRAMBuffer() throws IOException {
+ RAMDirectory dir = new RAMDirectory();
+ IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
+ writer.setMaxBufferedDocs(10);
+ int lastNumFile = dir.list().length;
+ long lastGen = -1;
+ for(int j=1;j<52;j++) {
+ Document doc = new Document();
+ doc.add(new Field("field", "aaa" + j, Field.Store.YES, Field.Index.TOKENIZED));
+ writer.addDocument(doc);
+ long gen = SegmentInfos.generationFromSegmentsFileName(SegmentInfos.getCurrentSegmentFileName(dir.list()));
+ if (j == 1)
+ lastGen = gen;
+ else if (j < 10)
+ // No new files should be created
+ assertEquals(gen, lastGen);
+ else if (10 == j) {
+ assertTrue(gen > lastGen);
+ lastGen = gen;
+ writer.setRAMBufferSizeMB(0.000001);
+ } else if (j < 20) {
+ assertTrue(gen > lastGen);
+ lastGen = gen;
+ } else if (20 == j) {
+ writer.setRAMBufferSizeMB(16);
+ lastGen = gen;
+ } else if (j < 30) {
+ assertEquals(gen, lastGen);
+ } else if (30 == j) {
+ writer.setRAMBufferSizeMB(0.000001);
+ } else if (j < 40) {
+ assertTrue(gen> lastGen);
+ lastGen = gen;
+ } else if (40 == j) {
+ writer.setMaxBufferedDocs(10);
+ lastGen = gen;
+ } else if (j < 50) {
+ assertEquals(gen, lastGen);
+ writer.setMaxBufferedDocs(10);
+ } else if (50 == j) {
+ assertTrue(gen > lastGen);
+ }
+ }
+ writer.close();
+ dir.close();
+ }
+
+ public void testDiverseDocs() throws IOException {
+ RAMDirectory dir = new RAMDirectory();
+ IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
+ // writer.setInfoStream(System.out);
+ long t0 = System.currentTimeMillis();
+ writer.setRAMBufferSizeMB(0.5);
+ Random rand = new Random(31415);
+ for(int i=0;i<3;i++) {
+ // First, docs where every term is unique (heavy on
+ // Posting instances)
+ for(int j=0;j<100;j++) {
+ Document doc = new Document();
+ for(int k=0;k<100;k++) {
+ doc.add(new Field("field", Integer.toString(rand.nextInt()), Field.Store.YES, Field.Index.TOKENIZED));
+ }
+ writer.addDocument(doc);
+ }
+
+ // Next, many single term docs where only one term
+ // occurs (heavy on byte blocks)
+ for(int j=0;j<100;j++) {
+ Document doc = new Document();
+ doc.add(new Field("field", "aaa aaa aaa aaa aaa aaa aaa aaa aaa aaa", Field.Store.YES, Field.Index.TOKENIZED));
+ writer.addDocument(doc);
+ }
+
+ // Next, many single term docs where only one term
+ // occurs but the terms are very long (heavy on
+ // char[] arrays)
+ for(int j=0;j<100;j++) {
+ StringBuffer b = new StringBuffer();
+ String x = Integer.toString(j) + ".";
+ for(int k=0;k<1000;k++)
+ b.append(x);
+ String longTerm = b.toString();
+
+ Document doc = new Document();
+ doc.add(new Field("field", longTerm, Field.Store.YES, Field.Index.TOKENIZED));
+ writer.addDocument(doc);
+ }
+ }
+ writer.close();
+
+ long t1 = System.currentTimeMillis();
+ IndexSearcher searcher = new IndexSearcher(dir);
+ Hits hits = searcher.search(new TermQuery(new Term("field", "aaa")));
+ assertEquals(300, hits.length());
+ searcher.close();
+
+ dir.close();
+ }
+
+ public void testEnablingNorms() throws IOException {
+ RAMDirectory dir = new RAMDirectory();
+ IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
+ writer.setMaxBufferedDocs(10);
+ // Enable norms for only 1 doc, pre flush
+ for(int j=0;j<10;j++) {
+ Document doc = new Document();
+ Field f = new Field("field", "aaa", Field.Store.YES, Field.Index.TOKENIZED);
+ if (j != 8) {
+ f.setOmitNorms(true);
+ }
+ doc.add(f);
+ writer.addDocument(doc);
+ }
+ writer.close();
+
+ Term searchTerm = new Term("field", "aaa");
+
+ IndexSearcher searcher = new IndexSearcher(dir);
+ Hits hits = searcher.search(new TermQuery(searchTerm));
+ assertEquals(10, hits.length());
+ searcher.close();
+
+ writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
+ writer.setMaxBufferedDocs(10);
+ // Enable norms for only 1 doc, post flush
+ for(int j=0;j<27;j++) {
+ Document doc = new Document();
+ Field f = new Field("field", "aaa", Field.Store.YES, Field.Index.TOKENIZED);
+ if (j != 26) {
+ f.setOmitNorms(true);
+ }
+ doc.add(f);
+ writer.addDocument(doc);
+ }
+ writer.close();
+ searcher = new IndexSearcher(dir);
+ hits = searcher.search(new TermQuery(searchTerm));
+ assertEquals(27, hits.length());
+ searcher.close();
+
+ IndexReader reader = IndexReader.open(dir);
+ reader.close();
+
+ dir.close();
+ }
+
+ public void testHighFreqTerm() throws IOException {
+ RAMDirectory dir = new RAMDirectory();
+ IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
+ writer.setRAMBufferSizeMB(0.01);
+ writer.setMaxFieldLength(100000000);
+ // Massive doc that has 128 K a's
+ StringBuffer b = new StringBuffer(1024*1024);
+ for(int i=0;i<4096;i++) {
+ b.append(" a a a a a a a a");
+ b.append(" a a a a a a a a");
+ b.append(" a a a a a a a a");
+ b.append(" a a a a a a a a");
+ }
+ Document doc = new Document();
+ doc.add(new Field("field", b.toString(), Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+ writer.addDocument(doc);
+ writer.close();
+
+ IndexReader reader = IndexReader.open(dir);
+ assertEquals(1, reader.maxDoc());
+ assertEquals(1, reader.numDocs());
+ Term t = new Term("field", "a");
+ assertEquals(1, reader.docFreq(t));
+ TermDocs td = reader.termDocs(t);
+ td.next();
+ assertEquals(128*1024, td.freq());
+ reader.close();
+ dir.close();
+ }
+
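
The new tests above (testSmallRAMBuffer, testChangingRAMBuffer, testDiverseDocs, testHighFreqTerm) exercise the two flush triggers exposed through IndexWriter: flushing once buffered postings use a given amount of RAM (setRAMBufferSizeMB) or once a fixed number of documents has been buffered (setMaxBufferedDocs). A minimal sketch of configuring either mode, assuming the setters behave as the tests show (in this patch, setting a doc count also turns off the RAM trigger):

    import org.apache.lucene.analysis.WhitespaceAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.store.RAMDirectory;

    public class FlushConfigSketch {
      public static void main(String[] args) throws Exception {
        RAMDirectory dir = new RAMDirectory();
        IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);

        // Flush by RAM usage: buffered postings are written out as a new
        // segment once they consume roughly this much memory.
        writer.setRAMBufferSizeMB(16.0);

        // Or flush by document count (what most of the tests above do);
        // per this patch this also disables the RAM-based trigger.
        writer.setMaxBufferedDocs(10);

        Document doc = new Document();
        doc.add(new Field("field", "aaa", Field.Store.YES, Field.Index.TOKENIZED));
        writer.addDocument(doc);
        writer.close();
        dir.close();
      }
    }
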
// Make sure that a Directory implementation that does
// not use LockFactory at all (ie overrides makeLock and
// implements its own private locking) works OK. This
Index: src/test/org/apache/lucene/index/TestStressIndexing.java
===================================================================
--- src/test/org/apache/lucene/index/TestStressIndexing.java (revision 547668)
+++ src/test/org/apache/lucene/index/TestStressIndexing.java (working copy)
@@ -74,8 +74,6 @@
count++;
}
- modifier.close();
-
} catch (Exception e) {
System.out.println(e.toString());
e.printStackTrace();
@@ -125,6 +123,9 @@
IndexerThread indexerThread = new IndexerThread(modifier);
indexerThread.start();
+ IndexerThread indexerThread2 = new IndexerThread(modifier);
+ indexerThread2.start();
+
// Two searchers that constantly just re-instantiate the searcher:
SearcherThread searcherThread1 = new SearcherThread(directory);
searcherThread1.start();
@@ -133,9 +134,14 @@
searcherThread2.start();
indexerThread.join();
+ indexerThread2.join();
searcherThread1.join();
searcherThread2.join();
+
+ modifier.close();
+
assertTrue("hit unexpected exception in indexer", !indexerThread.failed);
+ assertTrue("hit unexpected exception in indexer 2", !indexerThread2.failed);
assertTrue("hit unexpected exception in search1", !searcherThread1.failed);
assertTrue("hit unexpected exception in search2", !searcherThread2.failed);
//System.out.println(" Writer: " + indexerThread.count + " iterations");
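
The stress test now points two indexer threads at the same IndexWriter and closes it only after both have joined, because DocumentsWriter lets addDocument be called concurrently (each thread gets its own internal ThreadState). A minimal sketch of that usage pattern, with illustrative field names:

    import org.apache.lucene.analysis.SimpleAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.store.RAMDirectory;

    public class SharedWriterSketch {
      public static void main(String[] args) throws Exception {
        RAMDirectory dir = new RAMDirectory();
        final IndexWriter writer = new IndexWriter(dir, new SimpleAnalyzer(), true);

        Runnable indexer = new Runnable() {
          public void run() {
            try {
              for (int i = 0; i < 100; i++) {
                Document doc = new Document();
                doc.add(new Field("contents", "aaa " + i, Field.Store.NO, Field.Index.TOKENIZED));
                writer.addDocument(doc);   // safe to call from several threads at once
              }
            } catch (Exception e) {
              e.printStackTrace();
            }
          }
        };

        Thread t1 = new Thread(indexer);
        Thread t2 = new Thread(indexer);
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        writer.close();   // close only after every indexing thread is done
        dir.close();
      }
    }
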
Index: src/test/org/apache/lucene/index/TestIndexFileDeleter.java
===================================================================
--- src/test/org/apache/lucene/index/TestIndexFileDeleter.java (revision 547668)
+++ src/test/org/apache/lucene/index/TestIndexFileDeleter.java (working copy)
@@ -51,6 +51,7 @@
Directory dir = new RAMDirectory();
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
+ writer.setMaxBufferedDocs(10);
int i;
for(i=0;i<35;i++) {
addDoc(writer, i);
Index: src/test/org/apache/lucene/index/TestPayloads.java
===================================================================
--- src/test/org/apache/lucene/index/TestPayloads.java (revision 547668)
+++ src/test/org/apache/lucene/index/TestPayloads.java (working copy)
@@ -467,7 +467,8 @@
d.add(new Field(field, new PoolingPayloadTokenStream(pool)));
writer.addDocument(d);
}
- } catch (IOException e) {
+ } catch (Exception e) {
+ e.printStackTrace();
fail(e.toString());
}
}
@@ -482,6 +483,7 @@
}
writer.close();
+
IndexReader reader = IndexReader.open(dir);
TermEnum terms = reader.terms();
while (terms.next()) {
Index: src/test/org/apache/lucene/index/TestDeletionPolicy.java
===================================================================
--- src/test/org/apache/lucene/index/TestDeletionPolicy.java (revision 547668)
+++ src/test/org/apache/lucene/index/TestDeletionPolicy.java (working copy)
@@ -256,6 +256,7 @@
Directory dir = new RAMDirectory();
IndexWriter writer = new IndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true, policy);
+ writer.setMaxBufferedDocs(10);
writer.setUseCompoundFile(useCompoundFile);
for(int i=0;i<107;i++) {
addDoc(writer);
@@ -318,6 +319,7 @@
Directory dir = new RAMDirectory();
IndexWriter writer = new IndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true, policy);
+ writer.setMaxBufferedDocs(10);
writer.setUseCompoundFile(useCompoundFile);
for(int i=0;i<107;i++) {
addDoc(writer);
@@ -365,6 +367,7 @@
for(int j=0;j= size + this.docStoreOffset;
}
/**
@@ -100,7 +121,8 @@
}
final Document doc(int n, FieldSelector fieldSelector) throws CorruptIndexException, IOException {
- indexStream.seek(n * 8L);
+
+ indexStream.seek((n + docStoreOffset) * 8L);
long position = indexStream.readLong();
fieldsStream.seek(position);
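
The FieldsReader change above shifts every stored-fields lookup by docStoreOffset, so several segments can share one set of doc store files when autoCommit is false: each .fdx entry is an 8-byte pointer into the .fdt data file, and a segment whose documents start at position 100 of the shared store reads entry n + 100. A worked example of the seek arithmetic (values are illustrative):

    public class DocStoreSeekSketch {
      public static void main(String[] args) {
        // Each entry in the shared .fdx index is one long (8 bytes) that
        // points at the document's stored fields inside .fdt.
        int docStoreOffset = 100;  // this segment's first doc within the shared store
        int n = 3;                 // segment-local document number being loaded
        long indexPosition = (n + docStoreOffset) * 8L;
        System.out.println("seek .fdx to byte " + indexPosition);  // prints 824
      }
    }
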
Index: src/java/org/apache/lucene/index/IndexReader.java
===================================================================
--- src/java/org/apache/lucene/index/IndexReader.java (revision 547668)
+++ src/java/org/apache/lucene/index/IndexReader.java (working copy)
@@ -783,7 +783,7 @@
// KeepOnlyLastCommitDeleter:
IndexFileDeleter deleter = new IndexFileDeleter(directory,
deletionPolicy == null ? new KeepOnlyLastCommitDeletionPolicy() : deletionPolicy,
- segmentInfos, null);
+ segmentInfos, null, null);
// Checkpoint the state we are about to change, in
// case we have to roll back:
Index: src/java/org/apache/lucene/index/IndexFileNames.java
===================================================================
--- src/java/org/apache/lucene/index/IndexFileNames.java (revision 547668)
+++ src/java/org/apache/lucene/index/IndexFileNames.java (working copy)
@@ -38,18 +38,54 @@
/** Extension of norms file */
static final String NORMS_EXTENSION = "nrm";
+ /** Extension of freq postings file */
+ static final String FREQ_EXTENSION = "frq";
+
+ /** Extension of prox postings file */
+ static final String PROX_EXTENSION = "prx";
+
+ /** Extension of terms file */
+ static final String TERMS_EXTENSION = "tis";
+
+ /** Extension of terms index file */
+ static final String TERMS_INDEX_EXTENSION = "tii";
+
+ /** Extension of stored fields index file */
+ static final String FIELDS_INDEX_EXTENSION = "fdx";
+
+ /** Extension of stored fields file */
+ static final String FIELDS_EXTENSION = "fdt";
+
+ /** Extension of vectors fields file */
+ static final String VECTORS_FIELDS_EXTENSION = "tvf";
+
+ /** Extension of vectors documents file */
+ static final String VECTORS_DOCUMENTS_EXTENSION = "tvd";
+
+ /** Extension of vectors index file */
+ static final String VECTORS_INDEX_EXTENSION = "tvx";
+
/** Extension of compound file */
static final String COMPOUND_FILE_EXTENSION = "cfs";
+ /** Extension of compound file for doc store files*/
+ static final String COMPOUND_FILE_STORE_EXTENSION = "cfx";
+
/** Extension of deletes */
static final String DELETES_EXTENSION = "del";
+ /** Extension of field infos */
+ static final String FIELD_INFOS_EXTENSION = "fnm";
+
/** Extension of plain norms */
static final String PLAIN_NORMS_EXTENSION = "f";
/** Extension of separate norms */
static final String SEPARATE_NORMS_EXTENSION = "s";
+ /** Extension of gen file */
+ static final String GEN_EXTENSION = "gen";
+
/**
* This array contains all filename extensions used by
* Lucene's index files, with two exceptions, namely the
@@ -59,25 +95,72 @@
* filename extension.
*/
static final String INDEX_EXTENSIONS[] = new String[] {
- "cfs", "fnm", "fdx", "fdt", "tii", "tis", "frq", "prx", "del",
- "tvx", "tvd", "tvf", "gen", "nrm"
+ COMPOUND_FILE_EXTENSION,
+ FIELD_INFOS_EXTENSION,
+ FIELDS_INDEX_EXTENSION,
+ FIELDS_EXTENSION,
+ TERMS_INDEX_EXTENSION,
+ TERMS_EXTENSION,
+ FREQ_EXTENSION,
+ PROX_EXTENSION,
+ DELETES_EXTENSION,
+ VECTORS_INDEX_EXTENSION,
+ VECTORS_DOCUMENTS_EXTENSION,
+ VECTORS_FIELDS_EXTENSION,
+ GEN_EXTENSION,
+ NORMS_EXTENSION,
+ COMPOUND_FILE_STORE_EXTENSION,
};
/** File extensions that are added to a compound file
* (same as above, minus "del", "gen", "cfs"). */
static final String[] INDEX_EXTENSIONS_IN_COMPOUND_FILE = new String[] {
- "fnm", "fdx", "fdt", "tii", "tis", "frq", "prx",
- "tvx", "tvd", "tvf", "nrm"
+ FIELD_INFOS_EXTENSION,
+ FIELDS_INDEX_EXTENSION,
+ FIELDS_EXTENSION,
+ TERMS_INDEX_EXTENSION,
+ TERMS_EXTENSION,
+ FREQ_EXTENSION,
+ PROX_EXTENSION,
+ VECTORS_INDEX_EXTENSION,
+ VECTORS_DOCUMENTS_EXTENSION,
+ VECTORS_FIELDS_EXTENSION,
+ NORMS_EXTENSION
};
+
+ static final String[] STORE_INDEX_EXTENSIONS = new String[] {
+ VECTORS_INDEX_EXTENSION,
+ VECTORS_FIELDS_EXTENSION,
+ VECTORS_DOCUMENTS_EXTENSION,
+ FIELDS_INDEX_EXTENSION,
+ FIELDS_EXTENSION
+ };
+
+ static final String[] NON_STORE_INDEX_EXTENSIONS = new String[] {
+ FIELD_INFOS_EXTENSION,
+ FREQ_EXTENSION,
+ PROX_EXTENSION,
+ TERMS_EXTENSION,
+ TERMS_INDEX_EXTENSION,
+ NORMS_EXTENSION
+ };
/** File extensions of old-style index files */
static final String COMPOUND_EXTENSIONS[] = new String[] {
- "fnm", "frq", "prx", "fdx", "fdt", "tii", "tis"
+ FIELD_INFOS_EXTENSION,
+ FREQ_EXTENSION,
+ PROX_EXTENSION,
+ FIELDS_INDEX_EXTENSION,
+ FIELDS_EXTENSION,
+ TERMS_INDEX_EXTENSION,
+ TERMS_EXTENSION
};
/** File extensions for term vector support */
static final String VECTOR_EXTENSIONS[] = new String[] {
- "tvx", "tvd", "tvf"
+ VECTORS_INDEX_EXTENSION,
+ VECTORS_DOCUMENTS_EXTENSION,
+ VECTORS_FIELDS_EXTENSION
};
/**
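
The hard-coded extension strings become named constants, and a new "cfx" extension is introduced for the compound file that packages the shared doc store files. Per-segment file names are simply segmentName + "." + extension (see segmentFileName in DocumentsWriter below); a small sketch, repeating the extension strings because the constants themselves are package-private:

    public class SegmentFileNamesSketch {
      public static void main(String[] args) {
        String segment = "_0";
        // Same values as the IndexFileNames constants above.
        String[] extensions = { "fnm", "frq", "prx", "tis", "tii",
                                "fdx", "fdt", "tvx", "tvd", "tvf", "nrm" };
        for (int i = 0; i < extensions.length; i++) {
          System.out.println(segment + "." + extensions[i]);   // e.g. _0.frq
        }
        // With compound files these are packed into _0.cfs; shared doc
        // store files may instead be packed into a .cfx compound file.
      }
    }
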
Index: src/java/org/apache/lucene/index/DocumentsWriter.java
===================================================================
--- src/java/org/apache/lucene/index/DocumentsWriter.java (revision 0)
+++ src/java/org/apache/lucene/index/DocumentsWriter.java (revision 0)
@@ -0,0 +1,2841 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.Token;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Fieldable;
+import org.apache.lucene.search.Similarity;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RAMOutputStream;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.Reader;
+import java.util.Arrays;
+import java.util.List;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.text.NumberFormat;
+import java.util.Collections;
+
+/**
+ * This class accepts multiple added documents and directly
+ * writes a single segment file. It does this more
+ * efficiently than creating a single segment per document
+ * (with DocumentWriter) and doing standard merges on those
+ * segments.
+ *
+ * When a document is added, its stored fields (if any) and
+ * term vectors (if any) are immediately written to the
+ * Directory (ie these do not consume RAM). The freq/prox
+ * postings are accumulated into a Postings hash table keyed
+ * by term. Each entry in this hash table holds a separate
+ * byte stream (allocated as incrementally growing slices
+ * into large shared byte[] arrays) for freq and prox, that
+ * contains the postings data for multiple documents. If
+ * vectors are enabled, each unique term for each document
+ * also allocates a PostingVector instance to similarly
+ * track the offsets & positions byte stream.
+ *
+ * Once the Postings hash is full (ie is consuming the
+ * allowed RAM) or the number of added docs is large enough
+ * (in the case we are flushing by doc count instead of RAM
+ * usage), we create a real segment and flush it to disk and
+ * reset the Postings hash.
+ *
+ * In adding a document we first organize all of its fields
+ * by field name. We then process field by field, and
+ * record the Posting hash per-field. After each field we
+ * flush its term vectors. When it's time to flush the full
+ * segment we first sort the fields by name, and then go
+ * field by field and sort its postings.
+ *
+ *
+ * Threads:
+ *
+ * Multiple threads are allowed into addDocument at once.
+ * There is an initial synchronized call to getThreadState
+ * which allocates a ThreadState for this thread. The same
+ * thread will get the same ThreadState over time (thread
+ * affinity) so that if there are consistent patterns (for
+ * example each thread is indexing a different content
+ * source) then we make better use of RAM. Then
+ * processDocument is called on that ThreadState without
+ * synchronization (most of the "heavy lifting" is in this
+ * call). Finally the synchronized "finishDocument" is
+ * called to flush changes to the directory.
+ *
+ * Each ThreadState instance has its own Posting hash. Once
+ * we're using too much RAM, we flush all Posting hashes to
+ * a segment by merging the docIDs in the posting lists for
+ * the same term across multiple thread states (see
+ * writeSegment and appendPostings).
+ *
+ * When flush is called by IndexWriter, or when we flush
+ * internally (when autoCommit=false), we forcefully idle all
+ * threads and flush only once they are all idle. This
+ * means you can call flush with a given thread even while
+ * other threads are actively adding/deleting documents.
+ */
+
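
The threading description above amounts to: a cheap synchronized lookup that gives each thread its own reusable state, unsynchronized per-document inversion against that state, and a cheap synchronized finish step. A simplified illustration of that thread-affinity pattern (this is not the actual ThreadState class):

    import java.util.HashMap;
    import java.util.Map;

    public class ThreadAffinitySketch {
      // One reusable state object per indexing thread, mirroring
      // DocumentsWriter.threadBindings.
      private final Map bindings = new HashMap();

      static class PerThreadState { int docsProcessed; }

      // Synchronized and cheap: find or lazily create this thread's state.
      private synchronized PerThreadState getThreadState() {
        PerThreadState state = (PerThreadState) bindings.get(Thread.currentThread());
        if (state == null) {
          state = new PerThreadState();
          bindings.put(Thread.currentThread(), state);
        }
        return state;
      }

      // Unsynchronized: the per-document "heavy lifting" runs here, safely,
      // because each thread only touches its own state.
      private void processDocument(PerThreadState state) {
        state.docsProcessed++;
      }

      // Synchronized and cheap again: publish the finished document.
      private synchronized void finishDocument(PerThreadState state) {
      }

      public void addDocument() {
        PerThreadState state = getThreadState();
        processDocument(state);
        finishDocument(state);
      }
    }
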
+final class DocumentsWriter {
+
+ private IndexWriter writer;
+ private Directory directory;
+
+ private FieldInfos fieldInfos = new FieldInfos(); // All fields we've seen
+ private IndexOutput tvx, tvf, tvd; // To write term vectors
+ private FieldsWriter fieldsWriter; // To write stored fields
+
+ private String segment; // Current segment we are working on
+ private String docStoreSegment;
+ private int docStoreOffset;
+
+ private int nextDocID; // Next docID to be added
+ private int numDocsInRAM; // # docs buffered in RAM
+ private int nextWriteDocID; // Next docID to be written
+
+ // Max # ThreadState instances; if there are more threads
+ // than this they share ThreadStates
+ private final static int MAX_THREAD_STATE = 5;
+ private ThreadState[] threadStates = new ThreadState[0];
+ private final HashMap threadBindings = new HashMap();
+ private int numWaiting;
+ private ThreadState[] waitingThreadStates = new ThreadState[1];
+ private int pauseThreads; // Non-zero when we need all threads to
+ // pause (eg to flush)
+ private boolean flushPending; // True when a thread has decided to flush
+ private boolean postingsIsFull; // True when it's time to write segment
+
+ private PrintStream infoStream;
+
+ final private static long DEFAULT_RAM_BUFFER_SIZE = 16*1024*1024; // Default allowed RAM usage before flushing
+
+ private long ramBufferSize = DEFAULT_RAM_BUFFER_SIZE; // How much RAM we can use before flushing
+ private int maxBufferedDocs; // Non-zero if we are flushing by doc count instead of by RAM usage
+
+ private BufferedNorms[] norms = new BufferedNorms[0]; // Holds norms until we flush
+
+ DocumentsWriter(Directory directory, IndexWriter writer) throws IOException {
+ this.directory = directory;
+ this.writer = writer;
+
+ postingsFreeList = new Posting[0];
+ }
+
+ /** If non-null, various details of indexing are printed
+ * here. */
+ void setInfoStream(PrintStream infoStream) {
+ this.infoStream = infoStream;
+ }
+
+ /** Set how much RAM we can use before flushing. */
+ void setRAMBufferSizeMB(double mb) {
+ ramBufferSize = (long) (mb*1024*1024);
+ }
+
+ double getRAMBufferSizeMB() {
+ return ramBufferSize/1024./1024.;
+ }
+
+ /** Set max buffered docs, which means we will flush by
+ * doc count instead of by RAM usage. */
+ void setMaxBufferedDocs(int count) {
+ maxBufferedDocs = count;
+ ramBufferSize = 0;
+ }
+
+ int getMaxBufferedDocs() {
+ return maxBufferedDocs;
+ }
+
+ /** Get current segment name we are writing. */
+ String getSegment() {
+ return segment;
+ }
+
+ /** Returns how many docs are currently buffered in RAM. */
+ int getNumDocsInRAM() {
+ return numDocsInRAM;
+ }
+
+ /** Returns the current doc store segment we are writing
+ * to. This will be the same as segment when autoCommit
+ * is true. */
+ String getDocStoreSegment() {
+ return docStoreSegment;
+ }
+
+ /** Returns the doc offset into the shared doc store for
+ * the current buffered docs. */
+ int getDocStoreOffset() {
+ return docStoreOffset;
+ }
+
+ /** Closes the currently open doc stores and returns the doc
+ * store segment name. This returns null if there are
+ * no buffered documents. */
+ String closeDocStore() throws IOException {
+
+ assert allThreadsIdle();
+
+ List flushedFiles = files();
+
+ if (infoStream != null)
+ infoStream.println("\ncloseDocStore: " + flushedFiles.size() + " files to flush to segment " + docStoreSegment);
+
+ if (flushedFiles.size() > 0) {
+ files = null;
+
+ if (tvx != null) {
+ // At least one doc in this run had term vectors enabled
+ assert docStoreSegment != null;
+ tvx.close();
+ tvf.close();
+ tvd.close();
+ tvx = null;
+ }
+
+ if (fieldsWriter != null) {
+ assert docStoreSegment != null;
+ fieldsWriter.close();
+ fieldsWriter = null;
+ }
+
+ String s = docStoreSegment;
+ docStoreSegment = null;
+ docStoreOffset = 0;
+ return s;
+ } else {
+ return null;
+ }
+ }
+
+ private List files = null; // Cached list of files we've created
+
+ /* Returns list of files in use by this instance,
+ * including any flushed segments. */
+ List files() {
+
+ if (files != null)
+ return files;
+
+ files = new ArrayList();
+
+ // Stored fields:
+ if (fieldsWriter != null) {
+ assert docStoreSegment != null;
+ files.add(docStoreSegment + "." + IndexFileNames.FIELDS_EXTENSION);
+ files.add(docStoreSegment + "." + IndexFileNames.FIELDS_INDEX_EXTENSION);
+ }
+
+ // Vectors:
+ if (tvx != null) {
+ assert docStoreSegment != null;
+ files.add(docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION);
+ files.add(docStoreSegment + "." + IndexFileNames.VECTORS_FIELDS_EXTENSION);
+ files.add(docStoreSegment + "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);
+ }
+
+ return files;
+ }
+
+ /** Called if we hit an exception when adding docs,
+ * flushing, etc. This resets our state, discarding any
+ * docs added since last flush. */
+ void abort() throws IOException {
+
+ // Forcefully remove waiting ThreadStates from line
+ for(int i=0;i= 0;
+ if (0 == pauseThreads)
+ notifyAll();
+ }
+
+ private boolean allThreadsIdle() {
+ for(int i=0;i 0;
+
+ if (infoStream != null)
+ infoStream.println("\nflush postings as segment " + segment + " numDocs=" + numDocsInRAM);
+
+ boolean success = false;
+
+ try {
+
+ fieldInfos.write(directory, segment + ".fnm");
+
+ docCount = numDocsInRAM;
+
+ newFiles.addAll(writeSegment());
+
+ success = true;
+
+ } finally {
+ if (!success)
+ abort();
+ }
+
+ return docCount;
+ }
+
+ /** Build compound file for the segment we just flushed */
+ void createCompoundFile(String segment) throws IOException
+ {
+ CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, segment + "." + IndexFileNames.COMPOUND_FILE_EXTENSION);
+ final int size = newFiles.size();
+ for(int i=0;i 0)
+ fp.resetPostingArrays();
+ }
+ }
+
+ /** Move all per-document state that was accumulated in
+ * the ThreadState into the "real" stores. */
+ public void writeDocument() throws IOException {
+
+ // Append stored fields to the real FieldsWriter:
+ fieldsWriter.flushDocument(fdtLocal);
+ fdtLocal.reset();
+
+ // Append term vectors to the real outputs:
+ if (tvx != null) {
+ tvx.writeLong(tvd.getFilePointer());
+ if (numVectorFields > 0) {
+ tvd.writeVInt(numVectorFields);
+ for(int i=0;i= hi)
+ return;
+
+ int mid = (lo + hi) / 2;
+
+ if (comparePostings(postings[lo], postings[mid]) > 0) {
+ Posting tmp = postings[lo];
+ postings[lo] = postings[mid];
+ postings[mid] = tmp;
+ }
+
+ if (comparePostings(postings[mid], postings[hi]) > 0) {
+ Posting tmp = postings[mid];
+ postings[mid] = postings[hi];
+ postings[hi] = tmp;
+
+ if (comparePostings(postings[lo], postings[mid]) > 0) {
+ Posting tmp2 = postings[lo];
+ postings[lo] = postings[mid];
+ postings[mid] = tmp2;
+ }
+ }
+
+ int left = lo + 1;
+ int right = hi - 1;
+
+ if (left >= right)
+ return;
+
+ Posting partition = postings[mid];
+
+ for (; ;) {
+ while (comparePostings(postings[right], partition) > 0)
+ --right;
+
+ while (left < right && comparePostings(postings[left], partition) <= 0)
+ ++left;
+
+ if (left < right) {
+ Posting tmp = postings[left];
+ postings[left] = postings[right];
+ postings[right] = tmp;
+ --right;
+ } else {
+ break;
+ }
+ }
+
+ quickSort(postings, lo, left);
+ quickSort(postings, left + 1, hi);
+ }
+
+ /** Do in-place sort of PostingVector array */
+ final void doVectorSort(PostingVector[] postings, int numPosting) {
+ quickSort(postings, 0, numPosting-1);
+ }
+
+ final void quickSort(PostingVector[] postings, int lo, int hi) {
+ if (lo >= hi)
+ return;
+
+ int mid = (lo + hi) / 2;
+
+ if (comparePostings(postings[lo].p, postings[mid].p) > 0) {
+ PostingVector tmp = postings[lo];
+ postings[lo] = postings[mid];
+ postings[mid] = tmp;
+ }
+
+ if (comparePostings(postings[mid].p, postings[hi].p) > 0) {
+ PostingVector tmp = postings[mid];
+ postings[mid] = postings[hi];
+ postings[hi] = tmp;
+
+ if (comparePostings(postings[lo].p, postings[mid].p) > 0) {
+ PostingVector tmp2 = postings[lo];
+ postings[lo] = postings[mid];
+ postings[mid] = tmp2;
+ }
+ }
+
+ int left = lo + 1;
+ int right = hi - 1;
+
+ if (left >= right)
+ return;
+
+ PostingVector partition = postings[mid];
+
+ for (; ;) {
+ while (comparePostings(postings[right].p, partition.p) > 0)
+ --right;
+
+ while (left < right && comparePostings(postings[left].p, partition.p) <= 0)
+ ++left;
+
+ if (left < right) {
+ PostingVector tmp = postings[left];
+ postings[left] = postings[right];
+ postings[right] = tmp;
+ --right;
+ } else {
+ break;
+ }
+ }
+
+ quickSort(postings, lo, left);
+ quickSort(postings, left + 1, hi);
+ }
+
+ /** If there are fields we've seen but did not see again
+ * in the last run, then free them up. Also reduce
+ * postings hash size. */
+ void trimFields() {
+
+ int upto = 0;
+ for(int i=0;i0)
+ position += analyzer.getPositionIncrementGap(fieldInfo.name);
+
+ if (!field.isTokenized()) { // un-tokenized field
+ token = localToken;
+ String stringValue = field.stringValue();
+ token.setTermText(stringValue);
+ token.setStartOffset(offset);
+ token.setEndOffset(offset + stringValue.length());
+ addPosition();
+ offset += stringValue.length();
+ length++;
+ } else { // tokenized field
+ final TokenStream stream;
+ final TokenStream streamValue = field.tokenStreamValue();
+
+ if (streamValue != null)
+ stream = streamValue;
+ else {
+ // the field does not have a TokenStream,
+ // so we have to obtain one from the analyzer
+ final Reader reader; // find or make Reader
+ final Reader readerValue = field.readerValue();
+
+ if (readerValue != null)
+ reader = readerValue;
+ else {
+ String stringValue = field.stringValue();
+ if (stringValue == null)
+ throw new IllegalArgumentException("field must have either TokenStream, String or Reader value");
+ stringReader.init(stringValue);
+ reader = stringReader;
+ }
+
+ // Tokenize field and add to postingTable
+ stream = analyzer.tokenStream(fieldInfo.name, reader);
+ }
+
+ // reset the TokenStream to the first token
+ stream.reset();
+
+ try {
+ offsetEnd = offset-1;
+ for (token = stream.next(); token != null; token = stream.next()) {
+ position += (token.getPositionIncrement() - 1);
+ addPosition();
+ if (++length >= maxFieldLength) {
+ if (infoStream != null)
+ infoStream.println("maxFieldLength " +maxFieldLength+ " reached for field " + fieldInfo.name + ", ignoring following tokens");
+ break;
+ }
+ }
+ offset = offsetEnd+1;
+ } finally {
+ stream.close();
+ }
+ }
+
+ boost *= field.getBoost();
+ }
+
+ /** Only called when term vectors are enabled. This
+ * is called the first time we see a given term for
+ * each document, to allocate a PostingVector
+ * instance that is used to record data needed to
+ * write the posting vectors. */
+ private PostingVector addNewVector() {
+
+ if (postingsVectorsUpto == postingsVectors.length) {
+ final int newSize;
+ if (postingsVectors.length < 2)
+ newSize = 2;
+ else
+ newSize = (int) (1.5*postingsVectors.length);
+ PostingVector[] newArray = new PostingVector[newSize];
+ System.arraycopy(postingsVectors, 0, newArray, 0, postingsVectors.length);
+ postingsVectors = newArray;
+ }
+
+ p.vector = postingsVectors[postingsVectorsUpto];
+ if (p.vector == null)
+ p.vector = postingsVectors[postingsVectorsUpto] = new PostingVector();
+
+ postingsVectorsUpto++;
+
+ final PostingVector v = p.vector;
+ v.p = p;
+
+ final int firstSize = levelSizeArray[0];
+
+ if (doVectorPositions) {
+ final int upto = vectorsPool.newSlice(firstSize);
+ v.posStart = v.posUpto = vectorsPool.byteOffset + upto;
+ }
+
+ if (doVectorOffsets) {
+ final int upto = vectorsPool.newSlice(firstSize);
+ v.offsetStart = v.offsetUpto = vectorsPool.byteOffset + upto;
+ }
+
+ return v;
+ }
+
+ int offsetStartCode;
+ int offsetStart;
+
+ /** This is the hotspot of indexing: it's called once
+ * for every term of every document. Its job is to
+ * update the postings byte stream (Postings hash)
+ * based on the occurrence of a single term. */
+ private void addPosition() {
+
+ final Payload payload = token.getPayload();
+
+ final String tokenString;
+ final int tokenTextLen;
+ final int tokenTextOffset;
+
+ // Get the text of this term. Term can either
+ // provide a String token or offset into a char[]
+ // array
+ final char[] tokenText = token.termBuffer();
+
+ int code = 0;
+ int code2 = 0;
+
+ if (tokenText == null) {
+
+ // Fallback to String token
+ tokenString = token.termText();
+ tokenTextLen = tokenString.length();
+ tokenTextOffset = 0;
+
+ // Compute hashcode.
+ int downto = tokenTextLen;
+ while (downto > 0)
+ code = (code*31) + tokenString.charAt(--downto);
+
+ // System.out.println(" addPosition: field=" + fieldInfo.name + " string=" + tokenString + " pos=" + position + " offsetStart=" + (offset+token.startOffset()) + " offsetEnd=" + (offset+token.endOffset()) + " docID=" + docID + " doPos=" + doVectorPositions + " doOffset=" + doVectorOffsets);
+
+ } else {
+ tokenString = null;
+ tokenTextLen = token.termBufferLength();
+ tokenTextOffset = token.termBufferOffset();
+
+ // Compute hashcode
+ int downto = tokenTextLen+tokenTextOffset;
+ while (downto > tokenTextOffset)
+ code = (code*31) + tokenText[--downto];
+
+ // System.out.println(" addPosition: buffer=" + new String(tokenText, tokenTextOffset, tokenTextLen) + " pos=" + position + " offsetStart=" + (offset+token.startOffset()) + " offsetEnd=" + (offset + token.endOffset()) + " docID=" + docID + " doPos=" + doVectorPositions + " doOffset=" + doVectorOffsets);
+ }
+
+ int hashPos = code & postingsHashMask;
+
+ // Locate Posting in hash
+ p = postingsHash[hashPos];
+
+ if (p != null && !postingEquals(tokenString, tokenText, tokenTextLen, tokenTextOffset)) {
+ // Conflict: keep searching different locations in
+ // the hash table.
+ final int inc = code*1347|1;
+ do {
+ code += inc;
+ hashPos = code & postingsHashMask;
+ p = postingsHash[hashPos];
+ } while (p != null && !postingEquals(tokenString, tokenText, tokenTextLen, tokenTextOffset));
+ }
+
+ final int proxCode;
+
+ if (p != null) { // term seen since last flush
+
+ if (docID != p.lastDocID) { // term not yet seen in this doc
+
+ // System.out.println(" seen before (new docID=" + docID + ") freqUpto=" + p.freqUpto +" proxUpto=" + p.proxUpto);
+
+ assert p.docFreq > 0;
+
+ // Now that we know doc freq for previous doc,
+ // write it & lastDocCode
+ freqUpto = p.freqUpto & BYTE_BLOCK_MASK;
+ freq = postingsPool.buffers[p.freqUpto >> BYTE_BLOCK_SHIFT];
+ if (1 == p.docFreq)
+ writeFreqVInt(p.lastDocCode|1);
+ else {
+ writeFreqVInt(p.lastDocCode);
+ writeFreqVInt(p.docFreq);
+ }
+ p.freqUpto = freqUpto + (p.freqUpto & BYTE_BLOCK_NOT_MASK);
+
+ if (doVectors) {
+ vector = addNewVector();
+ if (doVectorOffsets) {
+ offsetStartCode = offsetStart = offset + token.startOffset();
+ offsetEnd = offset + token.endOffset();
+ }
+ }
+
+ proxCode = position;
+
+ p.docFreq = 1;
+
+ // Store code so we can write this after we're
+ // done with this new doc
+ p.lastDocCode = (docID-p.lastDocID) << 1;
+ p.lastDocID = docID;
+
+ } else { // term already seen in this doc
+ // System.out.println(" seen before (same docID=" + docID + ") proxUpto=" + p.proxUpto);
+ p.docFreq++;
+
+ proxCode = position-p.lastPosition;
+
+ if (doVectors) {
+ vector = p.vector;
+ if (vector == null)
+ vector = addNewVector();
+ if (doVectorOffsets) {
+ offsetStart = offset + token.startOffset();
+ offsetEnd = offset + token.endOffset();
+ offsetStartCode = offsetStart-vector.lastOffset;
+ }
+ }
+ }
+ } else { // term not seen before
+ // System.out.println(" never seen docID=" + docID);
+
+ // Refill?
+ if (0 == postingsFreeCount) {
+ postingsFreeCount = postingsFreeList.length;
+ getPostings(postingsFreeList);
+ }
+
+ // Pull next free Posting from free list
+ p = postingsFreeList[--postingsFreeCount];
+
+ final int textLen1 = 1+tokenTextLen;
+ if (textLen1 + charPool.byteUpto > CHAR_BLOCK_SIZE)
+ charPool.nextBuffer();
+ final char[] text = charPool.buffer;
+ final int textUpto = charPool.byteUpto;
+ p.textStart = textUpto + charPool.byteOffset;
+ charPool.byteUpto += textLen1;
+
+ if (tokenString == null)
+ System.arraycopy(tokenText, tokenTextOffset, text, textUpto, tokenTextLen);
+ else
+ tokenString.getChars(0, tokenTextLen, text, textUpto);
+
+ text[textUpto+tokenTextLen] = 0xffff;
+
+ assert postingsHash[hashPos] == null;
+
+ postingsHash[hashPos] = p;
+ numPostings++;
+
+ if (numPostings == postingsHashHalfSize)
+ rehashPostings(2*postingsHashSize);
+
+ // Init first slice for freq & prox streams
+ final int firstSize = levelSizeArray[0];
+
+ final int upto1 = postingsPool.newSlice(firstSize);
+ p.freqStart = p.freqUpto = postingsPool.byteOffset + upto1;
+
+ final int upto2 = postingsPool.newSlice(firstSize);
+ p.proxStart = p.proxUpto = postingsPool.byteOffset + upto2;
+
+ p.lastDocCode = docID << 1;
+ p.lastDocID = docID;
+ p.docFreq = 1;
+
+ if (doVectors) {
+ vector = addNewVector();
+ if (doVectorOffsets) {
+ offsetStart = offsetStartCode = offset + token.startOffset();
+ offsetEnd = offset + token.endOffset();
+ }
+ }
+
+ proxCode = position;
+ }
+
+ proxUpto = p.proxUpto & BYTE_BLOCK_MASK;
+ prox = postingsPool.buffers[p.proxUpto >> BYTE_BLOCK_SHIFT];
+ assert prox != null;
+
+ if (payload != null && payload.length > 0) {
+ writeProxVInt((proxCode<<1)|1);
+ writeProxVInt(payload.length);
+ writeProxBytes(payload.data, payload.offset, payload.length);
+ fieldInfo.storePayloads = true;
+ } else
+ writeProxVInt(proxCode<<1);
+
+ p.proxUpto = proxUpto + (p.proxUpto & BYTE_BLOCK_NOT_MASK);
+
+ p.lastPosition = position++;
+
+ if (doVectorPositions) {
+ posUpto = vector.posUpto & BYTE_BLOCK_MASK;
+ pos = vectorsPool.buffers[vector.posUpto >> BYTE_BLOCK_SHIFT];
+ writePosVInt(proxCode);
+ vector.posUpto = posUpto + (vector.posUpto & BYTE_BLOCK_NOT_MASK);
+ }
+
+ if (doVectorOffsets) {
+ offsetUpto = vector.offsetUpto & BYTE_BLOCK_MASK;
+ offsets = vectorsPool.buffers[vector.offsetUpto >> BYTE_BLOCK_SHIFT];
+ writeOffsetVInt(offsetStartCode);
+ writeOffsetVInt(offsetEnd-offsetStart);
+ vector.lastOffset = offsetEnd;
+ vector.offsetUpto = offsetUpto + (vector.offsetUpto & BYTE_BLOCK_NOT_MASK);
+ }
+ }
+
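
For reference, the freq-stream convention addPosition relies on (and FieldMergeState.nextDoc decodes further below) is: write the doc-ID delta shifted left by one, setting the low bit when the in-document frequency is exactly one so the frequency value can be omitted, otherwise follow the shifted delta with the frequency. A small worked example of that code/decode step using plain ints (the real code packs these as VInts into byte slices):

    import java.util.ArrayList;
    import java.util.List;

    public class FreqCodeSketch {
      public static void main(String[] args) {
        // Postings for one term: {docID, freq} pairs in increasing docID order.
        int[][] postings = { {5, 1}, {9, 3}, {12, 1} };

        // Encode: docDelta<<1 | 1 when freq == 1, else docDelta<<1 followed by freq.
        List codes = new ArrayList();
        int lastDocID = 0;
        for (int i = 0; i < postings.length; i++) {
          int delta = postings[i][0] - lastDocID;
          lastDocID = postings[i][0];
          if (postings[i][1] == 1) {
            codes.add(new Integer((delta << 1) | 1));
          } else {
            codes.add(new Integer(delta << 1));
            codes.add(new Integer(postings[i][1]));
          }
        }
        // codes is now [11, 8, 3, 7]

        // Decode, mirroring FieldMergeState.nextDoc() below.
        int docID = 0;
        for (int i = 0; i < codes.size(); i++) {
          int code = ((Integer) codes.get(i)).intValue();
          docID += code >>> 1;
          int freq = (code & 1) != 0 ? 1 : ((Integer) codes.get(++i)).intValue();
          System.out.println("doc=" + docID + " freq=" + freq);
        }
      }
    }
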
+ /** Called when postings hash is too small (> 50%
+ * occupied) or too large (< 20% occupied). */
+ void rehashPostings(final int newSize) {
+
+ postingsHashMask = newSize-1;
+
+ Posting[] newHash = new Posting[newSize];
+ for(int i=0;i> CHAR_BLOCK_SHIFT];
+ int pos = start;
+ while(text[pos] != 0xffff)
+ pos++;
+ int code = 0;
+ while (pos > start)
+ code = (code*31) + text[--pos];
+
+ int hashPos = code & postingsHashMask;
+ assert hashPos >= 0;
+ if (newHash[hashPos] != null) {
+ final int inc = code*1347|1;
+ do {
+ code += inc;
+ hashPos = code & postingsHashMask;
+ } while (newHash[hashPos] != null);
+ }
+ newHash[hashPos] = p0;
+ }
+ }
+
+ postingsHash = newHash;
+ postingsHashSize = newSize;
+ postingsHashHalfSize = newSize/2;
+ }
+
+ final ByteSliceReader vectorSliceReader = new ByteSliceReader();
+
+ /** Called once per field per document if term vectors
+ * are enabled, to write the vectors to a
+ * RAMOutputStream, which is then quickly flushed to
+ * the real term vectors files in the Directory. */
+ void writeVectors(FieldInfo fieldInfo) throws IOException {
+
+ assert fieldInfo.storeTermVector;
+
+ vectorFieldNumbers[numVectorFields] = fieldInfo.number;
+ vectorFieldPointers[numVectorFields] = tvfLocal.getFilePointer();
+ numVectorFields++;
+
+ final int numPostingsVectors = postingsVectorsUpto;
+
+ tvfLocal.writeVInt(numPostingsVectors);
+ byte bits = 0x0;
+ if (doVectorPositions)
+ bits |= TermVectorsWriter.STORE_POSITIONS_WITH_TERMVECTOR;
+ if (doVectorOffsets)
+ bits |= TermVectorsWriter.STORE_OFFSET_WITH_TERMVECTOR;
+ tvfLocal.writeByte(bits);
+
+ doVectorSort(postingsVectors, numPostingsVectors);
+
+ Posting lastPosting = null;
+
+ final ByteSliceReader reader = vectorSliceReader;
+
+ for(int j=0;j> CHAR_BLOCK_SHIFT];
+ final int start2 = posting.textStart & CHAR_BLOCK_MASK;
+ int pos2 = start2;
+
+ // Compute common prefix between last term and
+ // this term
+ if (lastPosting == null)
+ prefix = 0;
+ else {
+ final char[] text1 = charPool.buffers[lastPosting.textStart >> CHAR_BLOCK_SHIFT];
+ final int start1 = lastPosting.textStart & CHAR_BLOCK_MASK;
+ int pos1 = start1;
+ while(true) {
+ final char c1 = text1[pos1];
+ final char c2 = text2[pos2];
+ if (c1 != c2 || c1 == 0xffff) {
+ prefix = pos1-start1;
+ break;
+ }
+ pos1++;
+ pos2++;
+ }
+ }
+ lastPosting = posting;
+
+ // Compute length
+ while(text2[pos2] != 0xffff)
+ pos2++;
+
+ final int suffix = pos2 - start2 - prefix;
+ tvfLocal.writeVInt(prefix);
+ tvfLocal.writeVInt(suffix);
+ tvfLocal.writeChars(text2, start2 + prefix, suffix);
+ tvfLocal.writeVInt(freq);
+
+ if (doVectorPositions) {
+ reader.init(vectorsPool, vector.posStart, vector.posUpto);
+ reader.writeTo(tvfLocal);
+ }
+
+ if (doVectorOffsets) {
+ reader.init(vectorsPool, vector.offsetStart, vector.offsetUpto);
+ reader.writeTo(tvfLocal);
+ }
+ }
+ }
+ }
+ }
+
+ private static final byte defaultNorm = Similarity.encodeNorm(1.0f);
+
+ /** Write norms in the "true" segment format. This is
+ * called only during commit, to create the .nrm file. */
+ void writeNorms(String segmentName, int totalNumDoc) throws IOException {
+
+ IndexOutput normsOut = directory.createOutput(segmentName + "." + IndexFileNames.NORMS_EXTENSION);
+
+ try {
+ normsOut.writeBytes(SegmentMerger.NORMS_HEADER, 0, SegmentMerger.NORMS_HEADER.length);
+
+ final int numField = fieldInfos.size();
+
+ for (int fieldIdx=0;fieldIdx 0)
+ allFields.add(fp);
+ }
+ }
+
+ // Sort by field name
+ Collections.sort(allFields);
+ final int numAllFields = allFields.size();
+
+ skipListWriter = new DefaultSkipListWriter(termsOut.skipInterval,
+ termsOut.maxSkipLevels,
+ numDocsInRAM, freqOut, proxOut);
+
+ int start = 0;
+ while(start < numAllFields) {
+
+ final String fieldName = ((ThreadState.FieldData) allFields.get(start)).fieldInfo.name;
+
+ int end = start+1;
+ while(end < numAllFields && ((ThreadState.FieldData) allFields.get(end)).fieldInfo.name.equals(fieldName))
+ end++;
+
+ ThreadState.FieldData[] fields = new ThreadState.FieldData[end-start];
+ for(int i=start;i 1.5*postingsFreeCount) {
+ int newSize = postingsFreeList.length;
+ while(newSize > 1.25*postingsFreeCount) {
+ newSize = (int) (newSize*0.8);
+ }
+ Posting[] newArray = new Posting[newSize];
+ System.arraycopy(postingsFreeList, 0, newArray, 0, postingsFreeCount);
+ postingsFreeList = newArray;
+ }
+
+ return flushedFiles;
+ }
+
+ /** Returns the name of the file with this extension, on
+ * the current segment we are working on. */
+ private String segmentFileName(String extension) {
+ return segment + "." + extension;
+ }
+
+ private final TermInfo termInfo = new TermInfo(); // minimize consing
+
+ /** Used to merge the postings from multiple ThreadStates
+ * when creating a segment */
+ final static class FieldMergeState {
+
+ private ThreadState.FieldData field;
+
+ private Posting[] postings;
+
+ private Posting p;
+ private char[] text;
+ private int textOffset;
+
+ private int postingUpto = -1;
+
+ private ByteSliceReader freq = new ByteSliceReader();
+ private ByteSliceReader prox = new ByteSliceReader();
+
+ private int lastDocID;
+ private int docID;
+ private int termFreq;
+
+ boolean nextTerm() throws IOException {
+ postingUpto++;
+ if (postingUpto == field.numPostings)
+ return false;
+
+ p = postings[postingUpto];
+ docID = 0;
+
+ text = field.threadState.charPool.buffers[p.textStart >> CHAR_BLOCK_SHIFT];
+ textOffset = p.textStart & CHAR_BLOCK_MASK;
+
+ if (p.freqUpto > p.freqStart)
+ freq.init(field.threadState.postingsPool, p.freqStart, p.freqUpto);
+ else
+ freq.bufferOffset = freq.upto = freq.endIndex = 0;
+
+ prox.init(field.threadState.postingsPool, p.proxStart, p.proxUpto);
+
+ // Should always be true
+ boolean result = nextDoc();
+ assert result;
+
+ return true;
+ }
+
+ public boolean nextDoc() throws IOException {
+ if (freq.bufferOffset + freq.upto == freq.endIndex) {
+ if (p.lastDocCode != -1) {
+ // Return last doc
+ docID = p.lastDocID;
+ termFreq = p.docFreq;
+ p.lastDocCode = -1;
+ return true;
+ } else
+ // EOF
+ return false;
+ }
+
+ final int code = freq.readVInt();
+ docID += code >>> 1;
+ if ((code & 1) != 0)
+ termFreq = 1;
+ else
+ termFreq = freq.readVInt();
+
+ return true;
+ }
+ }
+
+ int compareText(final char[] text1, int pos1, final char[] text2, int pos2) {
+ while(true) {
+ final char c1 = text1[pos1++];
+ final char c2 = text2[pos2++];
+ if (c1 < c2)
+ if (0xffff == c2)
+ return 1;
+ else
+ return -1;
+ else if (c2 < c1)
+ if (0xffff == c1)
+ return -1;
+ else
+ return 1;
+ else if (0xffff == c1)
+ return 0;
+ }
+ }
+
+ /* Walk through all unique text tokens (Posting
+ * instances) found in this field and serialize them
+ * into a single RAM segment. */
+ void appendPostings(ThreadState.FieldData[] fields,
+ TermInfosWriter termsOut,
+ IndexOutput freqOut,
+ IndexOutput proxOut)
+ throws CorruptIndexException, IOException {
+
+ final String fieldName = fields[0].fieldInfo.name;
+ int numFields = fields.length;
+
+ final FieldMergeState[] mergeStates = new FieldMergeState[numFields];
+
+ for(int i=0;i 0) {
+
+ // Get the next term to merge
+ termStates[0] = mergeStates[0];
+ int numToMerge = 1;
+
+ for(int i=1;i 0) {
+
+ if ((++df % skipInterval) == 0) {
+ skipListWriter.setSkipData(lastDoc, currentFieldStorePayloads, lastPayloadLength);
+ skipListWriter.bufferSkip(df);
+ }
+
+ FieldMergeState minState = termStates[0];
+ for(int i=1;i lastDoc || df == 1;
+
+ final int newDocCode = (doc-lastDoc)<<1;
+ lastDoc = doc;
+
+ final ByteSliceReader prox = minState.prox;
+
+ // Carefully copy over the prox + payload info,
+ // changing the format to match Lucene's segment
+ // format.
+ for(int j=0;j 0)
+ copyBytes(prox, proxOut, payloadLength);
+ } else {
+ assert 0 == (code & 1);
+ proxOut.writeVInt(code>>1);
+ }
+ }
+
+ if (1 == termDocFreq) {
+ freqOut.writeVInt(newDocCode|1);
+ } else {
+ freqOut.writeVInt(newDocCode);
+ freqOut.writeVInt(termDocFreq);
+ }
+
+ if (!minState.nextDoc()) {
+
+ // Remove from termStates
+ int upto = 0;
+ for(int i=0;i 0;
+
+ // Done merging this term
+
+ long skipPointer = skipListWriter.writeSkip(freqOut);
+
+ // Write term
+ termInfo.set(df, freqPointer, proxPointer, (int) (skipPointer - freqPointer));
+ termsOut.add(term, termInfo);
+ }
+ }
+
+ /** Returns a free (idle) ThreadState that may be used for
+ * indexing this one document. This call also pauses if a
+ * flush is pending. */
+ synchronized ThreadState getThreadState(Document doc) throws IOException {
+
+ // First, find a thread state. If this thread already
+ // has affinity to a specific ThreadState, use that one
+ // again.
+ ThreadState state = (ThreadState) threadBindings.get(Thread.currentThread());
+ if (state == null) {
+ // First time this thread has called us since last flush
+ ThreadState minThreadState = null;
+ for(int i=0;i 0)
+ System.arraycopy(threadStates, 0, newArray, 0, threadStates.length);
+ threadStates = newArray;
+ state = threadStates[threadStates.length-1] = new ThreadState();
+ }
+ threadBindings.put(Thread.currentThread(), state);
+ }
+
+ // Next, wait until my thread state is idle (in case
+ // it's shared with other threads), threads are not
+ // paused, and no flush is pending:
+ while(!state.isIdle || pauseThreads != 0 || flushPending)
+ try {
+ wait();
+ } catch (InterruptedException e) {}
+
+ if (segment == null)
+ segment = writer.newSegmentName();
+
+ numDocsInRAM++;
+
+ // We must at this point commit to flushing to ensure we
+ // always get N docs when we flush by doc count, even if
+ // > 1 thread is adding documents:
+ if (!flushPending && maxBufferedDocs > 0 && numDocsInRAM >= maxBufferedDocs) {
+ flushPending = true;
+ state.doFlushAfter = true;
+ } else
+ state.doFlushAfter = false;
+
+ state.isIdle = false;
+
+ boolean success = false;
+ try {
+ state.init(doc, nextDocID++);
+ success = true;
+ } finally {
+ if (!success) {
+ state.isIdle = true;
+ if (state.doFlushAfter) {
+ state.doFlushAfter = false;
+ flushPending = false;
+ }
+ abort();
+ }
+ }
+
+ return state;
+ }
+
+ /** Returns true if the caller (IndexWriter) should now
+ * flush. */
+ boolean addDocument(Document doc, Analyzer analyzer)
+ throws CorruptIndexException, IOException {
+
+ // This call is synchronized but fast
+ final ThreadState state = getThreadState(doc);
+ boolean success = false;
+ try {
+ // This call is not synchronized and does all the work
+ state.processDocument(analyzer);
+ // This call is synchronized but fast
+ finishDocument(state);
+ success = true;
+ } finally {
+ if (!success) {
+ state.isIdle = true;
+ abort();
+ }
+ }
+ return state.doFlushAfter;
+ }
+
+ /** Does the synchronized work to finish/flush the
+ * inverted document. */
+ private synchronized void finishDocument(ThreadState state) throws IOException {
+
+ // Now write the indexed document to the real files.
+
+ if (nextWriteDocID == state.docID) {
+ // It's my turn, so write everything now:
+ state.isIdle = true;
+ nextWriteDocID++;
+ state.writeDocument();
+
+ // If any states were waiting on me, sweep through and
+ // flush those that are enabled by my write.
+ if (numWaiting > 0) {
+ while(true) {
+ int upto = 0;
+ for(int i=0;i<numWaiting;i++) {
+ ThreadState s = waitingThreadStates[i];
+ if (s.docID == nextWriteDocID) {
+ s.isIdle = true;
+ nextWriteDocID++;
+ s.writeDocument();
+ } else
+ // Compact as we go
+ waitingThreadStates[upto++] = s;
+ }
+ if (upto == numWaiting)
+ break;
+ numWaiting = upto;
+ }
+ }
+ } else {
+ // Another thread got a docID before me but hasn't
+ // finished writing it yet, so queue my state until
+ // its turn comes up:
+ waitingThreadStates[numWaiting++] = state;
+ }
+ }
+
+ long getRAMUsed() {
+ return numBytesUsed;
+ }
+
+ long numBytesAlloc;
+ long numBytesUsed;
+
+ NumberFormat nf = NumberFormat.getInstance();
+
+ /* Used when writing norms, to fill in the default norm
+ * value for the docs that didn't see this field. */
+ static void fillBytes(IndexOutput out, byte b, int numBytes) throws IOException {
+ for(int i=0;i<numBytes;i++)
+ out.writeByte(b);
+ }
+
+ byte[] copyByteBuffer = new byte[4096];
+
+ /** Copy numBytes from srcIn to destIn */
+ void copyBytes(IndexInput srcIn, IndexOutput destIn, long numBytes) throws IOException {
+ // TODO: we could do this more efficiently (save a
+ // copy) because it's always from a ByteSliceReader ->
+ // IndexOutput
+ while(numBytes > 0) {
+ final int chunk;
+ if (numBytes > 4096)
+ chunk = 4096;
+ else
+ chunk = (int) numBytes;
+ srcIn.readBytes(copyByteBuffer, 0, chunk);
+ destIn.writeBytes(copyByteBuffer, 0, chunk);
+ numBytes -= chunk;
+ }
+ }
+
+ /* Stores norms, buffered in RAM, until they are flushed
+ * to a partial segment. */
+ private static class BufferedNorms {
+
+ RAMOutputStream out;
+ int upto;
+
+ BufferedNorms() {
+ out = new RAMOutputStream();
+ }
+
+ void add(float norm) throws IOException {
+ byte b = Similarity.encodeNorm(norm);
+ out.writeByte(b);
+ upto++;
+ }
+
+ void reset() {
+ out.reset();
+ upto = 0;
+ }
+
+ void fill(int docID) throws IOException {
+ // Must now fill in docs that didn't have this
+ // field. Note that this is how norms can consume
+ // tremendous storage when the docs have widely
+ // varying sets of fields, because we are not
+ // storing the norms sparsely (see LUCENE-830)
+ if (upto < docID) {
+ fillBytes(out, defaultNorm, docID-upto);
+ upto = docID;
+ }
+ }
+ }
+
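BufferedNorms appends one norm byte per document that has the field, and fill() pads the gaps with the default norm so the stream stays aligned with docIDs; that padding is exactly why norms get expensive for sparse fields (LUCENE-830). A rough standalone illustration, with a made-up default byte and a simplified add(docID, norm) signature that the real class does not have:

import java.io.ByteArrayOutputStream;

// Standalone sketch: keep one norm byte per docID, padding docs that
// never saw the field with a default value so the stream stays aligned.
public class NormsFillSketch {
  static final byte DEFAULT_NORM = 0x7C;   // hypothetical default norm byte

  private final ByteArrayOutputStream out = new ByteArrayOutputStream();
  private int upto;                        // next docID we expect

  void add(int docID, byte norm) {
    fill(docID);                           // pad docs that skipped this field
    out.write(norm);
    upto = docID + 1;
  }

  void fill(int docID) {
    while (upto < docID) {
      out.write(DEFAULT_NORM);
      upto++;
    }
  }

  public static void main(String[] args) {
    NormsFillSketch norms = new NormsFillSketch();
    norms.add(0, (byte) 0x70);
    norms.add(3, (byte) 0x71);             // docs 1 and 2 get DEFAULT_NORM
    System.out.println("bytes written: " + norms.out.size());  // prints 4
  }
}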
+ /* Simple StringReader that can be reset to a new string;
+ * we use this when tokenizing the string value from a
+ * Field. */
+ private final static class ReusableStringReader extends Reader {
+ int upto;
+ int left;
+ String s;
+ void init(String s) {
+ this.s = s;
+ left = s.length();
+ this.upto = 0;
+ }
+ public int read(char[] c) {
+ return read(c, 0, c.length);
+ }
+ public int read(char[] c, int off, int len) {
+ if (left > len) {
+ s.getChars(upto, upto+len, c, off);
+ upto += len;
+ left -= len;
+ return len;
+ } else if (0 == left) {
+ return -1;
+ } else {
+ s.getChars(upto, upto+left, c, off);
+ int r = left;
+ left = 0;
+ upto = s.length();
+ return r;
+ }
+ }
+ public void close() {};
+ }
+
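ReusableStringReader lets the indexer feed each String field value to the analyzer without allocating a fresh StringReader per field; init() simply repoints the same instance. A trimmed standalone copy of the idea (the class name and driver loop here are illustrative only):

import java.io.Reader;

// Standalone sketch of a resettable string-backed Reader: one instance
// is re-initialized per field value instead of allocating a new
// StringReader each time.
public class ReusableStringReaderSketch extends Reader {
  private String s;
  private int upto;
  private int left;

  void init(String value) {
    s = value;
    upto = 0;
    left = value.length();
  }

  public int read(char[] c, int off, int len) {
    if (left == 0)
      return -1;
    int n = Math.min(left, len);
    s.getChars(upto, upto + n, c, off);
    upto += n;
    left -= n;
    return n;
  }

  public void close() {}

  public static void main(String[] args) {
    ReusableStringReaderSketch reader = new ReusableStringReaderSketch();
    char[] buf = new char[4];
    for (String value : new String[] {"first field value", "second value"}) {
      reader.init(value);                  // reuse, no new Reader allocation
      int total = 0, n;
      while ((n = reader.read(buf, 0, buf.length)) != -1)
        total += n;
      System.out.println(value + " -> " + total + " chars");
    }
  }
}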
+ /* IndexInput that knows how to read the byte slices written
+ * by Posting and PostingVector. We read the bytes in
+ * each slice until we hit the end of that slice at which
+ * point we read the forwarding address of the next slice
+ * and then jump to it.*/
+ private final static class ByteSliceReader extends IndexInput {
+ ByteBlockPool pool;
+ int bufferUpto;
+ byte[] buffer;
+ public int upto;
+ int limit;
+ int level;
+ public int bufferOffset;
+
+ public int endIndex;
+
+ public void init(ByteBlockPool pool, int startIndex, int endIndex) {
+
+ assert endIndex-startIndex > 0;
+
+ this.pool = pool;
+ this.endIndex = endIndex;
+
+ level = 0;
+ bufferUpto = startIndex / BYTE_BLOCK_SIZE;
+ bufferOffset = bufferUpto * BYTE_BLOCK_SIZE;
+ buffer = pool.buffers[bufferUpto];
+ upto = startIndex & BYTE_BLOCK_MASK;
+
+ final int firstSize = levelSizeArray[0];
+
+ if (startIndex+firstSize >= endIndex) {
+ // There is only this one slice to read
+ limit = endIndex & BYTE_BLOCK_MASK;
+ } else
+ limit = upto+firstSize-4;
+ }
+
+ public byte readByte() {
+ // Assert that we are not @ EOF
+ assert upto + bufferOffset < endIndex;
+ if (upto == limit)
+ nextSlice();
+ return buffer[upto++];
+ }
+
+ public long writeTo(IndexOutput out) throws IOException {
+ long size = 0;
+ while(true) {
+ if (limit + bufferOffset == endIndex) {
+ assert endIndex - bufferOffset >= upto;
+ out.writeBytes(buffer, upto, limit-upto);
+ size += limit-upto;
+ break;
+ } else {
+ out.writeBytes(buffer, upto, limit-upto);
+ size += limit-upto;
+ nextSlice();
+ }
+ }
+
+ return size;
+ }
+
+ public void nextSlice() {
+
+ // Skip to our next slice
+ final int nextIndex = ((buffer[limit]&0xff)<<24) + ((buffer[1+limit]&0xff)<<16) + ((buffer[2+limit]&0xff)<<8) + (buffer[3+limit]&0xff);
+
+ level = nextLevelArray[level];
+ final int newSize = levelSizeArray[level];
+
+ bufferUpto = nextIndex / BYTE_BLOCK_SIZE;
+ bufferOffset = bufferUpto * BYTE_BLOCK_SIZE;
+
+ buffer = pool.buffers[bufferUpto];
+ upto = nextIndex & BYTE_BLOCK_MASK;
+
+ if (nextIndex + newSize >= endIndex) {
+ // We are advancing to the final slice
+ assert endIndex - nextIndex > 0;
+ limit = endIndex - bufferOffset;
+ } else {
+ // This is not the final slice (subtract 4 for the
+ // forwarding address at the end of this new slice)
+ limit = upto+newSize-4;
+ }
+ }
+
+ public void readBytes(byte[] b, int offset, int len) {
+ while(len > 0) {
+ final int numLeft = limit-upto;
+ if (numLeft < len) {
+ // Read entire slice
+ System.arraycopy(buffer, upto, b, offset, numLeft);
+ offset += numLeft;
+ len -= numLeft;
+ nextSlice();
+ } else {
+ // This slice is the last one
+ System.arraycopy(buffer, upto, b, offset, len);
+ upto += len;
+ break;
+ }
+ }
+ }
+
+ public long getFilePointer() {throw new RuntimeException("not implemented");}
+ public long length() {throw new RuntimeException("not implemented");}
+ public void seek(long pos) {throw new RuntimeException("not implemented");}
+ public void close() {throw new RuntimeException("not implemented");}
+ }
+
+ // Size of each slice. These arrays should be at most 16
+ // elements. First array is just a compact way to encode
+ // X+1 with a max. Second array is the length of each
+ // slice, ie first slice is 5 bytes, next slice is 14
+ // bytes, etc.
+ final static int[] nextLevelArray = {1, 2, 3, 4, 5, 6, 7, 8, 9, 9};
+ final static int[] levelSizeArray = {5, 14, 20, 30, 40, 40, 80, 80, 120, 200};
+
+ /* Class that Posting and PostingVector use to write byte
+ * streams into shared fixed-size byte[] arrays. The idea
+ * is to allocate slices of increasing lengths. For
+ * example, the first slice is 5 bytes, the next slice is
+ * 14, etc. We start by writing our bytes into the first
+ * 5 bytes. When we hit the end of the slice, we allocate
+ * the next slice and then write the address of the new
+ * slice into the last 4 bytes of the previous slice (the
+ * "forwarding address").
+ *
+ * Each slice is filled with 0's initially, and we mark
+ * the end with a non-zero byte. This way the methods
+ * that are writing into the slice don't need to record
+ * its length and instead allocate a new slice once they
+ * hit a non-zero byte. */
+ private final class ByteBlockPool {
+
+ public byte[][] buffers = new byte[10][];
+
+ int bufferUpto = -1; // Which buffer we are upto
+ public int byteUpto = BYTE_BLOCK_SIZE; // Where we are in head buffer
+
+ public byte[] buffer; // Current head buffer
+ public int byteOffset = -BYTE_BLOCK_SIZE; // Current head offset
+
+ public void reset() {
+ recycleByteBlocks(buffers, 1+bufferUpto);
+ bufferUpto = -1;
+ byteUpto = BYTE_BLOCK_SIZE;
+ byteOffset = -BYTE_BLOCK_SIZE;
+ }
+
+ public void nextBuffer() {
+ bufferUpto++;
+ if (bufferUpto == buffers.length) {
+ byte[][] newBuffers = new byte[(int) (bufferUpto*1.5)][];
+ System.arraycopy(buffers, 0, newBuffers, 0, bufferUpto);
+ buffers = newBuffers;
+ }
+ buffer = buffers[bufferUpto] = getByteBlock();
+ Arrays.fill(buffer, (byte) 0);
+
+ byteUpto = 0;
+ byteOffset += BYTE_BLOCK_SIZE;
+ }
+
+ public int newSlice(final int size) {
+ if (byteUpto > BYTE_BLOCK_SIZE-size)
+ nextBuffer();
+ final int upto = byteUpto;
+ byteUpto += size;
+ buffer[byteUpto-1] = 16;
+ return upto;
+ }
+
+ public int allocSlice(final byte[] slice, final int upto) {
+
+ final int level = slice[upto] & 15;
+ final int newLevel = nextLevelArray[level];
+ final int newSize = levelSizeArray[newLevel];
+
+ // Maybe allocate another block
+ if (byteUpto > BYTE_BLOCK_SIZE-newSize)
+ nextBuffer();
+
+ final int newUpto = byteUpto;
+ final int offset = newUpto + byteOffset;
+ byteUpto += newSize;
+
+ // Copy forward the past 3 bytes (which we are about
+ // to overwrite with the forwarding address):
+ buffer[newUpto] = slice[upto-3];
+ buffer[newUpto+1] = slice[upto-2];
+ buffer[newUpto+2] = slice[upto-1];
+
+ // Write forwarding address at end of last slice:
+ slice[upto-3] = (byte) (offset >>> 24);
+ slice[upto-2] = (byte) (offset >>> 16);
+ slice[upto-1] = (byte) (offset >>> 8);
+ slice[upto] = (byte) offset;
+
+ // Write new level:
+ buffer[byteUpto-1] = (byte) (16|newLevel);
+
+ return newUpto+3;
+ }
+ }
+
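Each posting stream starts in a 5-byte slice and graduates through levelSizeArray as it grows, with the last 4 bytes of every chained slice overwritten by the forwarding address. A tiny standalone sketch of that progression and the usable capacity it buys, ignoring the sentinel-byte bookkeeping:

// Standalone sketch: walk the slice levels used above and print the
// usable capacity of one posting stream, assuming every slice so far
// has been chained onward (so its last 4 bytes hold a forwarding
// address).
public class SliceLevelsSketch {
  static final int[] NEXT_LEVEL = {1, 2, 3, 4, 5, 6, 7, 8, 9, 9};
  static final int[] LEVEL_SIZE = {5, 14, 20, 30, 40, 40, 80, 80, 120, 200};

  public static void main(String[] args) {
    int level = 0;
    int usable = 0;
    for (int slice = 0; slice < 12; slice++) {
      int size = LEVEL_SIZE[level];
      usable += size - 4;
      System.out.println("slice " + slice + ": size=" + size
          + " level=" + level + " usableSoFar=" + usable);
      level = NEXT_LEVEL[level];    // levels cap at 9, so slices cap at 200 bytes
    }
  }
}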
+ private final class CharBlockPool {
+
+ public char[][] buffers = new char[10][];
+ int numBuffer;
+
+ int bufferUpto = -1; // Which buffer we are upto
+ public int byteUpto = CHAR_BLOCK_SIZE; // Where we are in head buffer
+
+ public char[] buffer; // Current head buffer
+ public int byteOffset = -CHAR_BLOCK_SIZE; // Current head offset
+
+ public void reset() {
+ recycleCharBlocks(buffers, 1+bufferUpto);
+ bufferUpto = -1;
+ byteUpto = CHAR_BLOCK_SIZE;
+ byteOffset = -CHAR_BLOCK_SIZE;
+ }
+
+ public void nextBuffer() {
+ bufferUpto++;
+ if (bufferUpto == buffers.length) {
+ char[][] newBuffers = new char[(int) (bufferUpto*1.5)][];
+ System.arraycopy(buffers, 0, newBuffers, 0, bufferUpto);
+ buffers = newBuffers;
+ }
+ buffer = buffers[bufferUpto] = getCharBlock();
+
+ byteUpto = 0;
+ byteOffset += CHAR_BLOCK_SIZE;
+ }
+ }
+
+ // Used only when infoStream != null
+ private long segmentSize(String segmentName) throws IOException {
+ assert infoStream != null;
+
+ long size = directory.fileLength(segmentName + ".tii") +
+ directory.fileLength(segmentName + ".tis") +
+ directory.fileLength(segmentName + ".frq") +
+ directory.fileLength(segmentName + ".prx");
+
+ final String normFileName = segmentName + ".nrm";
+ if (directory.fileExists(normFileName))
+ size += directory.fileLength(normFileName);
+
+ return size;
+ }
+
+ final private static int POINTER_NUM_BYTE = 4;
+ final private static int INT_NUM_BYTE = 4;
+ final private static int CHAR_NUM_BYTE = 2;
+ final private static int OBJECT_HEADER_NUM_BYTE = 8;
+
+ final static int POSTING_NUM_BYTE = OBJECT_HEADER_NUM_BYTE + 9*INT_NUM_BYTE + POINTER_NUM_BYTE;
+
+ // Holds free pool of Posting instances
+ private Posting[] postingsFreeList;
+ private int postingsFreeCount;
+
+ /* Allocate more Postings from shared pool */
+ private synchronized void getPostings(Posting[] postings) {
+ numBytesUsed += postings.length * POSTING_NUM_BYTE;
+ final int numToCopy;
+ if (postingsFreeCount < postings.length)
+ numToCopy = postingsFreeCount;
+ else
+ numToCopy = postings.length;
+ final int start = postingsFreeCount-numToCopy;
+ System.arraycopy(postingsFreeList, start,
+ postings, 0, numToCopy);
+ postingsFreeCount -= numToCopy;
+
+ // Directly allocate the remainder if any
+ if (numToCopy < postings.length) {
+ numBytesAlloc += (postings.length - numToCopy) * POSTING_NUM_BYTE;
+ balanceRAM();
+ for(int i=numToCopy;i<postings.length;i++)
+ postings[i] = new Posting();
+ }
+ }
+
+ /* Return Postings to the shared pool */
+ synchronized void recyclePostings(Posting[] postings, int numPostings) {
+ // Grow the free list if needed:
+ if (postingsFreeCount + numPostings > postingsFreeList.length) {
+ final int newSize = (int) (1.25 * (postingsFreeCount + numPostings));
+ Posting[] newArray = new Posting[newSize];
+ System.arraycopy(postingsFreeList, 0, newArray, 0, postingsFreeCount);
+ postingsFreeList = newArray;
+ }
+ System.arraycopy(postings, 0, postingsFreeList, postingsFreeCount, numPostings);
+ postingsFreeCount += numPostings;
+ numBytesUsed -= numPostings * POSTING_NUM_BYTE;
+ }
+
+ /* Initial chunk size of the shared byte[] blocks used to
+ store postings data */
+ final static int BYTE_BLOCK_SHIFT = 15;
+ final static int BYTE_BLOCK_SIZE = (int) Math.pow(2.0, BYTE_BLOCK_SHIFT);
+ final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
+ final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
+
+ private ArrayList freeByteBlocks = new ArrayList();
+
+ /* Allocate another byte[] from the shared pool */
+ synchronized byte[] getByteBlock() {
+ final int size = freeByteBlocks.size();
+ final byte[] b;
+ if (0 == size) {
+ numBytesAlloc += BYTE_BLOCK_SIZE;
+ balanceRAM();
+ b = new byte[BYTE_BLOCK_SIZE];
+ } else
+ b = (byte[]) freeByteBlocks.remove(size-1);
+ numBytesUsed += BYTE_BLOCK_SIZE;
+ return b;
+ }
+
+ /* Return a byte[] to the pool */
+ synchronized void recycleByteBlocks(byte[][] blocks, int numBlocks) {
+ for(int i=0;i<numBlocks;i++)
+ freeByteBlocks.add(blocks[i]);
+ numBytesUsed -= numBlocks * BYTE_BLOCK_SIZE;
+ }
+
+ /* Initial chunk size of the shared char[] blocks used to
+ store term text */
+ final static int CHAR_BLOCK_SHIFT = 14;
+ final static int CHAR_BLOCK_SIZE = (int) Math.pow(2.0, CHAR_BLOCK_SHIFT);
+ final static int CHAR_BLOCK_MASK = CHAR_BLOCK_SIZE - 1;
+
+ private ArrayList freeCharBlocks = new ArrayList();
+
+ /* Allocate another char[] from the shared pool */
+ synchronized char[] getCharBlock() {
+ final int size = freeCharBlocks.size();
+ final char[] c;
+ if (0 == size) {
+ numBytesAlloc += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
+ balanceRAM();
+ c = new char[CHAR_BLOCK_SIZE];
+ } else
+ c = (char[]) freeCharBlocks.remove(size-1);
+ numBytesUsed += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
+ return c;
+ }
+
+ /* Return a char[] to the pool */
+ synchronized void recycleCharBlocks(char[][] blocks, int numBlocks) {
+ for(int i=0;i<numBlocks;i++)
+ freeCharBlocks.add(blocks[i]);
+ numBytesUsed -= numBlocks * CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
+ }
+
+ String toMB(long v) {
+ return nf.format(v/1024./1024.);
+ }
+
+ /* Frees allocations from the postings, byte block and
+ * char block pools once we are over our RAM budget. */
+ private synchronized void balanceRAM() {
+ if (numBytesAlloc > freeTrigger) {
+ if (infoStream != null)
+ infoStream.println(" RAM: now balance allocations: usedMB=" + toMB(numBytesUsed) +
+ " vs trigger=" + toMB(flushTrigger) +
+ " allocMB=" + toMB(numBytesAlloc) +
+ " vs trigger=" + toMB(freeTrigger) +
+ " postingsFree=" + toMB(postingsFreeCount*POSTING_NUM_BYTE) +
+ " byteBlockFree=" + toMB(freeByteBlocks.size()*BYTE_BLOCK_SIZE) +
+ " charBlockFree=" + toMB(freeCharBlocks.size()*CHAR_BLOCK_SIZE*CHAR_NUM_BYTE));
+
+ // When we've crossed 100% of our target Postings
+ // RAM usage, try to free up until we're back down
+ // to 95%
+ final long startBytesAlloc = numBytesAlloc;
+
+ final int postingsFreeChunk = (int) (BYTE_BLOCK_SIZE / POSTING_NUM_BYTE);
+
+ int iter = 0;
+
+ // We free equally from each pool in 64 KB
+ // chunks until we are below our threshold
+ // (freeLevel)
+
+ while(numBytesAlloc > freeLevel) {
+ if (0 == freeByteBlocks.size() && 0 == freeCharBlocks.size() && 0 == postingsFreeCount) {
+ // Nothing else to free -- must flush now.
+ postingsIsFull = true;
+ if (infoStream != null)
+ infoStream.println(" nothing to free; now set postingsIsFull");
+ break;
+ }
+
+ if ((0 == iter % 3) && freeByteBlocks.size() > 0) {
+ freeByteBlocks.remove(freeByteBlocks.size()-1);
+ numBytesAlloc -= BYTE_BLOCK_SIZE;
+ }
+
+ if ((1 == iter % 3) && freeCharBlocks.size() > 0) {
+ freeCharBlocks.remove(freeCharBlocks.size()-1);
+ numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
+ }
+
+ if ((2 == iter % 3) && postingsFreeCount > 0) {
+ final int numToFree;
+ if (postingsFreeCount >= postingsFreeChunk)
+ numToFree = postingsFreeChunk;
+ else
+ numToFree = postingsFreeCount;
+ Arrays.fill(postingsFreeList, postingsFreeCount-numToFree, postingsFreeCount, null);
+ postingsFreeCount -= numToFree;
+ numBytesAlloc -= numToFree * POSTING_NUM_BYTE;
+ }
+
+ iter++;
+ }
+
+ if (infoStream != null)
+ infoStream.println(" after free: freedMB=" + nf.format((startBytesAlloc-numBytesAlloc)/1024./1024.) + " usedMB=" + nf.format(numBytesUsed/1024./1024.) + " allocMB=" + nf.format(numBytesAlloc/1024./1024.));
+
+ } else {
+ // If we have not crossed the 100% mark, but have
+ // crossed the 95% mark of RAM we are actually
+ // using, go ahead and flush. This prevents
+ // over-allocating and then freeing, with every
+ // flush.
+ if (numBytesUsed > flushTrigger) {
+ if (infoStream != null)
+ infoStream.println(" RAM: now flush @ usedMB=" + nf.format(numBytesUsed/1024./1024.) +
+ " allocMB=" + nf.format(numBytesAlloc/1024./1024.) +
+ " triggerMB=" + nf.format(flushTrigger/1024./1024.));
+
+ postingsIsFull = true;
+ }
+ }
+ }
+
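The freeing loop above walks the three pools round-robin until allocation is back under freeLevel, or gives up and sets postingsIsFull. A compact standalone sketch of that loop shape; the sizes and thresholds below are invented for illustration and are not the writer's real trigger values:

import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the round-robin freeing loop: drop one chunk from
// each free pool in turn until the allocated total falls below freeLevel.
public class BalanceRamSketch {
  public static void main(String[] args) {
    final int BYTE_BLOCK_SIZE = 32768;
    final int CHAR_BLOCK_BYTES = 16384 * 2;
    final int POSTING_CHUNK_BYTES = 64 * 1024;

    List<Object> freeByteBlocks = new ArrayList<Object>();
    List<Object> freeCharBlocks = new ArrayList<Object>();
    int postingsFreeBytes = 512 * 1024;
    for (int i = 0; i < 20; i++) {
      freeByteBlocks.add(new Object());
      freeCharBlocks.add(new Object());
    }

    long numBytesAlloc = 17L * 1024 * 1024;   // pretend we are over budget
    final long freeLevel = 16L * 1024 * 1024; // pretend target after freeing

    int iter = 0;
    while (numBytesAlloc > freeLevel) {
      if (freeByteBlocks.isEmpty() && freeCharBlocks.isEmpty() && postingsFreeBytes == 0) {
        System.out.println("nothing left to free; must flush");
        break;
      }
      if (iter % 3 == 0 && !freeByteBlocks.isEmpty()) {
        freeByteBlocks.remove(freeByteBlocks.size() - 1);
        numBytesAlloc -= BYTE_BLOCK_SIZE;
      } else if (iter % 3 == 1 && !freeCharBlocks.isEmpty()) {
        freeCharBlocks.remove(freeCharBlocks.size() - 1);
        numBytesAlloc -= CHAR_BLOCK_BYTES;
      } else if (iter % 3 == 2 && postingsFreeBytes > 0) {
        int freed = Math.min(postingsFreeBytes, POSTING_CHUNK_BYTES);
        postingsFreeBytes -= freed;
        numBytesAlloc -= freed;
      }
      iter++;
    }
    System.out.println("allocated after balancing: " + numBytesAlloc);
  }
}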
+ /* Used to track postings for a single term. One of these
+ * exists per unique term seen since the last flush. */
+ private final static class Posting {
+ int textStart; // Address into char[] blocks where our text is stored
+ int docFreq; // # times this term occurs in the current doc
+ int freqStart; // Address of first byte[] slice for freq
+ int freqUpto; // Next write address for freq
+ int proxStart; // Address of first byte[] slice
+ int proxUpto; // Next write address for prox
+ int lastDocID; // Last docID where this term occurred
+ int lastDocCode; // Code for prior doc
+ int lastPosition; // Last position where this term occurred
+ PostingVector vector; // Corresponding PostingVector instance
+ }
+
+ /* Used to track data for term vectors. One of these
+ * exists per unique term seen in each field in the
+ * document. */
+ private final static class PostingVector {
+ Posting p; // Corresponding Posting instance for this term
+ int lastOffset; // Last offset we saw
+ int offsetStart; // Address of first slice for offsets
+ int offsetUpto; // Next write address for offsets
+ int posStart; // Address of first slice for positions
+ int posUpto; // Next write address for positions
+ }
+}
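POSTING_NUM_BYTE is just the estimated heap cost of one Posting: an 8-byte object header, nine ints, and one reference. A quick check of that arithmetic and what it implies for a flush with many unique terms (ignoring the char/byte blocks the postings point into):

// Quick arithmetic check of the per-Posting RAM estimate used above.
public class PostingRamMath {
  public static void main(String[] args) {
    final int OBJECT_HEADER_NUM_BYTE = 8;
    final int INT_NUM_BYTE = 4;
    final int POINTER_NUM_BYTE = 4;

    int postingNumByte = OBJECT_HEADER_NUM_BYTE + 9 * INT_NUM_BYTE + POINTER_NUM_BYTE;
    System.out.println("POSTING_NUM_BYTE = " + postingNumByte);   // 48

    long uniqueTerms = 1000000L;
    System.out.println("1M unique terms ~= "
        + (uniqueTerms * postingNumByte) / (1024 * 1024) + " MB of Posting objects");
  }
}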
Property changes on: src/java/org/apache/lucene/index/DocumentsWriter.java
___________________________________________________________________
Name: svn:eol-style
+ native
Index: src/java/org/apache/lucene/index/TermVectorsReader.java
===================================================================
--- src/java/org/apache/lucene/index/TermVectorsReader.java (revision 547668)
+++ src/java/org/apache/lucene/index/TermVectorsReader.java (working copy)
@@ -33,6 +33,11 @@
private IndexInput tvd;
private IndexInput tvf;
private int size;
+
+ // This is -1 if we have our own fields (*.fdx, *.fdt)
+ // file else the docID offset where our docs begin in
+ // these files
+ private int docStoreOffset;
private int tvdFormat;
private int tvfFormat;
@@ -44,6 +49,11 @@
TermVectorsReader(Directory d, String segment, FieldInfos fieldInfos, int readBufferSize)
throws CorruptIndexException, IOException {
+ this(d, segment, fieldInfos, readBufferSize, -1, 0);
+ }
+
+ TermVectorsReader(Directory d, String segment, FieldInfos fieldInfos, int readBufferSize, int docStoreOffset, int size)
+ throws CorruptIndexException, IOException {
if (d.fileExists(segment + TermVectorsWriter.TVX_EXTENSION)) {
tvx = d.openInput(segment + TermVectorsWriter.TVX_EXTENSION, readBufferSize);
checkValidFormat(tvx);
@@ -51,7 +61,16 @@
tvdFormat = checkValidFormat(tvd);
tvf = d.openInput(segment + TermVectorsWriter.TVF_EXTENSION, readBufferSize);
tvfFormat = checkValidFormat(tvf);
- size = (int) tvx.length() / 8;
+ if (-1 == docStoreOffset) {
+ this.docStoreOffset = 0;
+ this.size = (int) (tvx.length() / 8);
+ } else {
+ this.docStoreOffset = docStoreOffset;
+ this.size = size;
+ }
+ // Verify the file is long enough to hold all of our
+ // docs
+ assert ((int) (tvx.length()/8)) >= size + docStoreOffset;
}
this.fieldInfos = fieldInfos;
@@ -102,7 +121,7 @@
//We don't need to do this in other seeks because we already have the
// file pointer
//that was written in another file
- tvx.seek((docNum * 8L) + TermVectorsWriter.FORMAT_SIZE);
+ tvx.seek(((docNum + docStoreOffset) * 8L) + TermVectorsWriter.FORMAT_SIZE);
//System.out.println("TVX Pointer: " + tvx.getFilePointer());
long position = tvx.readLong();
@@ -154,7 +173,7 @@
// Check if no term vectors are available for this segment at all
if (tvx != null) {
//We need to offset by
- tvx.seek((docNum * 8L) + TermVectorsWriter.FORMAT_SIZE);
+ tvx.seek(((docNum + docStoreOffset) * 8L) + TermVectorsWriter.FORMAT_SIZE);
long position = tvx.readLong();
tvd.seek(position);
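With docStoreOffset in place, a segment that shares another segment's .tvx/.tvd/.tvf simply shifts the docID before computing its 8-byte slot in .tvx. A tiny standalone sketch of that address arithmetic; the 4-byte FORMAT_SIZE header used below is an assumption for illustration:

// Standalone sketch of the .tvx addressing with a shared doc store:
// each doc owns one 8-byte long in .tvx, after a small format header.
public class SharedTvxAddressSketch {
  static long tvxPosition(int docNum, int docStoreOffset, long formatSize) {
    return (docNum + docStoreOffset) * 8L + formatSize;
  }

  public static void main(String[] args) {
    final long FORMAT_SIZE = 4;      // assumed header size, one int
    // Segment A: first 100 docs of the shared store, segment B: next 50.
    System.out.println(tvxPosition(0, 0, FORMAT_SIZE));     // doc 0 of A -> 4
    System.out.println(tvxPosition(0, 100, FORMAT_SIZE));   // doc 0 of B -> 804
    System.out.println(tvxPosition(49, 100, FORMAT_SIZE));  // last doc of B -> 1196
  }
}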
Index: src/java/org/apache/lucene/index/SegmentInfos.java
===================================================================
--- src/java/org/apache/lucene/index/SegmentInfos.java (revision 547668)
+++ src/java/org/apache/lucene/index/SegmentInfos.java (working copy)
@@ -51,8 +51,12 @@
*/
public static final int FORMAT_SINGLE_NORM_FILE = -3;
+ /** This format allows multiple segments to share a single
+ * vectors and stored fields file. */
+ public static final int FORMAT_SHARED_DOC_STORE = -4;
+
/* This must always point to the most recent file format. */
- private static final int CURRENT_FORMAT = FORMAT_SINGLE_NORM_FILE;
+ private static final int CURRENT_FORMAT = FORMAT_SHARED_DOC_STORE;
public int counter = 0; // used to name new segments
/**
Index: src/java/org/apache/lucene/index/FieldsWriter.java
===================================================================
--- src/java/org/apache/lucene/index/FieldsWriter.java (revision 547668)
+++ src/java/org/apache/lucene/index/FieldsWriter.java (working copy)
@@ -24,6 +24,7 @@
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMOutputStream;
import org.apache.lucene.store.IndexOutput;
final class FieldsWriter
@@ -38,17 +39,94 @@
private IndexOutput indexStream;
+ private boolean doClose;
+
FieldsWriter(Directory d, String segment, FieldInfos fn) throws IOException {
fieldInfos = fn;
fieldsStream = d.createOutput(segment + ".fdt");
indexStream = d.createOutput(segment + ".fdx");
+ doClose = true;
}
+ FieldsWriter(IndexOutput fdx, IndexOutput fdt, FieldInfos fn) throws IOException {
+ fieldInfos = fn;
+ fieldsStream = fdt;
+ indexStream = fdx;
+ doClose = false;
+ }
+
+ // Writes the contents of buffer into the fields stream
+ // and adds a new entry for this document into the index
+ // stream. This assumes the buffer was already written
+ // in the correct fields format.
+ void flushDocument(RAMOutputStream buffer) throws IOException {
+ indexStream.writeLong(fieldsStream.getFilePointer());
+ buffer.writeTo(fieldsStream);
+ }
+
+ void flush() throws IOException {
+ indexStream.flush();
+ fieldsStream.flush();
+ }
+
final void close() throws IOException {
+ if (doClose) {
fieldsStream.close();
indexStream.close();
+ }
}
+ final void writeField(FieldInfo fi, Fieldable field) throws IOException {
+ // if the field is an instance of FieldsReader.FieldForMerge, we're in merge mode
+ // and field.binaryValue() already returns the compressed value for a field
+ // with isCompressed()==true, so we disable compression in that case
+ boolean disableCompression = (field instanceof FieldsReader.FieldForMerge);
+ fieldsStream.writeVInt(fi.number);
+ byte bits = 0;
+ if (field.isTokenized())
+ bits |= FieldsWriter.FIELD_IS_TOKENIZED;
+ if (field.isBinary())
+ bits |= FieldsWriter.FIELD_IS_BINARY;
+ if (field.isCompressed())
+ bits |= FieldsWriter.FIELD_IS_COMPRESSED;
+
+ fieldsStream.writeByte(bits);
+
+ if (field.isCompressed()) {
+ // compression is enabled for the current field
+ byte[] data = null;
+
+ if (disableCompression) {
+ // optimized case for merging, the data
+ // is already compressed
+ data = field.binaryValue();
+ } else {
+ // check if it is a binary field
+ if (field.isBinary()) {
+ data = compress(field.binaryValue());
+ }
+ else {
+ data = compress(field.stringValue().getBytes("UTF-8"));
+ }
+ }
+ final int len = data.length;
+ fieldsStream.writeVInt(len);
+ fieldsStream.writeBytes(data, len);
+ }
+ else {
+ // compression is disabled for the current field
+ if (field.isBinary()) {
+ byte[] data = field.binaryValue();
+ final int len = data.length;
+ fieldsStream.writeVInt(len);
+ fieldsStream.writeBytes(data, len);
+ }
+ else {
+ fieldsStream.writeString(field.stringValue());
+ }
+ }
+ }
+
final void addDocument(Document doc) throws IOException {
indexStream.writeLong(fieldsStream.getFilePointer());
@@ -64,57 +142,8 @@
fieldIterator = doc.getFields().iterator();
while (fieldIterator.hasNext()) {
Fieldable field = (Fieldable) fieldIterator.next();
- // if the field as an instanceof FieldsReader.FieldForMerge, we're in merge mode
- // and field.binaryValue() already returns the compressed value for a field
- // with isCompressed()==true, so we disable compression in that case
- boolean disableCompression = (field instanceof FieldsReader.FieldForMerge);
- if (field.isStored()) {
- fieldsStream.writeVInt(fieldInfos.fieldNumber(field.name()));
-
- byte bits = 0;
- if (field.isTokenized())
- bits |= FieldsWriter.FIELD_IS_TOKENIZED;
- if (field.isBinary())
- bits |= FieldsWriter.FIELD_IS_BINARY;
- if (field.isCompressed())
- bits |= FieldsWriter.FIELD_IS_COMPRESSED;
-
- fieldsStream.writeByte(bits);
-
- if (field.isCompressed()) {
- // compression is enabled for the current field
- byte[] data = null;
-
- if (disableCompression) {
- // optimized case for merging, the data
- // is already compressed
- data = field.binaryValue();
- } else {
- // check if it is a binary field
- if (field.isBinary()) {
- data = compress(field.binaryValue());
- }
- else {
- data = compress(field.stringValue().getBytes("UTF-8"));
- }
- }
- final int len = data.length;
- fieldsStream.writeVInt(len);
- fieldsStream.writeBytes(data, len);
- }
- else {
- // compression is disabled for the current field
- if (field.isBinary()) {
- byte[] data = field.binaryValue();
- final int len = data.length;
- fieldsStream.writeVInt(len);
- fieldsStream.writeBytes(data, len);
- }
- else {
- fieldsStream.writeString(field.stringValue());
- }
- }
- }
+ if (field.isStored())
+ writeField(fieldInfos.fieldInfo(field.name()), field);
}
}
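flushDocument(RAMOutputStream) lets DocumentsWriter build each document's stored fields in a per-thread RAM buffer and then append it to the shared .fdt in docID order, writing only the start pointer into .fdx. A standalone sketch of that pointer-plus-payload layout with plain java.io streams instead of Lucene's IndexOutput:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Standalone sketch: buffer one document's stored fields in RAM, then
// append the buffer to a shared "fdt" stream and record its start offset
// in the "fdx" index stream -- the pointer-plus-payload layout that
// flushDocument() writes with Lucene's IndexOutput.
public class FieldsFlushSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream fdt = new ByteArrayOutputStream();
    ByteArrayOutputStream fdxBytes = new ByteArrayOutputStream();
    DataOutputStream fdx = new DataOutputStream(fdxBytes);

    String[][] docs = {{"title one", "body one"}, {"title two", "body two two"}};
    for (String[] doc : docs) {
      // Per-document buffer, filled while the doc is processed:
      ByteArrayOutputStream bufferBytes = new ByteArrayOutputStream();
      DataOutputStream buffer = new DataOutputStream(bufferBytes);
      for (String field : doc)
        buffer.writeUTF(field);

      fdx.writeLong(fdt.size());   // index entry: where this doc starts in fdt
      bufferBytes.writeTo(fdt);    // then the buffered document bytes
    }
    System.out.println("fdx bytes=" + fdxBytes.size()
        + ", fdt bytes=" + fdt.size());
  }
}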
Index: src/java/org/apache/lucene/index/SegmentMerger.java
===================================================================
--- src/java/org/apache/lucene/index/SegmentMerger.java (revision 547668)
+++ src/java/org/apache/lucene/index/SegmentMerger.java (working copy)
@@ -52,6 +52,12 @@
private int mergedDocs;
+ // Whether we should merge doc stores (stored fields and
+ // vectors files). When all segments we are merging
+ // already share the same doc store files, we don't need
+ // to merge the doc stores.
+ private boolean mergeDocStores;
+
/** This ctor used only by test code.
*
* @param dir The Directory to merge the other segments into
@@ -92,18 +98,32 @@
* @throws IOException if there is a low-level IO error
*/
final int merge() throws CorruptIndexException, IOException {
- int value;
-
+ return merge(true);
+ }
+
+ /**
+ * Merges the readers specified by the {@link #add} method
+ * into the directory passed to the constructor.
+ * @param mergeDocStores if false, we will not merge the
+ * stored fields nor vectors files
+ * @return The number of documents that were merged
+ * @throws CorruptIndexException if the index is corrupt
+ * @throws IOException if there is a low-level IO error
+ */
+ final int merge(boolean mergeDocStores) throws CorruptIndexException, IOException {
+
+ this.mergeDocStores = mergeDocStores;
+
mergedDocs = mergeFields();
mergeTerms();
mergeNorms();
- if (fieldInfos.hasVectors())
+ if (mergeDocStores && fieldInfos.hasVectors())
mergeVectors();
return mergedDocs;
}
-
+
/**
* close all IndexReaders that have been added.
* Should not be called before merge().
@@ -126,7 +146,10 @@
// Basic files
for (int i = 0; i < IndexFileNames.COMPOUND_EXTENSIONS.length; i++) {
- files.add(segment + "." + IndexFileNames.COMPOUND_EXTENSIONS[i]);
+ String ext = IndexFileNames.COMPOUND_EXTENSIONS[i];
+ if (mergeDocStores || (!ext.equals(IndexFileNames.FIELDS_EXTENSION) &&
+ !ext.equals(IndexFileNames.FIELDS_INDEX_EXTENSION)))
+ files.add(segment + "." + ext);
}
// Fieldable norm files
@@ -139,7 +162,7 @@
}
// Vector files
- if (fieldInfos.hasVectors()) {
+ if (fieldInfos.hasVectors() && mergeDocStores) {
for (int i = 0; i < IndexFileNames.VECTOR_EXTENSIONS.length; i++) {
files.add(segment + "." + IndexFileNames.VECTOR_EXTENSIONS[i]);
}
@@ -173,7 +196,20 @@
* @throws IOException if there is a low-level IO error
*/
private final int mergeFields() throws CorruptIndexException, IOException {
- fieldInfos = new FieldInfos(); // merge field names
+
+ if (!mergeDocStores) {
+ // When we are not merging by doc stores, that means
+ // all segments were written as part of a single
+ // autoCommit=false IndexWriter session, so their field
+ // name -> number mappings are the same. So, we start
+ // with the fieldInfos of the last segment in this
+ // case, to keep that numbering.
+ final SegmentReader sr = (SegmentReader) readers.elementAt(readers.size()-1);
+ fieldInfos = (FieldInfos) sr.fieldInfos.clone();
+ } else {
+ fieldInfos = new FieldInfos(); // merge field names
+ }
+
int docCount = 0;
for (int i = 0; i < readers.size(); i++) {
IndexReader reader = (IndexReader) readers.elementAt(i);
@@ -187,30 +223,40 @@
}
fieldInfos.write(directory, segment + ".fnm");
- FieldsWriter fieldsWriter = // merge field values
- new FieldsWriter(directory, segment, fieldInfos);
+ if (mergeDocStores) {
+
+ FieldsWriter fieldsWriter = // merge field values
+ new FieldsWriter(directory, segment, fieldInfos);
- // for merging we don't want to compress/uncompress the data, so to tell the FieldsReader that we're
- // in merge mode, we use this FieldSelector
- FieldSelector fieldSelectorMerge = new FieldSelector() {
- public FieldSelectorResult accept(String fieldName) {
- return FieldSelectorResult.LOAD_FOR_MERGE;
- }
- };
-
- try {
- for (int i = 0; i < readers.size(); i++) {
- IndexReader reader = (IndexReader) readers.elementAt(i);
- int maxDoc = reader.maxDoc();
- for (int j = 0; j < maxDoc; j++)
- if (!reader.isDeleted(j)) { // skip deleted docs
- fieldsWriter.addDocument(reader.document(j, fieldSelectorMerge));
- docCount++;
- }
+ // for merging we don't want to compress/uncompress the data, so to tell the FieldsReader that we're
+ // in merge mode, we use this FieldSelector
+ FieldSelector fieldSelectorMerge = new FieldSelector() {
+ public FieldSelectorResult accept(String fieldName) {
+ return FieldSelectorResult.LOAD_FOR_MERGE;
+ }
+ };
+
+ try {
+ for (int i = 0; i < readers.size(); i++) {
+ IndexReader reader = (IndexReader) readers.elementAt(i);
+ int maxDoc = reader.maxDoc();
+ for (int j = 0; j < maxDoc; j++)
+ if (!reader.isDeleted(j)) { // skip deleted docs
+ fieldsWriter.addDocument(reader.document(j, fieldSelectorMerge));
+ docCount++;
+ }
+ }
+ } finally {
+ fieldsWriter.close();
}
- } finally {
- fieldsWriter.close();
- }
+
+ } else
+ // If we are skipping the doc stores, that means there
+ // are no deletions in any of these segments, so we
+ // just sum numDocs() of each segment to get total docCount
+ for (int i = 0; i < readers.size(); i++)
+ docCount += ((IndexReader) readers.elementAt(i)).numDocs();
+
return docCount;
}
@@ -355,6 +401,7 @@
for (int i = 0; i < n; i++) {
SegmentMergeInfo smi = smis[i];
TermPositions postings = smi.getPositions();
+ assert postings != null;
int base = smi.base;
int[] docMap = smi.getDocMap();
postings.seek(smi.termEnum);
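merge(false) is only safe when every segment already shares the same doc store, has no deletions, and sits at contiguous docStoreOffsets; IndexWriter.mergeSegments performs those checks (plus same-directory and live-doc-store checks omitted here) before choosing it. A simplified standalone sketch of the eligibility test over a hypothetical SegInfo stand-in, not Lucene's SegmentInfo:

// Standalone sketch of the "can we skip merging stored fields/vectors?"
// test, over a hypothetical SegInfo stand-in for SegmentInfo.
public class SkipDocStoreCheck {
  static class SegInfo {
    String docStoreSegment;   // null or -1 offset => private doc store
    int docStoreOffset;
    int docCount;
    boolean hasDeletions;
    SegInfo(String seg, int offset, int count, boolean del) {
      docStoreSegment = seg; docStoreOffset = offset; docCount = count; hasDeletions = del;
    }
  }

  static boolean canSkipDocStoreMerge(SegInfo[] infos) {
    String shared = null;
    int next = -1;
    for (SegInfo si : infos) {
      if (si.hasDeletions || si.docStoreOffset == -1 || si.docStoreSegment == null)
        return false;                        // private store or deletions: must merge
      if (shared == null)
        shared = si.docStoreSegment;
      else if (!shared.equals(si.docStoreSegment))
        return false;                        // different shared stores
      if (next != -1 && next != si.docStoreOffset)
        return false;                        // offsets not contiguous
      next = si.docStoreOffset + si.docCount;
    }
    return true;
  }

  public static void main(String[] args) {
    SegInfo[] ok = {
      new SegInfo("_0", 0, 100, false),
      new SegInfo("_0", 100, 50, false),
    };
    SegInfo[] bad = {
      new SegInfo("_0", 0, 100, false),
      new SegInfo("_1", 0, 50, false),
    };
    System.out.println(canSkipDocStoreMerge(ok));   // true
    System.out.println(canSkipDocStoreMerge(bad));  // false
  }
}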
Index: src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- src/java/org/apache/lucene/index/IndexWriter.java (revision 547668)
+++ src/java/org/apache/lucene/index/IndexWriter.java (working copy)
@@ -61,14 +61,19 @@
When finished adding, deleting and updating documents, close should be called.
These changes are buffered in memory and periodically
- flushed to the {@link Directory} (during the above method calls). A flush is triggered when there are
- enough buffered deletes (see {@link
- #setMaxBufferedDeleteTerms}) or enough added documents
- (see {@link #setMaxBufferedDocs}) since the last flush,
- whichever is sooner. You can also force a flush by
- calling {@link #flush}. When a flush occurs, both pending
- deletes and added documents are flushed to the index. A
- flush may also trigger one or more segment merges.
+ flushed to the {@link Directory} (during the above method
+ calls). A flush is triggered when there are enough
+ buffered deletes (see {@link #setMaxBufferedDeleteTerms})
+ or enough added documents since the last flush, whichever
+ is sooner. For the added documents, flushing is triggered
+ either by RAM usage of the documents (this is the default;
+ see {@link #setRAMBufferSizeMB}) or the number of added
+ documents (see {@link #setMaxBufferedDocs}). For best
+ indexing speed you should flush by RAM usage with a large
+ RAM buffer. You can also force a flush by calling {@link
+ #flush}. When a flush occurs, both pending deletes and
+ added documents are flushed to the index. A flush may
+ also trigger one or more segment merges.
The optional autoCommit argument to the
@@ -179,11 +184,17 @@
public final static int DEFAULT_MERGE_FACTOR = 10;
/**
- * Default value is 10. Change using {@link #setMaxBufferedDocs(int)}.
+ * Default value is 0 (meaning flush is based on RAM usage
+ * by default). Change using {@link #setMaxBufferedDocs}.
*/
- public final static int DEFAULT_MAX_BUFFERED_DOCS = 10;
+ public final static int DEFAULT_MAX_BUFFERED_DOCS = 0;
/**
+ * Default value is 16 MB. Change using {@link #setRAMBufferSizeMB}.
+ */
+ public final static double DEFAULT_RAM_BUFFER_SIZE_MB = 16.0;
+
+ /**
* Default value is 1000. Change using {@link #setMaxBufferedDeleteTerms(int)}.
*/
public final static int DEFAULT_MAX_BUFFERED_DELETE_TERMS = 1000;
@@ -224,8 +235,7 @@
private boolean autoCommit = true; // false if we should commit only on close
SegmentInfos segmentInfos = new SegmentInfos(); // the segments
- SegmentInfos ramSegmentInfos = new SegmentInfos(); // the segments in ramDirectory
- private final RAMDirectory ramDirectory = new RAMDirectory(); // for temp segs
+ private DocumentsWriter docWriter;
private IndexFileDeleter deleter;
private Lock writeLock;
@@ -621,11 +631,14 @@
rollbackSegmentInfos = (SegmentInfos) segmentInfos.clone();
}
+ docWriter = new DocumentsWriter(directory, this);
+ docWriter.setInfoStream(infoStream);
+
// Default deleter (for backwards compatibility) is
// KeepOnlyLastCommitDeleter:
deleter = new IndexFileDeleter(directory,
deletionPolicy == null ? new KeepOnlyLastCommitDeletionPolicy() : deletionPolicy,
- segmentInfos, infoStream);
+ segmentInfos, infoStream, docWriter);
} catch (IOException e) {
this.writeLock.release();
@@ -683,34 +696,67 @@
return maxFieldLength;
}
- /** Determines the minimal number of documents required before the buffered
- * in-memory documents are merged and a new Segment is created.
- * Since Documents are merged in a {@link org.apache.lucene.store.RAMDirectory},
- * large value gives faster indexing. At the same time, mergeFactor limits
- * the number of files open in a FSDirectory.
+ /** Determines the minimal number of documents required
+ * before the buffered in-memory documents are flushed as
+ * a new Segment. Large values generally give faster
+ * indexing.
*
- * The default value is 10.
+ * When this is set, the writer will flush every
+ * maxBufferedDocs added documents and never flush by RAM
+ * usage.
*
- * @throws IllegalArgumentException if maxBufferedDocs is smaller than 2
+ * The default value is 0 (writer flushes by RAM
+ * usage).
+ *
+ * @throws IllegalArgumentException if maxBufferedDocs is
+ * smaller than 2
+ * @see #setRAMBufferSizeMB
*/
public void setMaxBufferedDocs(int maxBufferedDocs) {
ensureOpen();
if (maxBufferedDocs < 2)
throw new IllegalArgumentException("maxBufferedDocs must at least be 2");
- this.minMergeDocs = maxBufferedDocs;
+ docWriter.setMaxBufferedDocs(maxBufferedDocs);
}
/**
- * Returns the number of buffered added documents that will
+ * Returns 0 if this writer is flushing by RAM usage, else
+ * returns the number of buffered added documents that will
* trigger a flush.
* @see #setMaxBufferedDocs
*/
public int getMaxBufferedDocs() {
ensureOpen();
- return minMergeDocs;
+ return docWriter.getMaxBufferedDocs();
}
+ /** Determines the amount of RAM that may be used for
+ * buffering added documents before they are flushed as a
+ * new Segment. Generally for faster indexing performance
+ * it's best to flush by RAM usage instead of document
+ * count and use as large a RAM buffer as you can.
+ *
+ * When this is set, the writer will flush whenever
+ * buffered documents use this much RAM.
+ *
+ * The default value is {@link #DEFAULT_RAM_BUFFER_SIZE_MB}.
+ */
+ public void setRAMBufferSizeMB(double mb) {
+ if (mb <= 0.0)
+ throw new IllegalArgumentException("ramBufferSize should be > 0.0 MB");
+ docWriter.setRAMBufferSizeMB(mb);
+ }
+
/**
+ * Returns 0.0 if this writer is flushing by document
+ * count, else returns the value set by {@link
+ * #setRAMBufferSizeMB}.
+ */
+ public double getRAMBufferSizeMB() {
+ return docWriter.getRAMBufferSizeMB();
+ }
+
+ /**
* Determines the minimal number of delete terms required before the buffered
* in-memory delete terms are applied and flushed. If there are documents
* buffered in memory at the time, they are merged and a new segment is
@@ -788,6 +834,7 @@
public void setInfoStream(PrintStream infoStream) {
ensureOpen();
this.infoStream = infoStream;
+ docWriter.setInfoStream(infoStream);
deleter.setInfoStream(infoStream);
}
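Since flushing now defaults to RAM usage rather than a fixed document count, a caller typically just sizes the buffer once and keeps adding documents. A minimal usage sketch against the patched API, assuming the setRAMBufferSizeMB setter introduced in this patch:

import org.apache.lucene.analysis.SimpleAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class FlushByRamExample {
  public static void main(String[] args) throws Exception {
    Directory dir = new RAMDirectory();
    IndexWriter writer = new IndexWriter(dir, new SimpleAnalyzer(), true);
    // Flush whenever buffered documents use ~16 MB of RAM (the new
    // default), rather than after a fixed number of documents:
    writer.setRAMBufferSizeMB(16.0);
    for (int i = 0; i < 100000; i++) {
      Document doc = new Document();
      doc.add(new Field("body", "some text " + i,
                        Field.Store.NO, Field.Index.TOKENIZED));
      writer.addDocument(doc);
    }
    writer.close();
  }
}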
@@ -871,7 +918,7 @@
*/
public synchronized void close() throws CorruptIndexException, IOException {
if (!closed) {
- flushRamSegments();
+ flush(true, true);
if (commitPending) {
segmentInfos.write(directory); // now commit changes
@@ -880,18 +927,78 @@
rollbackSegmentInfos = null;
}
- ramDirectory.close();
if (writeLock != null) {
writeLock.release(); // release write lock
writeLock = null;
}
closed = true;
+ docWriter = null;
if(closeDir)
directory.close();
}
}
+ /** Tells the docWriter to close its currently open shared
+ * doc stores (stored fields & vectors files). */
+ private void flushDocStores() throws IOException {
+
+ List files = docWriter.files();
+
+ if (files.size() > 0) {
+ String docStoreSegment;
+
+ boolean success = false;
+ try {
+ docStoreSegment = docWriter.closeDocStore();
+ success = true;
+ } finally {
+ if (!success)
+ docWriter.abort();
+ }
+
+ if (useCompoundFile && docStoreSegment != null) {
+ // Now build compound doc store file
+ checkpoint();
+
+ success = false;
+
+ final int numSegments = segmentInfos.size();
+
+ try {
+ CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, docStoreSegment + "." + IndexFileNames.COMPOUND_FILE_STORE_EXTENSION);
+ final int size = files.size();
+ for(int i=0;i<size;i++)
+ cfsWriter.addFile((String) files.get(i));
+
+ // Perform the merge
+ cfsWriter.close();
* Deletes the document(s) containing term.
* @param term the term to identify the documents to be deleted
@@ -1025,7 +1117,7 @@
public synchronized void deleteDocuments(Term term) throws CorruptIndexException, IOException {
ensureOpen();
bufferDeleteTerm(term);
- maybeFlushRamSegments();
+ maybeFlush();
}
/**
@@ -1041,7 +1133,7 @@
for (int i = 0; i < terms.length; i++) {
bufferDeleteTerm(terms[i]);
}
- maybeFlushRamSegments();
+ maybeFlush();
}
/**
@@ -1077,26 +1169,23 @@
public void updateDocument(Term term, Document doc, Analyzer analyzer)
throws CorruptIndexException, IOException {
ensureOpen();
- SegmentInfo newSegmentInfo = buildSingleDocSegment(doc, analyzer);
- synchronized (this) {
+ synchronized(this) {
bufferDeleteTerm(term);
- ramSegmentInfos.addElement(newSegmentInfo);
- maybeFlushRamSegments();
}
+ if (docWriter.addDocument(doc, analyzer))
+ flush(true, false);
+ else
+ maybeFlush();
}
- final synchronized String newRamSegmentName() {
- return "_ram_" + Integer.toString(ramSegmentInfos.counter++, Character.MAX_RADIX);
- }
-
// for test purpose
final synchronized int getSegmentCount(){
return segmentInfos.size();
}
// for test purpose
- final synchronized int getRamSegmentCount(){
- return ramSegmentInfos.size();
+ final synchronized int getNumBufferedDocuments(){
+ return docWriter.getNumDocsInRAM();
}
// for test purpose
@@ -1125,18 +1214,11 @@
*/
private int mergeFactor = DEFAULT_MERGE_FACTOR;
- /** Determines the minimal number of documents required before the buffered
- * in-memory documents are merging and a new Segment is created.
- * Since Documents are merged in a {@link org.apache.lucene.store.RAMDirectory},
- * large value gives faster indexing. At the same time, mergeFactor limits
- * the number of files open in a FSDirectory.
- *
- * The default value is {@link #DEFAULT_MAX_BUFFERED_DOCS}.
-
+ /** Amount of RAM used by the buffered docs at which
+ * point we trigger a flush to the index.
*/
- private int minMergeDocs = DEFAULT_MAX_BUFFERED_DOCS;
+ private double ramBufferSize = DEFAULT_RAM_BUFFER_SIZE_MB*1024F*1024F;
-
/** Determines the largest number of documents ever merged by addDocument().
* Small values (e.g., less than 10,000) are best for interactive indexing,
* as this limits the length of pauses while indexing to a few seconds.
@@ -1219,16 +1301,16 @@
*/
public synchronized void optimize() throws CorruptIndexException, IOException {
ensureOpen();
- flushRamSegments();
+ flush();
while (segmentInfos.size() > 1 ||
(segmentInfos.size() == 1 &&
(SegmentReader.hasDeletions(segmentInfos.info(0)) ||
SegmentReader.hasSeparateNorms(segmentInfos.info(0)) ||
segmentInfos.info(0).dir != directory ||
(useCompoundFile &&
- (!SegmentReader.usesCompoundFile(segmentInfos.info(0))))))) {
+ !segmentInfos.info(0).getUseCompoundFile())))) {
int minSegment = segmentInfos.size() - mergeFactor;
- mergeSegments(segmentInfos, minSegment < 0 ? 0 : minSegment, segmentInfos.size());
+ mergeSegments(minSegment < 0 ? 0 : minSegment, segmentInfos.size());
}
}
@@ -1245,7 +1327,7 @@
localRollbackSegmentInfos = (SegmentInfos) segmentInfos.clone();
localAutoCommit = autoCommit;
if (localAutoCommit) {
- flushRamSegments();
+ flush();
// Turn off auto-commit during our local transaction:
autoCommit = false;
} else
@@ -1335,16 +1417,18 @@
segmentInfos.clear();
segmentInfos.addAll(rollbackSegmentInfos);
+ docWriter.abort();
+
// Ask deleter to locate unreferenced files & remove
// them:
deleter.checkpoint(segmentInfos, false);
deleter.refresh();
- ramSegmentInfos = new SegmentInfos();
bufferedDeleteTerms.clear();
numBufferedDeleteTerms = 0;
commitPending = false;
+ docWriter.abort();
close();
} else {
@@ -1439,7 +1523,7 @@
for (int base = start; base < segmentInfos.size(); base++) {
int end = Math.min(segmentInfos.size(), base+mergeFactor);
if (end-base > 1) {
- mergeSegments(segmentInfos, base, end);
+ mergeSegments(base, end);
}
}
}
@@ -1479,7 +1563,7 @@
// segments in S may not since they could come from multiple indexes.
// Here is the merge algorithm for addIndexesNoOptimize():
//
- // 1 Flush ram segments.
+ // 1 Flush ram.
// 2 Consider a combined sequence with segments from T followed
// by segments from S (same as current addIndexes(Directory[])).
// 3 Assume the highest level for segments in S is h. Call
@@ -1500,14 +1584,18 @@
// copy a segment, which may cause doc count to change because deleted
// docs are garbage collected.
- // 1 flush ram segments
+ // 1 flush ram
ensureOpen();
- flushRamSegments();
+ flush();
// 2 copy segment infos and find the highest level from dirs
- int startUpperBound = minMergeDocs;
+ int startUpperBound = docWriter.getMaxBufferedDocs();
+ // nocommit -- must fix merge policy
+ if (startUpperBound == 0)
+ startUpperBound = 10;
+
boolean success = false;
startTransaction();
@@ -1566,7 +1654,7 @@
// copy those segments from S
for (int i = segmentCount - numSegmentsToCopy; i < segmentCount; i++) {
- mergeSegments(segmentInfos, i, i + 1);
+ mergeSegments(i, i + 1);
}
if (checkNonDecreasingLevels(segmentCount - numSegmentsToCopy)) {
success = true;
@@ -1575,7 +1663,7 @@
}
// invariants do not hold, simply merge those segments
- mergeSegments(segmentInfos, segmentCount - numTailSegments, segmentCount);
+ mergeSegments(segmentCount - numTailSegments, segmentCount);
// maybe merge segments again if necessary
if (segmentInfos.info(segmentInfos.size() - 1).docCount > startUpperBound) {
@@ -1637,7 +1725,8 @@
}
segmentInfos.setSize(0); // pop old infos & add new
- info = new SegmentInfo(mergedName, docCount, directory, false, true);
+ info = new SegmentInfo(mergedName, docCount, directory, false, true,
+ -1, null, false);
segmentInfos.addElement(info);
success = true;
@@ -1662,7 +1751,7 @@
startTransaction();
try {
- merger.createCompoundFile(mergedName + ".cfs");
+ merger.createCompoundFile(mergedName);
info.setUseCompoundFile(true);
} finally {
if (!success) {
@@ -1720,29 +1809,18 @@
* buffered added documents or buffered deleted terms are
* large enough.
*/
- protected final void maybeFlushRamSegments() throws CorruptIndexException, IOException {
+ protected final synchronized void maybeFlush() throws CorruptIndexException, IOException {
// A flush is triggered if enough new documents are buffered or
- // if enough delete terms are buffered
- if (ramSegmentInfos.size() >= minMergeDocs || numBufferedDeleteTerms >= maxBufferedDeleteTerms) {
- flushRamSegments();
- }
+ // if enough delete terms are buffered or enough RAM is
+ // being consumed
+ if (numBufferedDeleteTerms >= maxBufferedDeleteTerms && docWriter.setFlushPending())
+ flush(true, false);
}
- /** Expert: Flushes all RAM-resident segments (buffered documents), then may merge segments. */
- private final synchronized void flushRamSegments() throws CorruptIndexException, IOException {
- flushRamSegments(true);
+ public final synchronized void flush() throws CorruptIndexException, IOException {
+ flush(true, false);
}
-
- /** Expert: Flushes all RAM-resident segments (buffered documents),
- * then may merge segments if triggerMerge==true. */
- protected final synchronized void flushRamSegments(boolean triggerMerge)
- throws CorruptIndexException, IOException {
- if (ramSegmentInfos.size() > 0 || bufferedDeleteTerms.size() > 0) {
- mergeSegments(ramSegmentInfos, 0, ramSegmentInfos.size());
- if (triggerMerge) maybeMergeSegments(minMergeDocs);
- }
- }
-
+
/**
* Flush all in-memory buffered updates (adds and deletes)
* to the Directory.
@@ -1751,9 +1829,156 @@
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
- public final synchronized void flush() throws CorruptIndexException, IOException {
+ public final synchronized void flush(boolean triggerMerge, boolean flushDocStores) throws CorruptIndexException, IOException {
ensureOpen();
- flushRamSegments();
+
+ // Make sure no threads are actively adding a document
+ docWriter.pauseAllThreads();
+
+ try {
+
+ SegmentInfo newSegment = null;
+
+ final int numDocs = docWriter.getNumDocsInRAM();
+
+ // Always flush docs if there are any
+ boolean flushDocs = numDocs > 0;
+
+ // With autoCommit=true we always must flush the doc
+ // stores when we flush
+ flushDocStores |= autoCommit;
+ String docStoreSegment = docWriter.getDocStoreSegment();
+ if (docStoreSegment == null)
+ flushDocStores = false;
+
+ // Always flush deletes if there are any delete terms.
+ // TODO: when autoCommit=false we don't have to flush
+ // deletes with every flushed segment; we can save
+ // CPU/IO by buffering longer & flushing deletes only
+ // when they are full or writer is being closed. We
+ // have to fix the "applyDeletesSelectively" logic to
+ // apply to more than just the last flushed segment
+ boolean flushDeletes = bufferedDeleteTerms.size() > 0;
+
+ if (infoStream != null)
+ infoStream.println(" flush: flushDocs=" + flushDocs +
+ " flushDeletes=" + flushDeletes +
+ " flushDocStores=" + flushDocStores +
+ " numDocs=" + numDocs);
+
+ int docStoreOffset = docWriter.getDocStoreOffset();
+ boolean docStoreIsCompoundFile = false;
+
+ // Check if the doc stores must be separately flushed
+ // because other segments, besides the one we are about
+ // to flush, reference it
+ if (flushDocStores && (!flushDocs || !docWriter.getSegment().equals(docWriter.getDocStoreSegment()))) {
+ // We must separately flush the doc store
+ if (infoStream != null)
+ infoStream.println(" flush shared docStore segment " + docStoreSegment);
+
+ flushDocStores();
+ flushDocStores = false;
+ docStoreIsCompoundFile = useCompoundFile;
+ }
+
+ String segment = docWriter.getSegment();
+
+ if (flushDocs || flushDeletes) {
+
+ SegmentInfos rollback = null;
+
+ if (flushDeletes)
+ rollback = (SegmentInfos) segmentInfos.clone();
+
+ boolean success = false;
+
+ try {
+ if (flushDocs) {
+
+ if (0 == docStoreOffset && flushDocStores) {
+ // This means we are flushing private doc stores
+ // with this segment, so it will not be shared
+ // with other segments
+ assert docStoreSegment != null;
+ assert docStoreSegment.equals(segment);
+ docStoreOffset = -1;
+ docStoreIsCompoundFile = false;
+ docStoreSegment = null;
+ }
+
+ int flushedDocCount = docWriter.flush(flushDocStores);
+
+ newSegment = new SegmentInfo(segment,
+ flushedDocCount,
+ directory, false, true,
+ docStoreOffset, docStoreSegment,
+ docStoreIsCompoundFile);
+ segmentInfos.addElement(newSegment);
+ }
+
+ if (flushDeletes) {
+ // we should be able to change this so we can
+ // buffer deletes longer and then flush them to
+ // multiple flushed segments, when
+ // autoCommit=false
+ applyDeletes(flushDocs);
+ doAfterFlush();
+ }
+
+ checkpoint();
+ success = true;
+ } finally {
+ if (!success) {
+ if (flushDeletes) {
+ // Fully replace the segmentInfos since flushed
+ // deletes could have changed any of the
+ // SegmentInfo instances:
+ segmentInfos.clear();
+ segmentInfos.addAll(rollback);
+ } else {
+ // Remove segment we added, if any:
+ if (newSegment != null &&
+ segmentInfos.size() > 0 &&
+ segmentInfos.info(segmentInfos.size()-1) == newSegment)
+ segmentInfos.remove(segmentInfos.size()-1);
+ }
+ if (flushDocs)
+ docWriter.abort();
+ deleter.checkpoint(segmentInfos, false);
+ deleter.refresh();
+ }
+ }
+
+ deleter.checkpoint(segmentInfos, autoCommit);
+
+ if (flushDocs && useCompoundFile) {
+ success = false;
+ try {
+ docWriter.createCompoundFile(segment);
+ newSegment.setUseCompoundFile(true);
+ checkpoint();
+ success = true;
+ } finally {
+ if (!success) {
+ newSegment.setUseCompoundFile(false);
+ deleter.refresh();
+ }
+ }
+
+ deleter.checkpoint(segmentInfos, autoCommit);
+ }
+
+ // nocommit -- must fix merge policy
+ if (0 == docWriter.getMaxBufferedDocs())
+ maybeMergeSegments(mergeFactor * numDocs / 2);
+ else
+ maybeMergeSegments(docWriter.getMaxBufferedDocs());
+ }
+ } finally {
+ docWriter.clearFlushPending();
+ docWriter.resumeAllThreads();
+ }
}
/** Expert: Return the total size of all index files currently cached in memory.
@@ -1761,15 +1986,15 @@
*/
public final long ramSizeInBytes() {
ensureOpen();
- return ramDirectory.sizeInBytes();
+ return docWriter.getRAMUsed();
}
/** Expert: Return the number of documents whose segments are currently cached in memory.
- * Useful when calling flushRamSegments()
+ * Useful when calling flush()
*/
public final synchronized int numRamDocs() {
ensureOpen();
- return ramSegmentInfos.size();
+ return docWriter.getNumDocsInRAM();
}
/** Incremental segment merger. */
@@ -1777,6 +2002,9 @@
long lowerBound = -1;
long upperBound = startUpperBound;
+ // nocommit -- must fix merge policy
+ if (upperBound == 0) upperBound = 10;
+
while (upperBound < maxMergeDocs) {
int minSegment = segmentInfos.size();
int maxSegment = -1;
@@ -1808,7 +2036,7 @@
while (numSegments >= mergeFactor) {
// merge the leftmost* mergeFactor segments
- int docCount = mergeSegments(segmentInfos, minSegment, minSegment + mergeFactor);
+ int docCount = mergeSegments(minSegment, minSegment + mergeFactor);
numSegments -= mergeFactor;
if (docCount > upperBound) {
@@ -1837,41 +2065,110 @@
* Merges the named range of segments, replacing them in the stack with a
* single segment.
*/
- private final int mergeSegments(SegmentInfos sourceSegments, int minSegment, int end)
+
+ private final int mergeSegments(int minSegment, int end)
throws CorruptIndexException, IOException {
- // We may be called solely because there are deletes
- // pending, in which case doMerge is false:
- boolean doMerge = end > 0;
final String mergedName = newSegmentName();
+
SegmentMerger merger = null;
-
- final List ramSegmentsToDelete = new ArrayList();
-
SegmentInfo newSegment = null;
int mergedDocCount = 0;
- boolean anyDeletes = (bufferedDeleteTerms.size() != 0);
// This is try/finally to make sure merger's readers are closed:
try {
- if (doMerge) {
- if (infoStream != null) infoStream.print("merging segments");
- merger = new SegmentMerger(this, mergedName);
+ if (infoStream != null) infoStream.print("merging segments");
- for (int i = minSegment; i < end; i++) {
- SegmentInfo si = sourceSegments.info(i);
- if (infoStream != null)
- infoStream.print(" " + si.name + " (" + si.docCount + " docs)");
- IndexReader reader = SegmentReader.get(si, MERGE_READ_BUFFER_SIZE); // no need to set deleter (yet)
- merger.add(reader);
- if (reader.directory() == this.ramDirectory) {
- ramSegmentsToDelete.add(si);
- }
- }
+ // Check whether this merge will allow us to skip
+ // merging the doc stores (stored field & vectors).
+ // This is a very substantial optimization (saves tons
+ // of IO) that can only be applied with
+ // autoCommit=false.
+
+ Directory lastDir = directory;
+ String lastDocStoreSegment = null;
+ boolean mergeDocStores = false;
+ boolean doFlushDocStore = false;
+ int next = -1;
+
+ // Test each segment to be merged
+ for (int i = minSegment; i < end; i++) {
+ SegmentInfo si = segmentInfos.info(i);
+
+ // If it has deletions we must merge the doc stores
+ if (si.hasDeletions())
+ mergeDocStores = true;
+
+ // If it has its own (private) doc stores we must
+ // merge the doc stores
+ if (-1 == si.getDocStoreOffset())
+ mergeDocStores = true;
+
+ // If it has a different doc store segment than
+ // previous segments, we must merge the doc stores
+ String docStoreSegment = si.getDocStoreSegment();
+ if (docStoreSegment == null)
+ mergeDocStores = true;
+ else if (lastDocStoreSegment == null)
+ lastDocStoreSegment = docStoreSegment;
+ else if (!lastDocStoreSegment.equals(docStoreSegment))
+ mergeDocStores = true;
+
+ // Segments' docStoreOffsets must be in-order and
+ // contiguous. The default merge policy guarantees
+ // this, but an arbitrary merge policy may not.
+ if (-1 == next)
+ next = si.getDocStoreOffset() + si.docCount;
+ else if (next != si.getDocStoreOffset())
+ mergeDocStores = true;
+ else
+ next = si.getDocStoreOffset() + si.docCount;
+
+ // If the segment comes from a different directory
+ // we must merge
+ if (lastDir != si.dir)
+ mergeDocStores = true;
+
+ // If the segment is referencing the current "live"
+ // doc store outputs then we must merge
+ if (si.getDocStoreOffset() != -1 && si.getDocStoreSegment().equals(docWriter.getDocStoreSegment()))
+ doFlushDocStore = true;
}
+ final int docStoreOffset;
+ final String docStoreSegment;
+ final boolean docStoreIsCompoundFile;
+ if (mergeDocStores) {
+ docStoreOffset = -1;
+ docStoreSegment = null;
+ docStoreIsCompoundFile = false;
+ } else {
+ SegmentInfo si = segmentInfos.info(minSegment);
+ docStoreOffset = si.getDocStoreOffset();
+ docStoreSegment = si.getDocStoreSegment();
+ docStoreIsCompoundFile = si.getDocStoreIsCompoundFile();
+ }
+
+ if (mergeDocStores && doFlushDocStore)
+ // SegmentMerger intends to merge the doc stores
+ // (stored fields, vectors), and at least one of the
+ // segments to be merged refers to the currently
+ // live doc stores.
+ flushDocStores();
+
+ merger = new SegmentMerger(this, mergedName);
+
+ for (int i = minSegment; i < end; i++) {
+ SegmentInfo si = segmentInfos.info(i);
+ if (infoStream != null)
+ infoStream.print(" " + si.name + " (" + si.docCount + " docs)");
+ IndexReader reader = SegmentReader.get(si, MERGE_READ_BUFFER_SIZE, mergeDocStores); // no need to set deleter (yet)
+ merger.add(reader);
+ }
+
SegmentInfos rollback = null;
boolean success = false;
@@ -1879,65 +2176,32 @@
// if we hit exception when doing the merge:
try {
- if (doMerge) {
- mergedDocCount = merger.merge();
+ mergedDocCount = merger.merge(mergeDocStores);
- if (infoStream != null) {
- infoStream.println(" into "+mergedName+" ("+mergedDocCount+" docs)");
- }
+ if (infoStream != null) {
+ infoStream.println(" into "+mergedName+" ("+mergedDocCount+" docs)");
+ }
- newSegment = new SegmentInfo(mergedName, mergedDocCount,
- directory, false, true);
- }
+ newSegment = new SegmentInfo(mergedName, mergedDocCount,
+ directory, false, true,
+ docStoreOffset,
+ docStoreSegment,
+ docStoreIsCompoundFile);
- if (sourceSegments != ramSegmentInfos || anyDeletes) {
- // Now save the SegmentInfo instances that
- // we are replacing:
- rollback = (SegmentInfos) segmentInfos.clone();
- }
+ rollback = (SegmentInfos) segmentInfos.clone();
- if (doMerge) {
- if (sourceSegments == ramSegmentInfos) {
- segmentInfos.addElement(newSegment);
- } else {
- for (int i = end-1; i > minSegment; i--) // remove old infos & add new
- sourceSegments.remove(i);
+ for (int i = end-1; i > minSegment; i--) // remove old infos & add new
+ segmentInfos.remove(i);
- segmentInfos.set(minSegment, newSegment);
- }
- }
+ segmentInfos.set(minSegment, newSegment);
- if (sourceSegments == ramSegmentInfos) {
- maybeApplyDeletes(doMerge);
- doAfterFlush();
- }
-
checkpoint();
success = true;
} finally {
-
- if (success) {
- // The non-ram-segments case is already committed
- // (above), so all the remains for ram segments case
- // is to clear the ram segments:
- if (sourceSegments == ramSegmentInfos) {
- ramSegmentInfos.removeAllElements();
- }
- } else {
-
- // Must rollback so our state matches index:
- if (sourceSegments == ramSegmentInfos && !anyDeletes) {
- // Simple case: newSegment may or may not have
- // been added to the end of our segment infos,
- // so just check & remove if so:
- if (newSegment != null &&
- segmentInfos.size() > 0 &&
- segmentInfos.info(segmentInfos.size()-1) == newSegment) {
- segmentInfos.remove(segmentInfos.size()-1);
- }
- } else if (rollback != null) {
+ if (!success) {
+ if (rollback != null) {
// Rollback the individual SegmentInfo
// instances, but keep original SegmentInfos
// instance (so we don't try to write again the
@@ -1952,26 +2216,21 @@
}
} finally {
// close readers before we attempt to delete now-obsolete segments
- if (doMerge) merger.closeReaders();
+ merger.closeReaders();
}
- // Delete the RAM segments
- deleter.deleteDirect(ramDirectory, ramSegmentsToDelete);
-
// Give deleter a chance to remove files now.
deleter.checkpoint(segmentInfos, autoCommit);
- if (useCompoundFile && doMerge) {
+ if (useCompoundFile) {
boolean success = false;
try {
-
merger.createCompoundFile(mergedName + ".cfs");
newSegment.setUseCompoundFile(true);
checkpoint();
success = true;
-
} finally {
if (!success) {
// Must rollback:
@@ -1988,19 +2247,23 @@
}
// Called during flush to apply any buffered deletes. If
- // doMerge is true then a new segment was just created and
- // flushed from the ram segments.
- private final void maybeApplyDeletes(boolean doMerge) throws CorruptIndexException, IOException {
+ // flushedNewSegment is true then a new segment was just
+ // created and flushed from the ram segments, so we will
+ // selectively apply the deletes to that new segment.
+ private final void applyDeletes(boolean flushedNewSegment) throws CorruptIndexException, IOException {
if (bufferedDeleteTerms.size() > 0) {
if (infoStream != null)
infoStream.println("flush " + numBufferedDeleteTerms + " buffered deleted terms on "
+ segmentInfos.size() + " segments.");
- if (doMerge) {
+ if (flushedNewSegment) {
IndexReader reader = null;
try {
- reader = SegmentReader.get(segmentInfos.info(segmentInfos.size() - 1));
+ // Open readers w/o opening the stored fields /
+ // vectors because these files may still be held
+ // open for writing by docWriter
+ reader = SegmentReader.get(segmentInfos.info(segmentInfos.size() - 1), false);
// Apply delete terms to the segment just flushed from ram
// apply appropriately so that a delete term is only applied to
@@ -2018,14 +2281,14 @@
}
int infosEnd = segmentInfos.size();
- if (doMerge) {
+ if (flushedNewSegment) {
infosEnd--;
}
for (int i = 0; i < infosEnd; i++) {
IndexReader reader = null;
try {
- reader = SegmentReader.get(segmentInfos.info(i));
+ reader = SegmentReader.get(segmentInfos.info(i), false);
// Apply delete terms to disk segments
// except the one just flushed from ram.
@@ -2049,7 +2312,9 @@
private final boolean checkNonDecreasingLevels(int start) {
int lowerBound = -1;
- int upperBound = minMergeDocs;
+ int upperBound = docWriter.getMaxBufferedDocs();
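+ // a value of 0 means docWriter has no maxBufferedDocs limit;
+ // fall back to the historical default of 10 docs per level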
+ if (upperBound == 0)
+ upperBound = 10;
for (int i = segmentInfos.size() - 1; i >= start; i--) {
int docCount = segmentInfos.info(i).docCount;
@@ -2098,10 +2363,11 @@
// well as the disk segments.
private void bufferDeleteTerm(Term term) {
Num num = (Num) bufferedDeleteTerms.get(term);
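+ // record how many docs are buffered in RAM right now, so this
+ // delete is later applied only to docs added before it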
+ int numDoc = docWriter.getNumDocsInRAM();
if (num == null) {
- bufferedDeleteTerms.put(term, new Num(ramSegmentInfos.size()));
+ bufferedDeleteTerms.put(term, new Num(numDoc));
} else {
- num.setNum(ramSegmentInfos.size());
+ num.setNum(numDoc);
}
numBufferedDeleteTerms++;
}
@@ -2115,7 +2381,7 @@
while (iter.hasNext()) {
Entry entry = (Entry) iter.next();
Term term = (Term) entry.getKey();
-
+
TermDocs docs = reader.termDocs(term);
if (docs != null) {
int num = ((Num) entry.getValue()).getNum();
Index: src/java/org/apache/lucene/index/IndexFileDeleter.java
===================================================================
--- src/java/org/apache/lucene/index/IndexFileDeleter.java (revision 547668)
+++ src/java/org/apache/lucene/index/IndexFileDeleter.java (working copy)
@@ -97,6 +97,7 @@
private PrintStream infoStream;
private Directory directory;
private IndexDeletionPolicy policy;
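+ // used to protect files that docWriter is still writing from deletion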
+ private DocumentsWriter docWriter;
void setInfoStream(PrintStream infoStream) {
this.infoStream = infoStream;
@@ -116,10 +117,12 @@
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
- public IndexFileDeleter(Directory directory, IndexDeletionPolicy policy, SegmentInfos segmentInfos, PrintStream infoStream)
+ public IndexFileDeleter(Directory directory, IndexDeletionPolicy policy, SegmentInfos segmentInfos, PrintStream infoStream, DocumentsWriter docWriter)
throws CorruptIndexException, IOException {
+ this.docWriter = docWriter;
this.infoStream = infoStream;
+
this.policy = policy;
this.directory = directory;
@@ -294,7 +297,7 @@
public void checkpoint(SegmentInfos segmentInfos, boolean isCommit) throws IOException {
if (infoStream != null) {
- message("now checkpoint \"" + segmentInfos.getCurrentSegmentFileName() + "\" [isCommit = " + isCommit + "]");
+ message("now checkpoint \"" + segmentInfos.getCurrentSegmentFileName() + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
}
// Try again now to delete any previously un-deletable
@@ -310,6 +313,8 @@
// Incref the files:
incRef(segmentInfos, isCommit);
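+ // also protect the files DocumentsWriter currently has open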
+ if (docWriter != null)
+ incRef(docWriter.files());
if (isCommit) {
// Append to our commits list:
@@ -324,10 +329,10 @@
// DecRef old files from the last checkpoint, if any:
int size = lastFiles.size();
+
if (size > 0) {
- for(int i=0;i<size;i++)
Index: src/java/org/apache/lucene/store/IndexOutput.java
===================================================================
--- src/java/org/apache/lucene/store/IndexOutput.java (revision 547668)
+++ src/java/org/apache/lucene/store/IndexOutput.java (working copy)
+ /** Writes a sequence of UTF-8 encoded characters from a char[]. */
+ public void writeChars(char[] s, int start, int length)
+ throws IOException {
+ final int end = start + length;
+ for (int i = start; i < end; i++) {
+ final int code = (int)s[i];
+ if (code >= 0x01 && code <= 0x7F)
+ writeByte((byte)code);
+ else if (((code >= 0x80) && (code <= 0x7FF)) || code == 0) {
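+ // 2-byte form for U+0080..U+07FF; U+0000 also gets the 2-byte
+ // (modified UTF-8) form so a 0x00 byte is never written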
+ writeByte((byte)(0xC0 | (code >> 6)));
+ writeByte((byte)(0x80 | (code & 0x3F)));
+ } else {
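+ // 3-byte form for U+0800 and above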
+ writeByte((byte)(0xE0 | (code >>> 12)));
+ writeByte((byte)(0x80 | ((code >> 6) & 0x3F)));
+ writeByte((byte)(0x80 | (code & 0x3F)));
+ }
+ }
+ }
+
+
/** Forces any buffered output to be written. */
public abstract void flush() throws IOException;