Index: CHANGES.txt
===================================================================
--- CHANGES.txt (revision 573633)
+++ CHANGES.txt (working copy)
@@ -28,6 +28,20 @@
termText instead of String. This gives faster tokenization
performance (~10-15%). (Mike McCandless)
+ 5. LUCENE-847: Factor MergePolicy and MergeScheduler out of
+ IndexWriter. This enables users to change how & when segments are
+ selected for merging (using MergePolicy) as well as how & when
+ those merges are actually performed (using MergeScheduler).
+ (Steven Parkes via Mike McCandless).
+
+ 6. LUCENE-870: Add a ConcurrentMergeScheduler which will execute
+ merges using background threads. (Steven Parkes via Mike McCandless)
+
+ 7. LUCENE-845: Added a LogByteSizeMergePolicy which selects segments
+ for merging by their size in bytes in the Directory, so that merged
+ segments are of roughly equal byte size. This is a better match when
+ you flush by RAM usage instead of document count in IndexWriter
+ (Mike McCandless).
+
Bug fixes
1. LUCENE-933: QueryParser fixed to not produce empty sub
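As a usage illustration of entries 5-7 above, here is a minimal sketch (not part of the patch) of wiring the new merge machinery into an application. It uses only APIs added elsewhere in this patch (setMergeScheduler, setMergePolicy, ConcurrentMergeScheduler's no-arg constructor); the index path, the analyzer choice, and the LogByteSizeMergePolicy no-arg constructor are assumptions for the example.

    // Hypothetical usage sketch: background merges plus byte-size based merge selection.
    Directory dir = FSDirectory.getDirectory(new File("/path/to/index"));
    IndexWriter writer = new IndexWriter(dir, new SimpleAnalyzer(), true);
    writer.setMergeScheduler(new ConcurrentMergeScheduler()); // LUCENE-870: run merges in background threads
    writer.setMergePolicy(new LogByteSizeMergePolicy());      // LUCENE-845: assumed no-arg constructor
    // ... add documents ...
    writer.close();
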
Index: src/test/org/apache/lucene/store/MockRAMOutputStream.java
===================================================================
--- src/test/org/apache/lucene/store/MockRAMOutputStream.java (revision 573633)
+++ src/test/org/apache/lucene/store/MockRAMOutputStream.java (working copy)
@@ -55,7 +55,7 @@
writeBytes(singleByte, 0, 1);
}
- public void writeBytes(byte[] b, int offset, int len) throws IOException {
+ public void writeBytes(byte[] b, int offset, int len) throws IOException {
long freeSpace = dir.maxSize - dir.sizeInBytes();
long realUsage = 0;
Index: src/test/org/apache/lucene/store/MockRAMDirectory.java
===================================================================
--- src/test/org/apache/lucene/store/MockRAMDirectory.java (revision 573633)
+++ src/test/org/apache/lucene/store/MockRAMDirectory.java (working copy)
@@ -195,7 +195,7 @@
* RAMOutputStream.BUFFER_SIZE (now 1024) bytes.
*/
- final long getRecomputedActualSizeInBytes() {
+ final synchronized long getRecomputedActualSizeInBytes() {
long size = 0;
Iterator it = fileMap.values().iterator();
while (it.hasNext())
Index: src/test/org/apache/lucene/index/TestThreadedOptimize.java
===================================================================
--- src/test/org/apache/lucene/index/TestThreadedOptimize.java (revision 0)
+++ src/test/org/apache/lucene/index/TestThreadedOptimize.java (revision 0)
@@ -0,0 +1,144 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.analysis.SimpleAnalyzer;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.MockRAMDirectory;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.util._TestUtil;
+import org.apache.lucene.util.English;
+
+import junit.framework.TestCase;
+
+import java.io.IOException;
+import java.io.File;
+
+public class TestThreadedOptimize extends TestCase {
+
+ private static final Analyzer ANALYZER = new SimpleAnalyzer();
+
+ private final static int NUM_THREADS = 3;
+ //private final static int NUM_THREADS = 5;
+
+ private final static int NUM_ITER = 2;
+ //private final static int NUM_ITER = 10;
+
+ private final static int NUM_ITER2 = 2;
+ //private final static int NUM_ITER2 = 5;
+
+ private boolean failed;
+
+ private void setFailed() {
+ failed = true;
+ }
+
+ public void runTest(Directory directory, boolean autoCommit, MergeScheduler merger) throws Exception {
+
+ IndexWriter writer = new IndexWriter(directory, autoCommit, ANALYZER, true);
+ writer.setMaxBufferedDocs(2);
+ if (merger != null)
+ writer.setMergeScheduler(merger);
+
+ for(int iter=0;iter 0) {
Index: src/test/org/apache/lucene/index/TestStressIndexing.java
===================================================================
--- src/test/org/apache/lucene/index/TestStressIndexing.java (revision 573633)
+++ src/test/org/apache/lucene/index/TestStressIndexing.java (working copy)
@@ -32,105 +32,119 @@
public class TestStressIndexing extends TestCase {
private static final Analyzer ANALYZER = new SimpleAnalyzer();
private static final Random RANDOM = new Random();
- private static Searcher SEARCHER;
- private static int RUN_TIME_SEC = 15;
-
- private static class IndexerThread extends Thread {
- IndexWriter modifier;
- int nextID;
- public int count;
+ private static abstract class TimedThread extends Thread {
boolean failed;
+ int count;
+ private static int RUN_TIME_SEC = 6;
+ private TimedThread[] allThreads;
- public IndexerThread(IndexWriter modifier) {
- this.modifier = modifier;
+ abstract public void doWork() throws Throwable;
+
+ TimedThread(TimedThread[] threads) {
+ this.allThreads = threads;
}
public void run() {
- long stopTime = System.currentTimeMillis() + 1000*RUN_TIME_SEC;
- try {
- while(true) {
+ final long stopTime = System.currentTimeMillis() + 1000*RUN_TIME_SEC;
- if (System.currentTimeMillis() > stopTime) {
- break;
- }
+ count = 0;
- // Add 10 docs:
- for(int j=0; j<10; j++) {
- Document d = new Document();
- int n = RANDOM.nextInt();
- d.add(new Field("id", Integer.toString(nextID++), Field.Store.YES, Field.Index.UN_TOKENIZED));
- d.add(new Field("contents", English.intToEnglish(n), Field.Store.NO, Field.Index.TOKENIZED));
- modifier.addDocument(d);
- }
-
- // Delete 5 docs:
- int deleteID = nextID;
- for(int j=0; j<5; j++) {
- modifier.deleteDocuments(new Term("id", ""+deleteID));
- deleteID -= 2;
- }
-
+ try {
+ while(System.currentTimeMillis() < stopTime && !anyErrors()) {
+ doWork();
count++;
}
-
- } catch (Exception e) {
- System.out.println(e.toString());
- e.printStackTrace();
+ } catch (Throwable e) {
+ e.printStackTrace(System.out);
failed = true;
}
}
+
+ private boolean anyErrors() {
+ for(int i=0;i stopTime) {
- break;
- }
- }
- } catch (Exception e) {
- System.out.println(e.toString());
- e.printStackTrace();
- failed = true;
+ public void doWork() throws Exception {
+ // Add 10 docs:
+ for(int j=0; j<10; j++) {
+ Document d = new Document();
+ int n = RANDOM.nextInt();
+ d.add(new Field("id", Integer.toString(nextID++), Field.Store.YES, Field.Index.UN_TOKENIZED));
+ d.add(new Field("contents", English.intToEnglish(n), Field.Store.NO, Field.Index.TOKENIZED));
+ writer.addDocument(d);
}
+
+ // Delete 5 docs:
+ int deleteID = nextID-1;
+ for(int j=0; j<5; j++) {
+ writer.deleteDocuments(new Term("id", ""+deleteID));
+ deleteID -= 2;
+ }
}
}
+ private static class SearcherThread extends TimedThread {
+ private Directory directory;
+
+ public SearcherThread(Directory directory, TimedThread[] threads) {
+ super(threads);
+ this.directory = directory;
+ }
+
+ public void doWork() throws Throwable {
+ for (int i=0; i<100; i++)
+ (new IndexSearcher(directory)).close();
+ count += 100;
+ }
+ }
+
/*
Run one indexer and 2 searchers against single index as
stress test.
*/
- public void runStressTest(Directory directory) throws Exception {
- IndexWriter modifier = new IndexWriter(directory, ANALYZER, true);
+ public void runStressTest(Directory directory, boolean autoCommit, MergeScheduler mergeScheduler) throws Exception {
+ IndexWriter modifier = new IndexWriter(directory, autoCommit, ANALYZER, true);
+ modifier.setMaxBufferedDocs(10);
+
+ TimedThread[] threads = new TimedThread[4];
+
+ if (mergeScheduler != null)
+ modifier.setMergeScheduler(mergeScheduler);
+
// One modifier that writes 10 docs then removes 5, over
// and over:
- IndexerThread indexerThread = new IndexerThread(modifier);
+ IndexerThread indexerThread = new IndexerThread(modifier, threads);
+ threads[0] = indexerThread;
indexerThread.start();
- IndexerThread indexerThread2 = new IndexerThread(modifier);
+ IndexerThread indexerThread2 = new IndexerThread(modifier, threads);
+ threads[1] = indexerThread2;
indexerThread2.start();
- // Two searchers that constantly just re-instantiate the searcher:
- SearcherThread searcherThread1 = new SearcherThread(directory);
+ // Two searchers that constantly just re-instantiate the
+ // searcher:
+ SearcherThread searcherThread1 = new SearcherThread(directory, threads);
+ threads[2] = searcherThread1;
searcherThread1.start();
- SearcherThread searcherThread2 = new SearcherThread(directory);
+ SearcherThread searcherThread2 = new SearcherThread(directory, threads);
+ threads[3] = searcherThread2;
searcherThread2.start();
indexerThread.join();
@@ -144,6 +158,7 @@
assertTrue("hit unexpected exception in indexer2", !indexerThread2.failed);
assertTrue("hit unexpected exception in search1", !searcherThread1.failed);
assertTrue("hit unexpected exception in search2", !searcherThread2.failed);
+
//System.out.println(" Writer: " + indexerThread.count + " iterations");
//System.out.println("Searcher 1: " + searcherThread1.count + " searchers created");
//System.out.println("Searcher 2: " + searcherThread2.count + " searchers created");
@@ -155,25 +170,38 @@
*/
public void testStressIndexAndSearching() throws Exception {
- // First in a RAM directory:
+ // RAMDir
Directory directory = new MockRAMDirectory();
- runStressTest(directory);
+ runStressTest(directory, true, null);
directory.close();
- // Second in an FSDirectory:
+ // FSDir
String tempDir = System.getProperty("java.io.tmpdir");
File dirPath = new File(tempDir, "lucene.test.stress");
directory = FSDirectory.getDirectory(dirPath);
- runStressTest(directory);
+ runStressTest(directory, true, null);
directory.close();
- rmDir(dirPath);
- }
- private void rmDir(File dir) {
- File[] files = dir.listFiles();
- for (int i = 0; i < files.length; i++) {
- files[i].delete();
- }
- dir.delete();
+ // With ConcurrentMergeScheduler, in RAMDir
+ directory = new MockRAMDirectory();
+ runStressTest(directory, true, new ConcurrentMergeScheduler());
+ directory.close();
+
+ // With ConcurrentMergeScheduler, in FSDir
+ directory = FSDirectory.getDirectory(dirPath);
+ runStressTest(directory, true, new ConcurrentMergeScheduler());
+ directory.close();
+
+ // With ConcurrentMergeScheduler and autoCommit=false, in RAMDir
+ directory = new MockRAMDirectory();
+ runStressTest(directory, false, new ConcurrentMergeScheduler());
+ directory.close();
+
+ // With ConcurrentMergeScheduler and autoCommit=false, in FSDir
+ directory = FSDirectory.getDirectory(dirPath);
+ runStressTest(directory, false, new ConcurrentMergeScheduler());
+ directory.close();
+
+ _TestUtil.rmDir(dirPath);
}
}
Index: src/test/org/apache/lucene/index/TestDocumentWriter.java
===================================================================
--- src/test/org/apache/lucene/index/TestDocumentWriter.java (revision 573633)
+++ src/test/org/apache/lucene/index/TestDocumentWriter.java (working copy)
@@ -62,7 +62,7 @@
IndexWriter writer = new IndexWriter(dir, analyzer, true);
writer.addDocument(testDoc);
writer.flush();
- SegmentInfo info = writer.segmentInfos.info(writer.segmentInfos.size()-1);
+ SegmentInfo info = writer.newestSegment();
writer.close();
//After adding the document, we should be able to read it back in
SegmentReader reader = SegmentReader.get(info);
@@ -123,7 +123,7 @@
writer.addDocument(doc);
writer.flush();
- SegmentInfo info = writer.segmentInfos.info(writer.segmentInfos.size()-1);
+ SegmentInfo info = writer.newestSegment();
writer.close();
SegmentReader reader = SegmentReader.get(info);
@@ -156,7 +156,7 @@
writer.addDocument(doc);
writer.flush();
- SegmentInfo info = writer.segmentInfos.info(writer.segmentInfos.size()-1);
+ SegmentInfo info = writer.newestSegment();
writer.close();
SegmentReader reader = SegmentReader.get(info);
Index: src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java
===================================================================
--- src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java (revision 573633)
+++ src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java (working copy)
@@ -272,7 +272,6 @@
writer.addIndexesNoOptimize(new Directory[] { aux, aux });
assertEquals(1020, writer.docCount());
- assertEquals(2, writer.getSegmentCount());
assertEquals(1000, writer.getDocCount(0));
writer.close();
@@ -373,7 +372,7 @@
writer = newWriter(dir, true);
writer.setMaxBufferedDocs(1000);
- // add 1000 documents
+ // add 1000 documents in 1 segment
addDocs(writer, 1000);
assertEquals(1000, writer.docCount());
assertEquals(1, writer.getSegmentCount());
Index: src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
===================================================================
--- src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (revision 0)
+++ src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (revision 0)
@@ -0,0 +1,309 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.store.Directory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+/** A {@link MergeScheduler} that does each merge using a
+ * separate thread. This is a simple way to use
+ * concurrency in the indexing process without having to
+ * create external threads. */
+
+public class ConcurrentMergeScheduler implements MergeScheduler {
+
+ public static boolean VERBOSE = false;
+
+ private int mergeThreadPriority = -1;
+ private boolean aborted;
+ private List serialMerges = new ArrayList();
+
+ private List runningMerges = new ArrayList();
+ private int maxThreadCount = 3;
+
+ private List exceptions = new ArrayList();
+
+ /** Sets the max # simultaneous threads that may be
+ * running. If a merge is necessary yet we already have
+ * this many threads running, the merge is returned back
+ * to IndexWriter so that it runs in the "foreground". */
+ public void setMaxNumThreads(int count) {
+ maxThreadCount = count;
+ }
+
+ /** Get the max # simultaneous threads that may be
+ * running. @see #setMaxNumThreads. */
+ public int getMaxNumThreads() {
+ return maxThreadCount;
+ }
+
+ /** Return the priority that merge threads run at. By
+ * default the priority is 1 plus the priority of the first
+ * thread that calls merge. */
+ public synchronized int getMergeThreadPriority() {
+ initMergeThreadPriority();
+ return mergeThreadPriority;
+ }
+
+ /** Returns any exceptions that were caught in the merge
+ * threads. */
+ public List getExceptions() {
+ return exceptions;
+ }
+
+ /** Set the priority that merge threads run at. */
+ public synchronized void setMergeThreadPriority(int pri) {
+ mergeThreadPriority = pri;
+
+ final int numThreads = runningMerges.size();
+ for(int i=0;i 0) {
+ if (VERBOSE) {
+ message("now wait for threads; currentnly " + runningMerges.size() + " still running");
+ for(int i=0;i= maxThreadCount) {
+
+ if (VERBOSE)
+ message(" too many background merges running (" + numMergeThreads + "); will run in foreground");
+
+ // We are already running as many threads as
+ // allowed, so, do this one in the foreground
+ i++;
+ } else if (writer.registerMerge(merge)) {
+ MergeThread merger = new MergeThread(writer, merge);
+ if (VERBOSE)
+ message(" launch new thread: merge " + merge.segString(dir));
+ merger.setDaemon(true);
+ merger.start();
+ try {
+ merger.setPriority(mergeThreadPriority);
+ } catch (NullPointerException npe) {
+ // Strangely, Sun's JDK 1.5 on Linux sometimes
+ // throws NPE out of here...
+ }
+ spec.merges.remove(i);
+ } else {
+ if (VERBOSE)
+ message(" already running");
+ spec.merges.remove(i);
+ }
+ }
+ }
+ }
+
+ // Any merges that remain must now be run in the
+ // foreground of this thread
+ final int numMergesLeft = spec.merges.size();
+ for(int j=0;jExpert: a MergePolicy determines the sequence of
+ * primitive merge operations to be used for overall merge
+ * and optimize operations.
+ *
+ * Whenever the segments in an index have been altered by
+ * {@link IndexWriter}, either the addition of a newly
+ * flushed segment, addition of many segments due to
+ * addIndexes* calls, or a previous merge that may now need
+ * to cascade, {@link IndexWriter} will invoke {@link
+ * #findMerges} to give the MergePolicy a chance to merge
+ * segments. This method returns a {@link
+ * MergeSpecification} instance describing the set of merges
+ * that should be done, or null if no merges are
+ * necessary.
+ *
+ * Note that this means the policy can return more than
+ * one merge as being necessary. In this case, if the
+ * writer is using {@link SerialMergeScheduler}, the merges will
+ * be run sequentially.
+ *
+ * The default MergePolicy is {@link
+ * LogDocMergePolicy}.
+ */
+
+public interface MergePolicy {
+
+ /**
+ * A OneMerge instance provides the information necessary
+ * to perform an individual primitive merge operation,
+ * resulting in a single new segment. The merge spec
+ * includes the subset of segments to be merged as well as
+ * whether the new segment should use the compound file
+ * format.
+ */
+
+ public static class OneMerge {
+
+ SegmentInfo info; // used by IndexWriter
+ boolean mergeDocStores; // used by IndexWriter
+ SegmentInfos segmentsClone; // used by IndexWriter
+ boolean increfDone; // used by IndexWriter
+ boolean registerDone; // used by IndexWriter
+ int startFullRefreshCount; // used by IndexWriter
+
+ final SegmentInfos segments;
+ final boolean useCompoundFile;
+ boolean aborted;
+
+ public OneMerge(SegmentInfos segments, boolean useCompoundFile) {
+ this.segments = segments;
+ this.useCompoundFile = useCompoundFile;
+ }
+
+ /** Mark this merge as aborted. If this is called
+ * before the merge is committed then the merge will
+ * not be committed. */
+ public synchronized void abort() {
+ aborted = true;
+ }
+ /** Returns true if this merge was aborted. */
+ public synchronized boolean isAborted() {
+ return aborted;
+ }
+
+ public String segString(Directory dir) {
+ StringBuffer b = new StringBuffer();
+ final int numSegments = segments.size();
+ for(int i=0;i<numSegments;i++) {
+ if (i > 0) b.append(" ");
+ b.append(segments.info(i).segString(dir));
+ }
+ if (info != null)
+ b.append(" into " + info.name);
+ return b.toString();
+ }
+ }
+
+ /**
+ * A MergeSpecification instance provides the information
+ * necessary to perform multiple merges. It simply
+ * contains a list of {@link OneMerge} instances.
+ */
+
+ public static class MergeSpecification implements Cloneable {
+
+ /**
+ * The subset of segments to be included in the primitive merge.
+ */
+
+ public List merges = new ArrayList();
+
+ public void add(OneMerge merge) {
+ merges.add(merge);
+ }
+
+ public String segString(Directory dir) {
+ StringBuffer b = new StringBuffer();
+ b.append("MergeSpec:\n");
+ final int count = merges.size();
+ for(int i=0;i maxNumSegments ||
+ (infos.size() == 1 &&
+ (infos.info(0).hasDeletions() ||
+ infos.info(0).hasSeparateNorms() ||
+ infos.info(0).dir != writer.getDirectory() ||
+ infos.info(0).getUseCompoundFile() != useCompoundFile)));
+ }
+
+ public MergeSpecification findMergesForOptimize(SegmentInfos infos, IndexWriter writer, int maxNumSegments) throws IOException {
+ final MergeSpecification spec;
+ if (!isOptimized(infos, writer, maxNumSegments)) {
+ spec = new MergeSpecification();
+ final int numSegments = infos.size();
+ final int first;
+ if (numSegments > mergeFactor)
+ first = numSegments-mergeFactor;
+ else
+ first = 0;
+ spec.add(new OneMerge(infos.range(first, numSegments), useCompoundFile));
+ } else
+ spec = null;
+ return spec;
+ }
+
+ public MergeSpecification findMerges(SegmentInfos infos, IndexWriter writer) throws IOException {
+
+ final int numSegments = infos.size();
+
+ // Compute levels, which is just log (base mergeFactor)
+ // of the size of each segment
+ float[] levels = new float[numSegments];
+ final float norm = (float) Math.log(mergeFactor);
+
+ float levelFloor = (float) (Math.log(minMergeSize)/norm);
+ final Directory directory = writer.getDirectory();
+
+ for(int i=0;i<numSegments;i++) {
+ final SegmentInfo info = infos.info(i);
+ long size = size(info);
+ if (size >= maxMergeSize && info.dir != directory)
+ throw new IllegalArgumentException("Segment is too large (" + size + " vs max size " + maxMergeSize + ")");
+
+ // Floor tiny segments @ mergeFactor
+ if (size < mergeFactor)
+ size = mergeFactor;
+ levels[i] = (float) Math.log(size)/norm;
+ }
+
+ // Now, we quantize the log values into levels. The
+ // first level is any segment whose log size is within
+ // LEVEL_LOG_SPAN of the max size, or, who has such as
+ // segment "to the right". Then, we find the max of all
+ // other segments and use that to define the next level
+ // segment, etc.
+
+ MergeSpecification spec = null;
+
+ int start = 0;
+ while(start < numSegments) {
+
+ // Find max level of all segments not already
+ // quantized.
+ float maxLevel = levels[start];
+ for(int i=1+start;i<numSegments;i++) {
+ final float level = levels[i];
+ if (level > maxLevel)
+ maxLevel = level;
+ }
+
+ // Now search backwards for the rightmost segment that
+ // falls into this level:
+ float levelBottom;
+ if (maxLevel < levelFloor)
+ // All remaining segments fall into the min level
+ levelBottom = -1.0F;
+ else {
+ levelBottom = (float) (maxLevel - LEVEL_LOG_SPAN);
+
+ // Force a boundary at the level floor
+ if (levelBottom < levelFloor && maxLevel >= levelFloor)
+ levelBottom = levelFloor;
+ }
+
+ int upto = numSegments-1;
+ while(upto >= start) {
+ if (levels[upto] >= levelBottom)
+ break;
+ upto--;
+ }
+
+ // Finally, record all merges that are viable at this level:
+ int end = start + mergeFactor;
+ while(end <= 1+upto) {
+ boolean anyTooLarge = false;
+ for(int i=start;i<end;i++)
+ anyTooLarge |= size(infos.info(i)) >= maxMergeSize;
+
+ if (!anyTooLarge) {
+ if (spec == null)
+ spec = new MergeSpecification();
+ spec.add(new OneMerge(infos.range(start, end), useCompoundFile));
+ }
+ start = end;
+ end = start + mergeFactor;
+ }
+
+ start = 1+upto;
+ }
+
+ return spec;
+ }
+}
Property changes on: src/java/org/apache/lucene/index/LogMergePolicy.java
___________________________________________________________________
Name: svn:eol-style
+ native
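As an aside on the level computation in LogMergePolicy.findMerges above, the following standalone sketch (not part of the patch) shows the log-base-mergeFactor quantization on a few made-up segment sizes. The LEVEL_LOG_SPAN value used here is only an assumed placeholder, since its actual definition falls outside the hunks shown above.

    // Illustration only: compute each segment's "level" as log(size)/log(mergeFactor);
    // segments whose level falls within LEVEL_LOG_SPAN of the current maximum form one
    // candidate merge, mirroring the quantization described in findMerges.
    public class LevelSketch {
      public static void main(String[] args) {
        final int mergeFactor = 10;                   // default merge factor
        final double LEVEL_LOG_SPAN = 0.75;           // assumed placeholder value
        final double norm = Math.log(mergeFactor);
        long[] sizes = { 900, 1100, 8000, 120000 };   // hypothetical segment sizes
        double maxLevel = Double.NEGATIVE_INFINITY;
        double[] levels = new double[sizes.length];
        for (int i = 0; i < sizes.length; i++) {
          levels[i] = Math.log(sizes[i]) / norm;
          if (levels[i] > maxLevel) maxLevel = levels[i];
        }
        for (int i = 0; i < sizes.length; i++)
          System.out.println(sizes[i] + " -> level " + levels[i]
              + (levels[i] >= maxLevel - LEVEL_LOG_SPAN ? "  (top level)" : ""));
      }
    }
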
Index: src/java/org/apache/lucene/index/SerialMergeScheduler.java
===================================================================
--- src/java/org/apache/lucene/index/SerialMergeScheduler.java (revision 0)
+++ src/java/org/apache/lucene/index/SerialMergeScheduler.java (revision 0)
@@ -0,0 +1,39 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+/** A {@link MergeScheduler} that simply does each merge
+ * sequentially, using the current thread. This is the
+ * default for {@link IndexWriter}. */
+public class SerialMergeScheduler implements MergeScheduler {
+
+ public void merge(IndexWriter writer, MergePolicy.MergeSpecification spec)
+ throws CorruptIndexException, IOException {
+
+ final int numMerge = spec.merges.size();
+ for(int i=0;i
+ also trigger one or more segment merges which by default
+ run (blocking) with the current thread (see below for changing the {@link
+ MergeScheduler}).
The optional autoCommit argument to the
@@ -135,8 +141,20 @@
filesystems like NFS that do not support "delete on last
close" semantics, which Lucene's "point in time" search
normally relies on.
- */
+
+ Expert: IndexWriter allows you to
+ separately change the {@link MergePolicy} and the {@link
+ MergeScheduler}. The {@link MergePolicy} is invoked
+ whenever there are changes to the segments in the index.
+ Its role is to select which merges to do, if any, and
+ return a {@link MergePolicy.MergeSpecification} describing
+ the merges. Default is {@link LogDocMergePolicy}. Then,
+ the {@link MergeScheduler} is invoked with the requested
+ merges and it decides when and how to run the merges.
+ Default is {@link SerialMergeScheduler}.
+*/
+
/*
* Clarification: Check Points (and commits)
* Being able to set autoCommit=false allows IndexWriter to flush and
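To make the Expert paragraph above concrete, here is a minimal sketch (not part of the patch) of a user-supplied MergeScheduler that logs each requested merge and then delegates to the stock SerialMergeScheduler. It assumes only the merge(IndexWriter, MergePolicy.MergeSpecification) and close() methods that IndexWriter invokes on its scheduler elsewhere in this patch; the class name is hypothetical.

    import java.io.IOException;
    import org.apache.lucene.index.*;

    // Hypothetical example: log every MergeSpecification, then run it serially.
    class LoggingMergeScheduler implements MergeScheduler {
      private final SerialMergeScheduler delegate = new SerialMergeScheduler();

      public void merge(IndexWriter writer, MergePolicy.MergeSpecification spec)
          throws CorruptIndexException, IOException {
        System.out.println("merge requested: " + spec.merges.size() + " merge(s)");
        delegate.merge(writer, spec);
      }

      public void close() {}   // nothing to release in this sketch
    }

An application would install it with writer.setMergeScheduler(new LoggingMergeScheduler()); otherwise IndexWriter keeps the SerialMergeScheduler default.
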
@@ -177,9 +195,10 @@
public static final String WRITE_LOCK_NAME = "write.lock";
/**
- * Default value is 10. Change using {@link #setMergeFactor(int)}.
+ * @deprecated
+ * @see LogMergePolicy#DEFAULT_MERGE_FACTOR
*/
- public final static int DEFAULT_MERGE_FACTOR = 10;
+ public final static int DEFAULT_MERGE_FACTOR = LogMergePolicy.DEFAULT_MERGE_FACTOR;
/**
* Default value is 10. Change using {@link #setMaxBufferedDocs(int)}.
@@ -205,9 +224,10 @@
public final static int DEFAULT_MAX_BUFFERED_DELETE_TERMS = 1000;
/**
- * Default value is {@link Integer#MAX_VALUE}. Change using {@link #setMaxMergeDocs(int)}.
+ * @deprecated
+ * @see LogDocMergePolicy#DEFAULT_MAX_MERGE_DOCS
*/
- public final static int DEFAULT_MAX_MERGE_DOCS = Integer.MAX_VALUE;
+ public final static int DEFAULT_MAX_MERGE_DOCS = LogDocMergePolicy.DEFAULT_MAX_MERGE_DOCS;
/**
* Default value is 10,000. Change using {@link #setMaxFieldLength(int)}.
@@ -239,7 +259,7 @@
private boolean localAutoCommit; // saved autoCommit during local transaction
private boolean autoCommit = true; // false if we should commit only on close
- SegmentInfos segmentInfos = new SegmentInfos(); // the segments
+ private SegmentInfos segmentInfos = new SegmentInfos(); // the segments
private DocumentsWriter docWriter;
private IndexFileDeleter deleter;
@@ -257,15 +277,14 @@
private HashMap bufferedDeleteTerms = new HashMap();
private int numBufferedDeleteTerms = 0;
- /** Use compound file setting. Defaults to true, minimizing the number of
- * files used. Setting this to false may improve indexing performance, but
- * may also cause file handle problems.
- */
- private boolean useCompoundFile = true;
-
private boolean closeDir;
private boolean closed;
+ private boolean closing;
+ private MergePolicy mergePolicy = new LogDocMergePolicy();
+ private MergeScheduler mergeScheduler = new SerialMergeScheduler();
+
+
/**
* Used internally to throw an {@link
* AlreadyClosedException} if this IndexWriter has been
@@ -278,23 +297,53 @@
}
}
- /** Get the current setting of whether to use the compound file format.
- * Note that this just returns the value you set with setUseCompoundFile(boolean)
- * or the default. You cannot use this to query the status of an existing index.
+ /**
+ * Casts current mergePolicy to LogMergePolicy, and throws
+ * an exception if the mergePolicy is not a LogMergePolicy.
+ */
+ private LogMergePolicy getLogMergePolicy() {
+ if (mergePolicy instanceof LogMergePolicy)
+ return (LogMergePolicy) mergePolicy;
+ else
+ throw new IllegalArgumentException("this method can only be called when the merge policy is the default LogMergePolicy");
+ }
+
+ private LogDocMergePolicy getLogDocMergePolicy() {
+ if (mergePolicy instanceof LogDocMergePolicy)
+ return (LogDocMergePolicy) mergePolicy;
+ else
+ throw new IllegalArgumentException("this method can only be called when the merge policy is LogDocMergePolicy");
+ }
+
+ /** Get the current setting of whether newly flushed
+ * segments will use the compound file format. Note that
+ * this just returns the value previously set with
+ * setUseCompoundFile(boolean), or the default value
+ * (true). You cannot use this to query the status of
+ * previously flushed segments.
+ *
+ * Note that this method is a convenience method: it
+ * just calls mergePolicy.getUseCompoundFile as long as
+ * mergePolicy is an instance of {@link LogMergePolicy}.
+ * Otherwise an IllegalArgumentException is thrown.
+ *
* @see #setUseCompoundFile(boolean)
*/
public boolean getUseCompoundFile() {
- ensureOpen();
- return useCompoundFile;
+ return getLogMergePolicy().getUseCompoundFile();
}
- /** Setting to turn on usage of a compound file. When on, multiple files
- * for each segment are merged into a single file once the segment creation
- * is finished. This is done regardless of what directory is in use.
+ /** Setting to turn on usage of a compound file. When on,
+ * multiple files for each segment are merged into a
+ * single file when a new segment is flushed.
+ *
+ * Note that this method is a convenience method: it
+ * just calls mergePolicy.setUseCompoundFile as long as
+ * mergePolicy is an instance of {@link LogMergePolicy}.
+ * Otherwise an IllegalArgumentException is thrown.
*/
public void setUseCompoundFile(boolean value) {
- ensureOpen();
- useCompoundFile = value;
+ getLogMergePolicy().setUseCompoundFile(value);
}
/** Expert: Set the Similarity implementation used by this IndexWriter.
@@ -652,26 +701,79 @@
}
}
+ /**
+ * Set the merge policy used by this IndexWriter
+ */
+ public void setMergePolicy(MergePolicy mp) {
+ ensureOpen();
+ if (mp == null)
+ throw new NullPointerException("MergePolicy must be non-null");
+
+ if (mergePolicy != mp)
+ mergePolicy.close();
+ mergePolicy = mp;
+ }
+
+ /**
+ * Returns the current MergePolicy in use by this writer.
+ * @see #setMergePolicy
+ */
+ public MergePolicy getMergePolicy() {
+ ensureOpen();
+ return mergePolicy;
+ }
+
+ /**
+ * Set the merge scheduler used by this IndexWriter
+ */
+ public void setMergeScheduler(MergeScheduler mergeScheduler) throws CorruptIndexException, IOException {
+ ensureOpen();
+ if (mergeScheduler == null)
+ throw new NullPointerException("MergeScheduler must be non-null");
+
+ if (this.mergeScheduler != mergeScheduler)
+ this.mergeScheduler.close();
+ this.mergeScheduler = mergeScheduler;
+ }
+
+ /**
+ * Returns the current MergeScheduler in use by this writer.
+ * @see #setMergeScheduler
+ */
+ public MergeScheduler getMergeScheduler() {
+ ensureOpen();
+ return mergeScheduler;
+ }
+
/** Determines the largest number of documents ever merged by addDocument().
* Small values (e.g., less than 10,000) are best for interactive indexing,
* as this limits the length of pauses while indexing to a few seconds.
* Larger values are best for batched indexing and speedier searches.
*
* The default value is {@link Integer#MAX_VALUE}.
+ *
+ *
Note that this method is a convenience method: it
+ * just calls mergePolicy.setMaxMergeDocs as long as
+ * mergePolicy is an instance of {@link LogMergePolicy}.
+ * Otherwise an IllegalArgumentException is thrown.
*/
public void setMaxMergeDocs(int maxMergeDocs) {
- ensureOpen();
- this.maxMergeDocs = maxMergeDocs;
+ getLogDocMergePolicy().setMaxMergeDocs(maxMergeDocs);
}
- /**
+ /**
* Returns the largest number of documents allowed in a
* single segment.
+ *
+ * Note that this method is a convenience method: it
+ * just calls mergePolicy.getMaxMergeDocs as long as
+ * mergePolicy is an instance of {@link LogMergePolicy}.
+ * Otherwise an IllegalArgumentException is thrown.
+ *
* @see #setMaxMergeDocs
*/
public int getMaxMergeDocs() {
- ensureOpen();
- return maxMergeDocs;
+ return getLogDocMergePolicy().getMaxMergeDocs();
}
/**
@@ -796,24 +898,31 @@
* for batch index creation, and smaller values (< 10) for indices that are
* interactively maintained.
*
+ * Note that this method is a convenience method: it
+ * just calls mergePolicy.setMergeFactor as long as
+ * mergePolicy is an instance of {@link LogMergePolicy}.
+ * Otherwise an IllegalArgumentException is thrown.
+ *
* This must never be less than 2. The default value is 10.
*/
public void setMergeFactor(int mergeFactor) {
- ensureOpen();
- if (mergeFactor < 2)
- throw new IllegalArgumentException("mergeFactor cannot be less than 2");
- this.mergeFactor = mergeFactor;
+ getLogMergePolicy().setMergeFactor(mergeFactor);
}
/**
- * Returns the number of segments that are merged at once
- * and also controls the total number of segments allowed
- * to accumulate in the index.
+ *
+ * <p>Returns the number of segments that are merged at
+ * once and also controls the total number of segments
+ * allowed to accumulate in the index.
+ *
+ * Note that this method is a convenience method: it
+ * just calls mergePolicy.getMergeFactor as long as
+ * mergePolicy is an instance of {@link LogMergePolicy}.
+ * Otherwise an IllegalArgumentException is thrown.
+ *
* @see #setMergeFactor
*/
public int getMergeFactor() {
- ensureOpen();
- return mergeFactor;
+ return getLogMergePolicy().getMergeFactor();
}
/** If non-null, this will be the default infoStream used
@@ -922,37 +1031,66 @@
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
- public synchronized void close() throws CorruptIndexException, IOException {
- if (!closed) {
- flush(true, true);
+ public void close() throws CorruptIndexException, IOException {
- if (commitPending) {
- segmentInfos.write(directory); // now commit changes
- if (infoStream != null)
- infoStream.println("close: wrote segments file \"" + segmentInfos.getCurrentSegmentFileName() + "\"");
- deleter.checkpoint(segmentInfos, true);
- commitPending = false;
- rollbackSegmentInfos = null;
- }
+ boolean doClose;
+ synchronized(this) {
+ // Ensure that only one thread actually gets to do the closing:
+ if (!closing) {
+ doClose = true;
+ closing = true;
+ } else
+ doClose = false;
+ }
- if (writeLock != null) {
- writeLock.release(); // release write lock
- writeLock = null;
+ if (doClose) {
+ try {
+ flush(true, true);
+
+ mergePolicy.close();
+ mergeScheduler.close();
+
+ if (commitPending) {
+ segmentInfos.write(directory); // now commit changes
+ if (infoStream != null)
+ infoStream.println("close: wrote segments file \"" + segmentInfos.getCurrentSegmentFileName() + "\"");
+ synchronized(this) {
+ deleter.checkpoint(segmentInfos, true);
+ }
+ commitPending = false;
+ rollbackSegmentInfos = null;
+ }
+
+ if (writeLock != null) {
+ writeLock.release(); // release write lock
+ writeLock = null;
+ }
+ closed = true;
+ docWriter = null;
+
+ synchronized(this) {
+ deleter.close();
+ }
+
+ if(closeDir)
+ directory.close();
+ } finally {
+ if (!closed)
+ closing = false;
}
- closed = true;
- docWriter = null;
-
- if(closeDir)
- directory.close();
}
}
/** Tells the docWriter to close its currently open shared
- * doc stores (stored fields & vectors files). */
- private void flushDocStores() throws IOException {
+ * doc stores (stored fields & vectors files).
+ * Return value specifies whether new doc store files are compound or not.
+ */
+ private synchronized boolean flushDocStores() throws IOException {
List files = docWriter.files();
+ boolean useCompoundDocStore = false;
+
if (files.size() > 0) {
String docStoreSegment;
@@ -965,16 +1103,18 @@
docWriter.abort();
}
- if (useCompoundFile && docStoreSegment != null) {
+ useCompoundDocStore = mergePolicy.useCompoundDocStore(segmentInfos);
+
+ if (useCompoundDocStore && docStoreSegment != null) {
// Now build compound doc store file
- checkpoint();
success = false;
final int numSegments = segmentInfos.size();
+ final String compoundFileName = docStoreSegment + "." + IndexFileNames.COMPOUND_FILE_STORE_EXTENSION;
try {
- CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, docStoreSegment + "." + IndexFileNames.COMPOUND_FILE_STORE_EXTENSION);
+ CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName);
final int size = files.size();
for(int i=0;i
*
- * The amount of free space required when a merge is
- * triggered is up to 1X the size of all segments being
- * merged, when no readers/searchers are open against the
- * index, and up to 2X the size of all segments being
- * merged when readers/searchers are open against the
- * index (see {@link #optimize()} for details). Most
- * merges are small (merging the smallest segments
- * together), but whenever a full merge occurs (all
- * segments in the index, which is the worst case for
- * temporary space usage) then the maximum free disk space
- * required is the same as {@link #optimize}.
+ * The amount of free space required when a merge is triggered is
+ * up to 1X the size of all segments being merged, when no
+ * readers/searchers are open against the index, and up to 2X the
+ * size of all segments being merged when readers/searchers are open
+ * against the index (see {@link #optimize()} for details). The
+ * sequence of primitive merge operations performed is governed by
+ * the merge policy.
*
*
* <p>Note that each term in the document can be no longer
* than 16383 characters, otherwise an
@@ -1117,14 +1257,27 @@
*/
public void addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
ensureOpen();
+ boolean doFlush = false;
boolean success = false;
try {
- success = docWriter.addDocument(doc, analyzer);
- } catch (IOException ioe) {
- deleter.refresh();
- throw ioe;
+ doFlush = docWriter.addDocument(doc, analyzer);
+ success = true;
+ } finally {
+ if (!success) {
+ synchronized (this) {
+ bufferedDeleteTerms.clear();
+ numBufferedDeleteTerms = 0;
+
+ // If docWriter has some aborted files, then we
+ // clean them up here
+ final List files = docWriter.abortedFiles();
+ if (files != null) {
+ deleter.deleteFiles(files);
+ }
+ }
+ }
}
- if (success)
+ if (doFlush)
flush(true, false);
}
@@ -1134,9 +1287,11 @@
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
- public synchronized void deleteDocuments(Term term) throws CorruptIndexException, IOException {
+ public void deleteDocuments(Term term) throws CorruptIndexException, IOException {
ensureOpen();
- bufferDeleteTerm(term);
+ synchronized(this) {
+ bufferDeleteTerm(term);
+ }
maybeFlush();
}
@@ -1148,10 +1303,12 @@
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
- public synchronized void deleteDocuments(Term[] terms) throws CorruptIndexException, IOException {
+ public void deleteDocuments(Term[] terms) throws CorruptIndexException, IOException {
ensureOpen();
- for (int i = 0; i < terms.length; i++) {
- bufferDeleteTerm(terms[i]);
+ synchronized(this) {
+ for (int i = 0; i < terms.length; i++) {
+ bufferDeleteTerm(terms[i]);
+ }
}
maybeFlush();
}
@@ -1192,14 +1349,26 @@
synchronized (this) {
bufferDeleteTerm(term);
}
+ boolean doFlush = false;
boolean success = false;
try {
- success = docWriter.addDocument(doc, analyzer);
- } catch (IOException ioe) {
- deleter.refresh();
- throw ioe;
+ doFlush = docWriter.addDocument(doc, analyzer);
+ success = true;
+ } finally {
+ if (!success) {
+ synchronized (this) {
+ bufferedDeleteTerms.clear();
+ numBufferedDeleteTerms = 0;
+
+ // If docWriter has some aborted files, then we
+ // clean them up here
+ final List files = docWriter.abortedFiles();
+ if (files != null)
+ deleter.deleteFiles(files);
+ }
+ }
}
- if (success)
+ if (doFlush)
flush(true, false);
else
maybeFlush();
@@ -1228,51 +1397,33 @@
return "_" + Integer.toString(segmentInfos.counter++, Character.MAX_RADIX);
}
- /** Determines how often segment indices are merged by addDocument(). With
- * smaller values, less RAM is used while indexing, and searches on
- * unoptimized indices are faster, but indexing speed is slower. With larger
- * values, more RAM is used during indexing, and while searches on unoptimized
- * indices are slower, indexing is faster. Thus larger values (> 10) are best
- * for batch index creation, and smaller values (< 10) for indices that are
- * interactively maintained.
- *
- *
- * <p>This must never be less than 2. The default value is {@link #DEFAULT_MERGE_FACTOR}.
-
- */
- private int mergeFactor = DEFAULT_MERGE_FACTOR;
-
/** Determines amount of RAM usage by the buffered docs at
* which point we trigger a flush to the index.
*/
private double ramBufferSize = DEFAULT_RAM_BUFFER_SIZE_MB*1024F*1024F;
- /** Determines the largest number of documents ever merged by addDocument().
- * Small values (e.g., less than 10,000) are best for interactive indexing,
- * as this limits the length of pauses while indexing to a few seconds.
- * Larger values are best for batched indexing and speedier searches.
- *
- *
- * <p>The default value is {@link #DEFAULT_MAX_MERGE_DOCS}.
-
- */
- private int maxMergeDocs = DEFAULT_MAX_MERGE_DOCS;
-
/** If non-null, information about merges will be printed to this.
*/
private PrintStream infoStream = null;
-
private static PrintStream defaultInfoStream = null;
- /** Merges all segments together into a single segment,
- * optimizing an index for search.
+ /**
+ * Requests an "optimize" operation on an index, priming the index
+ * for the fastest available search. Traditionally this has meant
+ * merging all segments into a single segment as is done in the
+ * default merge policy, but individual merge policies may implement
+ * optimize in different ways.
*
+ * @see LogMergePolicy#findMergesForOptimize
+ *
*
* <p>It is recommended that this method be called upon completion of indexing. In
* environments with frequent updates, optimize is best done during low volume times, if at all.
*
*
* See http://www.gossamer-threads.com/lists/lucene/java-dev/47895 for more discussion.
*
- * Note that this requires substantial temporary free
+ *
Note that this can require substantial temporary free
* space in the Directory (see LUCENE-764
* for details):
@@ -1310,7 +1461,7 @@
* The actual temporary usage could be much less than
* these figures (it depends on many factors).
*
- * Once the optimize completes, the total size of the
+ *
+ * <p>In general, once the optimize completes, the total size of the
* index will be less than the size of the starting index.
* It could be quite a bit smaller (if there were many
* pending deletes) or just slightly smaller.
@@ -1327,21 +1478,48 @@
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
- public synchronized void optimize() throws CorruptIndexException, IOException {
+ public void optimize() throws CorruptIndexException, IOException {
ensureOpen();
flush();
- while (segmentInfos.size() > 1 ||
- (segmentInfos.size() == 1 &&
- (SegmentReader.hasDeletions(segmentInfos.info(0)) ||
- SegmentReader.hasSeparateNorms(segmentInfos.info(0)) ||
- segmentInfos.info(0).dir != directory ||
- (useCompoundFile &&
- !segmentInfos.info(0).getUseCompoundFile())))) {
- int minSegment = segmentInfos.size() - mergeFactor;
- mergeSegments(minSegment < 0 ? 0 : minSegment, segmentInfos.size());
+
+ // Currently hardwired to 1, but once we add method to
+ // IndexWriter to allow "optimizing to <= N segments"
+ // then we will change this.
+ final int maxSegmentCount = 1;
+
+ // Repeat until merge policy stops returning merges:
+ while(true) {
+ MergePolicy.MergeSpecification spec;
+ synchronized(this) {
+ spec = mergePolicy.findMergesForOptimize(segmentInfos, this, maxSegmentCount);
+ }
+ if (spec != null)
+ mergeScheduler.merge(this, spec);
+ else
+ break;
}
}
+ /**
+ * Expert: asks the mergePolicy whether any merges are
+ * necessary now and if so, runs the requested merges and
+ * then iterates (testing again whether merges are needed) until no
+ * more merges are returned by the mergePolicy.
+ *
+ * Explicit calls to maybeMerge() are usually not
+ * necessary. The most common case is when merge policy
+ * parameters have changed.
+ */
+
+ private final void maybeMerge() throws CorruptIndexException, IOException {
+ final MergePolicy.MergeSpecification spec;
+ synchronized(this) {
+ spec = mergePolicy.findMerges(segmentInfos, this);
+ }
+ if (spec != null)
+ mergeScheduler.merge(this, spec);
+ }
+
/*
* Begin a transaction. During a transaction, any segment
* merges that happen (or ram segments flushed) will not
@@ -1455,18 +1633,19 @@
segmentInfos.clear();
segmentInfos.addAll(rollbackSegmentInfos);
+ // discard any buffered delete terms so they aren't applied in close()
+ bufferedDeleteTerms.clear();
+ numBufferedDeleteTerms = 0;
+
docWriter.abort();
// Ask deleter to locate unreferenced files & remove
// them:
deleter.checkpoint(segmentInfos, false);
+
deleter.refresh();
- bufferedDeleteTerms.clear();
- numBufferedDeleteTerms = 0;
-
commitPending = false;
- docWriter.abort();
close();
} else {
@@ -1481,7 +1660,7 @@
* commit the change immediately. Else, we mark
* commitPending.
*/
- private void checkpoint() throws IOException {
+ private synchronized void checkpoint() throws IOException {
if (autoCommit) {
segmentInfos.write(directory);
if (infoStream != null)
@@ -1541,7 +1720,7 @@
throws CorruptIndexException, IOException {
ensureOpen();
- optimize(); // start with zero or 1 seg
+ flush();
int start = segmentInfos.size();
@@ -1558,15 +1737,8 @@
}
}
- // merge newly added segments in log(n) passes
- while (segmentInfos.size() > start+mergeFactor) {
- for (int base = start; base < segmentInfos.size(); base++) {
- int end = Math.min(segmentInfos.size(), base+mergeFactor);
- if (end-base > 1) {
- mergeSegments(base, end);
- }
- }
- }
+ optimize();
+
success = true;
} finally {
if (success) {
@@ -1575,8 +1747,6 @@
rollbackTransaction();
}
}
-
- optimize(); // final cleanup
}
/**
@@ -1598,40 +1768,10 @@
*/
public synchronized void addIndexesNoOptimize(Directory[] dirs)
throws CorruptIndexException, IOException {
- // Adding indexes can be viewed as adding a sequence of segments S to
- // a sequence of segments T. Segments in T follow the invariants but
- // segments in S may not since they could come from multiple indexes.
- // Here is the merge algorithm for addIndexesNoOptimize():
- //
- // 1 Flush ram.
- // 2 Consider a combined sequence with segments from T followed
- // by segments from S (same as current addIndexes(Directory[])).
- // 3 Assume the highest level for segments in S is h. Call
- // maybeMergeSegments(), but instead of starting w/ lowerBound = -1
- // and upperBound = maxBufferedDocs, start w/ lowerBound = -1 and
- // upperBound = upperBound of level h. After this, the invariants
- // are guaranteed except for the last < M segments whose levels <= h.
- // 4 If the invariants hold for the last < M segments whose levels <= h,
- // if some of those < M segments are from S (not merged in step 3),
- // properly copy them over*, otherwise done.
- // Otherwise, simply merge those segments. If the merge results in
- // a segment of level <= h, done. Otherwise, it's of level h+1 and call
- // maybeMergeSegments() starting w/ upperBound = upperBound of level h+1.
- //
- // * Ideally, we want to simply copy a segment. However, directory does
- // not support copy yet. In addition, source may use compound file or not
- // and target may use compound file or not. So we use mergeSegments() to
- // copy a segment, which may cause doc count to change because deleted
- // docs are garbage collected.
- // 1 flush ram
-
ensureOpen();
flush();
- // 2 copy segment infos and find the highest level from dirs
- int startUpperBound = docWriter.getMaxBufferedDocs();
-
/* new merge policy
if (startUpperBound == 0)
startUpperBound = 10;
@@ -1654,64 +1794,20 @@
for (int j = 0; j < sis.size(); j++) {
SegmentInfo info = sis.info(j);
segmentInfos.addElement(info); // add each info
-
- while (startUpperBound < info.docCount) {
- startUpperBound *= mergeFactor; // find the highest level from dirs
- if (startUpperBound > maxMergeDocs) {
- // upper bound cannot exceed maxMergeDocs
- throw new IllegalArgumentException("Upper bound cannot exceed maxMergeDocs");
- }
- }
}
}
- // 3 maybe merge segments starting from the highest level from dirs
- maybeMergeSegments(startUpperBound);
+ maybeMerge();
- // get the tail segments whose levels <= h
- int segmentCount = segmentInfos.size();
- int numTailSegments = 0;
- while (numTailSegments < segmentCount
- && startUpperBound >= segmentInfos.info(segmentCount - 1 - numTailSegments).docCount) {
- numTailSegments++;
- }
- if (numTailSegments == 0) {
- success = true;
- return;
- }
+ // If after merging there remain segments in the index
+ // that are in a different directory, just copy these
+ // over into our index. This is necessary (before
+ // finishing the transaction) to avoid leaving the
+ // index in an unusable (inconsistent) state.
+ copyExternalSegments();
- // 4 make sure invariants hold for the tail segments whose levels <= h
- if (checkNonDecreasingLevels(segmentCount - numTailSegments)) {
- // identify the segments from S to be copied (not merged in 3)
- int numSegmentsToCopy = 0;
- while (numSegmentsToCopy < segmentCount
- && directory != segmentInfos.info(segmentCount - 1 - numSegmentsToCopy).dir) {
- numSegmentsToCopy++;
- }
- if (numSegmentsToCopy == 0) {
- success = true;
- return;
- }
+ success = true;
- // copy those segments from S
- for (int i = segmentCount - numSegmentsToCopy; i < segmentCount; i++) {
- mergeSegments(i, i + 1);
- }
- if (checkNonDecreasingLevels(segmentCount - numSegmentsToCopy)) {
- success = true;
- return;
- }
- }
-
- // invariants do not hold, simply merge those segments
- mergeSegments(segmentCount - numTailSegments, segmentCount);
-
- // maybe merge segments again if necessary
- if (segmentInfos.info(segmentInfos.size() - 1).docCount > startUpperBound) {
- maybeMergeSegments(startUpperBound * mergeFactor);
- }
-
- success = true;
} finally {
if (success) {
commitTransaction();
@@ -1721,6 +1817,30 @@
}
}
+ /* If any of our segments are using a directory != ours
+ * then copy them over */
+ private synchronized void copyExternalSegments() throws CorruptIndexException, IOException {
+ final int numSegments = segmentInfos.size();
+ for(int i=0;iAfter this completes, the index is optimized.
* The provided IndexReaders are not closed.
@@ -1785,7 +1905,7 @@
}
}
- if (useCompoundFile) {
+ if (mergePolicy instanceof LogMergePolicy && getUseCompoundFile()) {
boolean success = false;
@@ -1804,40 +1924,6 @@
}
}
- // Overview of merge policy:
- //
- // A flush is triggered either by close() or by the number of ram segments
- // reaching maxBufferedDocs. After a disk segment is created by the flush,
- // further merges may be triggered.
- //
- // LowerBound and upperBound set the limits on the doc count of a segment
- // which may be merged. Initially, lowerBound is set to 0 and upperBound
- // to maxBufferedDocs. Starting from the rightmost* segment whose doc count
- // > lowerBound and <= upperBound, count the number of consecutive segments
- // whose doc count <= upperBound.
- //
- // Case 1: number of worthy segments < mergeFactor, no merge, done.
- // Case 2: number of worthy segments == mergeFactor, merge these segments.
- // If the doc count of the merged segment <= upperBound, done.
- // Otherwise, set lowerBound to upperBound, and multiply upperBound
- // by mergeFactor, go through the process again.
- // Case 3: number of worthy segments > mergeFactor (in the case mergeFactor
- // M changes), merge the leftmost* M segments. If the doc count of
- // the merged segment <= upperBound, consider the merged segment for
- // further merges on this same level. Merge the now leftmost* M
- // segments, and so on, until number of worthy segments < mergeFactor.
- // If the doc count of all the merged segments <= upperBound, done.
- // Otherwise, set lowerBound to upperBound, and multiply upperBound
- // by mergeFactor, go through the process again.
- // Note that case 2 can be considerd as a special case of case 3.
- //
- // This merge policy guarantees two invariants if M does not change and
- // segment doc count is not reaching maxMergeDocs:
- // B for maxBufferedDocs, f(n) defined as ceil(log_M(ceil(n/B)))
- // 1: If i (left*) and i+1 (right*) are two consecutive segments of doc
- // counts x and y, then f(x) >= f(y).
- // 2: The number of committed segments on the same level (f(n)) <= M.
-
// This is called after pending added and deleted
// documents have been flushed to the Directory but before
// the change is committed (new segments_N file written).
@@ -1850,12 +1936,16 @@
* buffered added documents or buffered deleted terms are
* large enough.
*/
- protected final synchronized void maybeFlush() throws CorruptIndexException, IOException {
+ protected final void maybeFlush() throws CorruptIndexException, IOException {
// We only check for flush due to number of buffered
// delete terms, because triggering of a flush due to
// too many added documents is handled by
// DocumentsWriter
- if (numBufferedDeleteTerms >= maxBufferedDeleteTerms && docWriter.setFlushPending())
+ boolean doFlush;
+ synchronized(this) {
+ doFlush = numBufferedDeleteTerms >= maxBufferedDeleteTerms && docWriter.setFlushPending();
+ }
+ if (doFlush)
flush(true, false);
}
@@ -1867,7 +1957,7 @@
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
- public final synchronized void flush() throws CorruptIndexException, IOException {
+ public final void flush() throws CorruptIndexException, IOException {
flush(true, false);
}
@@ -1879,9 +1969,14 @@
* @param flushDocStores if false we are allowed to keep
* doc stores open to share with the next segment
*/
- protected final synchronized void flush(boolean triggerMerge, boolean flushDocStores) throws CorruptIndexException, IOException {
+ protected final void flush(boolean triggerMerge, boolean flushDocStores) throws CorruptIndexException, IOException {
ensureOpen();
+ if (doFlush(flushDocStores) && triggerMerge)
+ maybeMerge();
+ }
+ private synchronized final boolean doFlush(boolean flushDocStores) throws CorruptIndexException, IOException {
+
// Make sure no threads are actively adding a document
docWriter.pauseAllThreads();
@@ -1911,10 +2006,14 @@
boolean flushDeletes = bufferedDeleteTerms.size() > 0;
if (infoStream != null)
- infoStream.println(" flush: flushDocs=" + flushDocs +
+ infoStream.println(" flush: segment=" + docWriter.getSegment() +
+ " docStoreSegment=" + docWriter.getDocStoreSegment() +
+ " docStoreOffset=" + docWriter.getDocStoreOffset() +
+ " flushDocs=" + flushDocs +
" flushDeletes=" + flushDeletes +
" flushDocStores=" + flushDocStores +
- " numDocs=" + numDocs);
+ " numDocs=" + numDocs +
+ " numBufDelTerms=" + numBufferedDeleteTerms);
int docStoreOffset = docWriter.getDocStoreOffset();
boolean docStoreIsCompoundFile = false;
@@ -1927,13 +2026,15 @@
if (infoStream != null)
infoStream.println(" flush shared docStore segment " + docStoreSegment);
- flushDocStores();
+ docStoreIsCompoundFile = flushDocStores();
flushDocStores = false;
- docStoreIsCompoundFile = useCompoundFile;
}
String segment = docWriter.getSegment();
+ // If we are flushing docs, segment must not be null:
+ assert segment != null || !flushDocs;
+
if (flushDocs || flushDeletes) {
SegmentInfos rollback = null;
@@ -1985,7 +2086,19 @@
success = true;
} finally {
if (!success) {
+
if (flushDeletes) {
+
+ // Carefully check if any partial .del files
+ // should be removed:
+ final int size = rollback.size();
+ for(int i=0;i= 0) {
- SegmentInfo si = segmentInfos.info(minSegment);
+ final int numSegments = segmentInfos.size();
+
+ final int numSegmentsToMerge = merge.segments.size();
+ for(int i=0;i lowerBound && si.docCount <= upperBound) {
- // start from the rightmost* segment whose doc count is in bounds
- maxSegment = minSegment;
- } else if (si.docCount > upperBound) {
- // until the segment whose doc count exceeds upperBound
- break;
- }
+ if (first + i >= numSegments || !segmentInfos.info(first+i).equals(info)) {
+ if (segmentInfos.indexOf(info) == -1)
+ throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.name + ") that is not in the index");
+ else
+ throw new MergePolicy.MergeException("MergePolicy selected non-contiguous segments to merge (" + merge + " vs " + segString() + "), which IndexWriter (currently) cannot handle");
}
+ }
- minSegment++;
- maxSegment++;
- int numSegments = maxSegment - minSegment;
+ return first;
+ }
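The new ensureContiguousMerge above verifies that every segment a MergePolicy handed back is still in the index and that the selected segments sit next to each other in segmentInfos order. A small standalone sketch of that check over plain lists (names are hypothetical; the real method throws MergePolicy.MergeException):

// Illustrative sketch of the contiguity check, written against plain lists.
import java.util.List;

class ContiguityCheckSketch {
  /** Returns the start index of the selected run, or throws if the
   *  selection is missing from the index or not contiguous. */
  static int ensureContiguous(List<String> index, List<String> selected) {
    int first = index.indexOf(selected.get(0));
    if (first == -1)
      throw new IllegalStateException("segment " + selected.get(0) + " is not in the index");
    for (int i = 0; i < selected.size(); i++) {
      String name = selected.get(i);
      if (first + i >= index.size() || !index.get(first + i).equals(name))
        throw new IllegalStateException("selected segments are not contiguous: " + selected);
    }
    return first;
  }
}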
- if (numSegments < mergeFactor) {
- break;
- } else {
- boolean exceedsUpperLimit = false;
+ /* FIXME if we want to support non-contiguous segment merges */
+ synchronized private boolean commitMerge(MergePolicy.OneMerge merge) throws IOException {
- // number of merge-worthy segments may exceed mergeFactor when
- // mergeFactor and/or maxBufferedDocs change(s)
- while (numSegments >= mergeFactor) {
- // merge the leftmost* mergeFactor segments
+ // If the merge was explicitly aborted, or if abort() or
+ // rollbackTransaction() has been called since our
+ // merge started, we abort this merge
+ if (merge.isAborted() || deleter.getFullRefreshCount() != merge.startFullRefreshCount) {
- int docCount = mergeSegments(minSegment, minSegment + mergeFactor);
- numSegments -= mergeFactor;
+ if (infoStream != null) {
+ if (merge.isAborted())
+ infoStream.println("commitMerge: skipping merge " + merge.segString(directory) + ": it was aborted");
+ if (deleter.getFullRefreshCount() != merge.startFullRefreshCount)
+ infoStream.println("commitMerge: skipping merge " + merge.segString(directory) + ": deleter.refresh() was called during this merge");
+ }
- if (docCount > upperBound) {
- // continue to merge the rest of the worthy segments on this level
- minSegment++;
- exceedsUpperLimit = true;
- } else {
- // if the merged segment does not exceed upperBound, consider
- // this segment for further merges on this same level
- numSegments++;
+ assert merge.increfDone;
+ decrefMergeSegments(merge);
+ deleter.refresh(merge.info.name);
+ return false;
+ }
+
+ boolean success = false;
+
+ int start;
+
+ try {
+ SegmentInfos sourceSegmentsClone = merge.segmentsClone;
+ SegmentInfos sourceSegments = merge.segments;
+ final int numSegments = segmentInfos.size();
+
+ start = ensureContiguousMerge(merge);
+ if (infoStream != null)
+ infoStream.println("commitMerge " + merge.segString(directory));
+
+ // Carefully merge deletes that occurred after we
+ // started merging:
+
+ BitVector deletes = null;
+ int docUpto = 0;
+
+ final int numSegmentsToMerge = sourceSegments.size();
+ for(int i=0;i<numSegmentsToMerge;i++) {
- for (int i = end-1; i > minSegment; i--) // remove old infos & add new
- segmentInfos.remove(i);
+ merger = new SegmentMerger(this, mergedName);
- segmentInfos.set(minSegment, newSegment);
+ // This is try/finally to make sure merger's readers are
+ // closed:
- checkpoint();
+ boolean success = false;
- success = true;
+ try {
+ int totDocCount = 0;
+ for (int i = 0; i < numSegments; i++) {
+ SegmentInfo si = sourceSegmentsClone.info(i);
+ if (infoStream != null)
+ infoStream.print(" " + si.name + " (" + si.docCount + " docs)");
+ IndexReader reader = SegmentReader.get(si, MERGE_READ_BUFFER_SIZE, merge.mergeDocStores); // no need to set deleter (yet)
+ merger.add(reader);
+ if (infoStream != null)
+ totDocCount += reader.numDocs();
+ }
+ if (infoStream != null) {
+ infoStream.println(" into "+mergedName+" ("+totDocCount+" docs)");
+ }
- } finally {
- if (!success) {
- if (rollback != null) {
- // Rollback the individual SegmentInfo
- // instances, but keep original SegmentInfos
- // instance (so we don't try to write again the
- // same segments_N file -- write once):
- segmentInfos.clear();
- segmentInfos.addAll(rollback);
- }
+ mergedDocCount = merge.info.docCount = merger.merge(merge.mergeDocStores);
- // Delete any partially created and now unreferenced files:
- deleter.refresh();
- }
- }
+ if (infoStream != null)
+ assert mergedDocCount == totDocCount;
+
+ success = true;
+
} finally {
- // close readers before we attempt to delete now-obsolete segments
+ // close readers before we attempt to delete
+ // now-obsolete segments
if (merger != null) {
merger.closeReaders();
}
+ if (!success) {
+ synchronized(this) {
+ deleter.refresh(mergedName);
+ }
+ }
}
- // Give deleter a chance to remove files now.
- deleter.checkpoint(segmentInfos, autoCommit);
+ if (!commitMerge(merge))
+ // commitMerge will return false if this merge was aborted
+ return 0;
- if (useCompoundFile) {
+ if (merge.useCompoundFile) {
+
+ success = false;
+ boolean skip = false;
+ final String compoundFileName = mergedName + "." + IndexFileNames.COMPOUND_FILE_EXTENSION;
- boolean success = false;
-
try {
+ try {
+ merger.createCompoundFile(compoundFileName);
+ success = true;
+ } catch (IOException ioe) {
+ synchronized(this) {
+ if (segmentInfos.indexOf(merge.info) == -1) {
+ // If another merge kicked in and merged our
+ // new segment away while we were trying to
+ // build the compound file, we can hit a
+ // FileNotFoundException and possibly
+ // IOException over NFS. We can tell this has
+ // happened because our SegmentInfo is no
+ // longer in the segments; if this has
+ // happened it is safe to ignore the exception
+ // and skip finishing/committing our compound
+ // file creation.
+ skip = true;
+ } else
+ throw ioe;
+ }
+ }
+ } finally {
+ if (!success) {
+ synchronized(this) {
+ deleter.deleteFile(compoundFileName);
+ }
+ }
+ }
- merger.createCompoundFile(mergedName + ".cfs");
- newSegment.setUseCompoundFile(true);
- checkpoint();
- success = true;
+ if (!skip) {
- } finally {
- if (!success) {
- // Must rollback:
- newSegment.setUseCompoundFile(false);
- deleter.refresh();
+ synchronized(this) {
+ if (skip || segmentInfos.indexOf(merge.info) == -1 || merge.isAborted()) {
+ // Our segment (committed in non-compound
+ // format) got merged away while we were
+ // building the compound format.
+ deleter.deleteFile(compoundFileName);
+ } else {
+ success = false;
+ try {
+ merge.info.setUseCompoundFile(true);
+ checkpoint();
+ success = true;
+ } finally {
+ if (!success) {
+ // Must rollback:
+ merge.info.setUseCompoundFile(false);
+ deletePartialSegmentsFile();
+ deleter.deleteFile(compoundFileName);
+ }
+ }
+
+ // Give deleter a chance to remove files now.
+ deleter.checkpoint(segmentInfos, autoCommit);
+ }
}
}
-
- // Give deleter a chance to remove files now.
- deleter.checkpoint(segmentInfos, autoCommit);
}
return mergedDocCount;
}
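The compound-file block above tolerates a race: another merge may remove the newly committed segment while its .cfs file is still being built, in which case the work is cleaned up and the commit is skipped rather than failing. A generic sketch of that "skip if the target vanished" handling, with all names illustrative:

// Generic sketch of the pattern described in the comments above; names are illustrative.
import java.io.IOException;
import java.util.Set;

class BuildIfStillPresentSketch {
  static void buildArtifact(Set<String> liveSegments, String segment) throws IOException {
    boolean success = false;
    boolean skip = false;
    try {
      try {
        createArtifactFor(segment);            // may fail if the segment's files are already gone
        success = true;
      } catch (IOException ioe) {
        synchronized (liveSegments) {
          if (!liveSegments.contains(segment))
            skip = true;                        // segment was merged away concurrently: not an error
          else
            throw ioe;                          // real error: rethrow
        }
      }
    } finally {
      if (!success)
        deleteArtifactFor(segment);             // never leave a partial artifact behind
    }
    if (!skip)
      commitArtifactFor(segment);
  }

  static void createArtifactFor(String s) throws IOException {}
  static void deleteArtifactFor(String s) {}
  static void commitArtifactFor(String s) {}
}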
+ private void deletePartialSegmentsFile() throws IOException {
+ if (segmentInfos.getLastGeneration() != segmentInfos.getGeneration()) {
+ String segmentFileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS,
+ "",
+ segmentInfos.getGeneration());
+ deleter.deleteFile(segmentFileName);
+ }
+ }
+
// Called during flush to apply any buffered deletes. If
// flushedNewSegment is true then a new segment was just
// created and flushed from the ram segments, so we will
@@ -2385,29 +2790,6 @@
}
}
- private final boolean checkNonDecreasingLevels(int start) {
- int lowerBound = -1;
- int upperBound = docWriter.getMaxBufferedDocs();
-
- /* new merge policy
- if (upperBound == 0)
- upperBound = 10;
- */
-
- for (int i = segmentInfos.size() - 1; i >= start; i--) {
- int docCount = segmentInfos.info(i).docCount;
- if (docCount <= lowerBound) {
- return false;
- }
-
- while (docCount > upperBound) {
- lowerBound = upperBound;
- upperBound *= mergeFactor;
- }
- }
- return true;
- }
-
// For test purposes.
final synchronized int getBufferedDeleteTermsSize() {
return bufferedDeleteTerms.size();
@@ -2484,7 +2866,24 @@
Iterator iter = deleteTerms.entrySet().iterator();
while (iter.hasNext()) {
Entry entry = (Entry) iter.next();
- reader.deleteDocuments((Term) entry.getKey());
+ final int count = reader.deleteDocuments((Term) entry.getKey());
}
}
+
+ // utility routines for tests
+ SegmentInfo newestSegment() {
+ return segmentInfos.info(segmentInfos.size()-1);
+ }
+
+ public synchronized String segString() {
+ StringBuffer buffer = new StringBuffer();
+ for(int i = 0; i < segmentInfos.size(); i++) {
+ if (i > 0) {
+ buffer.append(' ');
+ }
+ buffer.append(segmentInfos.info(i).segString(directory));
+ }
+
+ return buffer.toString();
+ }
}
Index: src/java/org/apache/lucene/index/IndexFileDeleter.java
===================================================================
--- src/java/org/apache/lucene/index/IndexFileDeleter.java (revision 573633)
+++ src/java/org/apache/lucene/index/IndexFileDeleter.java (working copy)
@@ -100,12 +100,18 @@
private IndexDeletionPolicy policy;
private DocumentsWriter docWriter;
+ private int fullRefreshCount; // tracks how many times refresh(null) has been called
+
+ int getFullRefreshCount() {
+ return fullRefreshCount;
+ }
+
void setInfoStream(PrintStream infoStream) {
this.infoStream = infoStream;
}
private void message(String message) {
- infoStream.println(this + " " + Thread.currentThread().getName() + ": " + message);
+ infoStream.println("Deleter [" + Thread.currentThread().getName() + "]: " + message);
}
/**
@@ -275,25 +281,61 @@
* Writer calls this when it has hit an error and had to
* roll back, to tell us that there may now be
* unreferenced files in the filesystem. So we re-list
- * the filesystem and delete such files:
+ * the filesystem and delete such files. If segmentName
+ * is non-null, we will only delete files corresponding to
+ * that segment.
*/
- public void refresh() throws IOException {
+ public void refresh(String segmentName) throws IOException {
+ if (segmentName == null)
+ fullRefreshCount++;
String[] files = directory.list();
if (files == null)
throw new IOException("cannot read directory " + directory + ": list() returned null");
IndexFileNameFilter filter = IndexFileNameFilter.getFilter();
+ String segmentPrefix1;
+ String segmentPrefix2;
+ if (segmentName != null) {
+ segmentPrefix1 = segmentName + ".";
+ segmentPrefix2 = segmentName + "_";
+ } else {
+ segmentPrefix1 = null;
+ segmentPrefix2 = null;
+ }
+
+ for(int i=0;i<files.length;i++) {
+ assert count > 0;
return --count;
}
}
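The new refresh(String segmentName) limits deletion of unreferenced files to those belonging to a single segment by matching the "name." and "name_" prefixes built above. A small sketch of that filename filter in isolation; the real method also consults IndexFileNameFilter and the refCounts map, and the method name below is illustrative:

// Sketch of the per-segment filename filtering refresh(String) adds.
class SegmentFileFilterSketch {
  static boolean belongsToSegment(String fileName, String segmentName) {
    if (segmentName == null)
      return true;                                   // full refresh: consider every file
    return fileName.startsWith(segmentName + ".")    // e.g. _3.cfs, _3.fnm
        || fileName.startsWith(segmentName + "_");   // e.g. _3_1.del
  }
}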
Index: src/java/org/apache/lucene/index/MergeScheduler.java
===================================================================
--- src/java/org/apache/lucene/index/MergeScheduler.java (revision 0)
+++ src/java/org/apache/lucene/index/MergeScheduler.java (revision 0)
@@ -0,0 +1,36 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+/** Expert: {@link IndexWriter} uses an instance
+ * implementing this interface to execute the merges
+ * selected by a {@link MergePolicy}. The default
+ * MergeScheduler is {@link SerialMergeScheduler}. */
+
+public interface MergeScheduler {
+
+ /** Run the requested merges from spec. */
+ void merge(IndexWriter writer, MergePolicy.MergeSpecification spec)
+ throws CorruptIndexException, IOException;
+
+ /** Close this MergeScheduler. */
+ void close()
+ throws CorruptIndexException, IOException;
+}
Property changes on: src/java/org/apache/lucene/index/MergeScheduler.java
___________________________________________________________________
Name: svn:eol-style
+ native
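MergeScheduler only defines how the merges chosen by a MergePolicy get executed; per the javadoc above, SerialMergeScheduler is the default, and ConcurrentMergeScheduler (LUCENE-870) runs merges on background threads. A rough sketch of a serial implementation against this interface; the spec.merges field and the writer-side hook used to run one merge are assumptions for illustration, not part of this patch:

// Rough sketch of a serial scheduler. The names spec.merges and
// writer.merge(OneMerge) are assumed; the actual SerialMergeScheduler may differ.
package org.apache.lucene.index;

import java.io.IOException;

public class SketchSerialMergeScheduler implements MergeScheduler {
  public void merge(IndexWriter writer, MergePolicy.MergeSpecification spec)
      throws CorruptIndexException, IOException {
    if (spec == null)
      return;
    for (int i = 0; i < spec.merges.size(); i++) {             // assumed field
      MergePolicy.OneMerge oneMerge = (MergePolicy.OneMerge) spec.merges.get(i);
      writer.merge(oneMerge);                                   // assumed writer hook
    }
  }

  public void close() {}
}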
Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java
===================================================================
--- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java (revision 573633)
+++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java (working copy)
@@ -21,6 +21,7 @@
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.benchmark.byTask.utils.Config;
import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.store.Directory;
import java.io.IOException;
@@ -53,6 +54,7 @@
boolean autoCommit = config.get("autocommit", OpenIndexTask.DEFAULT_AUTO_COMMIT);
IndexWriter iw = new IndexWriter(dir, autoCommit, analyzer, true);
+ iw.setMergeScheduler(new ConcurrentMergeScheduler());
iw.setUseCompoundFile(cmpnd);
iw.setMergeFactor(mrgf);
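The benchmark task now switches the writer to ConcurrentMergeScheduler so merges run on background threads while documents are added. Outside the benchmark framework, the same wiring looks roughly like this; the directory path and analyzer below are placeholders, not taken from this patch:

// Roughly how an application would opt into background merges.
import org.apache.lucene.analysis.SimpleAnalyzer;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.FSDirectory;

public class ConcurrentMergeExample {
  public static void main(String[] args) throws Exception {
    FSDirectory dir = FSDirectory.getDirectory("/tmp/index");    // placeholder path
    IndexWriter writer = new IndexWriter(dir, true, new SimpleAnalyzer(), true);
    writer.setMergeScheduler(new ConcurrentMergeScheduler());    // merges run in background threads
    writer.setMergeFactor(10);
    writer.setUseCompoundFile(true);
    // ... add documents ...
    writer.close();
  }
}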
Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/LineDocMaker.java
===================================================================
--- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/LineDocMaker.java (revision 573633)
+++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/LineDocMaker.java (working copy)
@@ -24,6 +24,7 @@
import org.apache.lucene.document.Field;
import java.io.BufferedReader;
+import java.io.FileReader;
import java.io.IOException;
import java.io.FileInputStream;
import java.io.InputStreamReader;
@@ -81,9 +82,9 @@
public Document setFields(String line) {
// title date body
- int spot = line.indexOf(SEP);
+ final int spot = line.indexOf(SEP);
titleField.setValue(line.substring(0, spot));
- int spot2 = line.indexOf(SEP, 1+spot);
+ final int spot2 = line.indexOf(SEP, 1+spot);
dateField.setValue(line.substring(1+spot, spot2));
bodyField.setValue(line.substring(1+spot2, line.length()));
return doc;
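setFields splits each input line into title, date, and body at the first two occurrences of the SEP character, whose value is defined elsewhere in LineDocMaker and is not shown in this hunk. A standalone version of that split, with a tab assumed as the separator purely so the example runs:

// Standalone sketch of the title/date/body split done by setFields.
public class LineSplitExample {
  static final char SEP = '\t';   // assumption, not necessarily LineDocMaker's value

  public static void main(String[] args) {
    String line = "Some title" + SEP + "2007-09-07" + SEP + "Body text of the document";
    final int spot = line.indexOf(SEP);
    final int spot2 = line.indexOf(SEP, 1 + spot);
    String title = line.substring(0, spot);
    String date  = line.substring(1 + spot, spot2);
    String body  = line.substring(1 + spot2, line.length());
    System.out.println(title + " | " + date + " | " + body);
  }
}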
@@ -111,12 +112,14 @@
return ds;
}
+ private int lineCount = 0;
public Document makeDocument() throws Exception {
String line;
synchronized(this) {
while(true) {
line = fileIn.readLine();
+ lineCount++;
if (line == null) {
if (!forever)
throw new NoMoreDataException();
@@ -149,8 +152,10 @@
try {
if (fileIn != null)
fileIn.close();
- fileIS = new FileInputStream(fileName);
- fileIn = new BufferedReader(new InputStreamReader(fileIS,"UTF-8"), READER_BUFFER_BYTES);
+ // nocommit
+ //fileIS = new FileInputStream(fileName);
+ //fileIn = new BufferedReader(new InputStreamReader(fileIS,"UTF-8"), READER_BUFFER_BYTES);
+ fileIn = new BufferedReader(new FileReader(fileName), READER_BUFFER_BYTES);
} catch (IOException e) {
throw new RuntimeException(e);
}