Index: src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
===================================================================
--- src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java	(revision 606041)
+++ src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java	(working copy)
@@ -219,12 +219,6 @@
       writer.close();
     }

-    try {
-      directory.close();
-    } catch (RuntimeException re) {
-      // MockRAMDirectory will throw RuntimeExceptions when there
-      // are still open files, which is OK since some merge
-      // threads may still be running at this point.
-    }
+    directory.close();
   }
 }
Index: src/test/org/apache/lucene/index/TestIndexWriter.java
===================================================================
--- src/test/org/apache/lucene/index/TestIndexWriter.java	(revision 606041)
+++ src/test/org/apache/lucene/index/TestIndexWriter.java	(working copy)
@@ -21,6 +21,7 @@
 import java.io.Reader;
 import java.io.File;
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Random;

 import org.apache.lucene.util.LuceneTestCase;
@@ -1981,4 +1982,100 @@
       }
     }
   }
+
+  public void testNoWaitClose() throws Throwable {
+    RAMDirectory directory = new MockRAMDirectory();
+
+    final Document doc = new Document();
+    Field idField = new Field("id", "", Field.Store.YES, Field.Index.UN_TOKENIZED);
+    doc.add(idField);
+
+    for(int pass=0;pass<3;pass++) {
+      boolean autoCommit = pass%2 == 0;
+      IndexWriter writer = new IndexWriter(directory, autoCommit, new WhitespaceAnalyzer(), true);
+
+      //System.out.println("TEST: pass=" + pass + " ac=" + autoCommit + " cms=" + (pass >= 2));
+      for(int iter=0;iter<10;iter++) {
+        //System.out.println("TEST: iter=" + iter);
+        MergeScheduler ms;
+        if (pass >= 2)
+          ms = new ConcurrentMergeScheduler();
+        else
+          ms = new SerialMergeScheduler();
+
+        writer.setMergeScheduler(ms);
+        writer.setMaxBufferedDocs(2);
+        writer.setMergeFactor(100);
+
+        for(int j=0;j<199;j++) {
+          idField.setValue(Integer.toString(iter*201+j));
+          writer.addDocument(doc);
+        }
+
+        int delID = iter*199;
+        for(int j=0;j<20;j++) {
+          writer.deleteDocuments(new Term("id", Integer.toString(delID)));
+          delID += 5;
+        }
+
+        // Force a bunch of merge threads to kick off so we
+        // stress out aborting them on close:
+        writer.setMergeFactor(2);
+
+        final IndexWriter finalWriter = writer;
+        final ArrayList failure = new ArrayList();
+        Thread t1 = new Thread() {
+            public void run() {
+              boolean done = false;
+              while(!done) {
+                for(int i=0;i<100;i++) {
+                  try {
+                    finalWriter.addDocument(doc);
+                  } catch (AlreadyClosedException e) {
+                    done = true;
+                    break;
+                  } catch (NullPointerException e) {
+                    done = true;
+                    break;
+                  } catch (Throwable e) {
+                    e.printStackTrace(System.out);
+                    failure.add(e);
+                    done = true;
+                    break;
+                  }
+                }
+                Thread.yield();
+              }
+            }
+          };
+
+        t1.start();
+
+        writer.close(false);
+        while(true) {
+          try {
+            t1.join();
+            break;
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+          }
+        }
+
+        // Only check for failures once the indexing thread
+        // has finished:
+        if (failure.size() > 0)
+          throw (Throwable) failure.get(0);
+
+        // Make sure reader can read
+        IndexReader reader = IndexReader.open(directory);
+        reader.close();
+
+        // Reopen
+        writer = new IndexWriter(directory, autoCommit, new WhitespaceAnalyzer(), false);
+      }
+      writer.close();
+    }
+
+    directory.close();
+  }
 }
Index: src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
===================================================================
--- src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java	(revision 606041)
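The two tests above drive the new close(false) path end to end: testNoWaitClose closes the writer out from under a busy indexing thread and accepts AlreadyClosedException (from the new closed check in DocumentsWriter) or, in a known shutdown race, NullPointerException as a normal outcome. For reference, a minimal sketch of the contract under test, outside the harness (hypothetical snippet, not part of the patch; it uses the same 2.3-era APIs as the tests):

    IndexWriter writer = new IndexWriter(directory, true, new WhitespaceAnalyzer(), true);
    writer.setMergeScheduler(new ConcurrentMergeScheduler());
    writer.setMaxBufferedDocs(2);
    writer.setMergeFactor(2);         // low merge factor: merges kick off almost immediately
    for (int i = 0; i < 500; i++)
      writer.addDocument(doc);        // background merge threads are now running
    writer.close(false);              // ask running merges to abort, wait for them, return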
+++ src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java	(working copy)
@@ -251,18 +251,15 @@

           message("  merge thread: done");

-      } catch (Throwable exc) {
-        // When a merge was aborted & IndexWriter closed,
-        // it's possible to get various IOExceptions,
-        // NullPointerExceptions, AlreadyClosedExceptions:
+      } catch (IOException exc) {
+
         if (merge != null) {
           merge.setException(exc);
           writer.addMergeException(merge);
         }

-        if (merge == null || !merge.isAborted()) {
-          // If the merge was not aborted then the exception
-          // is real
+        // Ignore the exception if it was due to abort:
+        if (!(exc instanceof MergePolicy.MergeAbortedException)) {
           synchronized(ConcurrentMergeScheduler.this) {
             exceptions.add(exc);
           }
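One constraint implied by the narrowed catch clause above: the handler now catches only IOException yet still tests exc instanceof MergePolicy.MergeAbortedException, so MergeAbortedException must be an IOException subclass. Its definition does not appear in the MergePolicy hunks below, but given this usage it has to look roughly like the following sketch (an assumption, not text from this patch):

    // Assumed shape: thrown by OneMerge.checkAborted(Directory); it must extend
    // IOException so the catch clause in ConcurrentMergeScheduler can see it.
    public static class MergeAbortedException extends IOException {
      public MergeAbortedException(String message) {
        super(message);
      }
    }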
Index: src/java/org/apache/lucene/index/SegmentMerger.java
===================================================================
--- src/java/org/apache/lucene/index/SegmentMerger.java	(revision 606041)
+++ src/java/org/apache/lucene/index/SegmentMerger.java	(working copy)
@@ -53,6 +53,8 @@

  private int mergedDocs;

+  private CheckAbort checkAbort;
+
  // Whether we should merge doc stores (stored fields and
  // vectors files).  When all segments we are merging
  // already share the same doc store files, we don't need
@@ -61,7 +63,7 @@

  /** Maximum number of contiguous documents to bulk-copy
      when merging stored fields */
-  private final static int MAX_RAW_MERGE_DOCS = 16384;
+  private final static int MAX_RAW_MERGE_DOCS = 4192;

  /** This ctor used only by test code.
   *
@@ -73,9 +75,11 @@
    segment = name;
  }

-  SegmentMerger(IndexWriter writer, String name) {
+  SegmentMerger(IndexWriter writer, String name, MergePolicy.OneMerge merge) {
    directory = writer.getDirectory();
    segment = name;
+    if (merge != null)
+      checkAbort = new CheckAbort(merge, directory);
    termIndexInterval = writer.getTermIndexInterval();
  }

@@ -144,7 +148,7 @@
  final Vector createCompoundFile(String fileName)
          throws IOException {
    CompoundFileWriter cfsWriter =
-      new CompoundFileWriter(directory, fileName);
+      new CompoundFileWriter(directory, fileName, checkAbort);

    Vector files =
      new Vector(IndexFileNames.COMPOUND_EXTENSIONS.length + 1);
@@ -265,9 +269,6 @@

    // Used for bulk-reading raw bytes for stored fields
    final int[] rawDocLengths = new int[MAX_RAW_MERGE_DOCS];

-    // merge field values
-    final FieldsWriter fieldsWriter = new FieldsWriter(directory, segment, fieldInfos);
-
    // for merging we don't want to compress/uncompress the data, so to tell the FieldsReader that we're
    // in merge mode, we use this FieldSelector
    FieldSelector fieldSelectorMerge = new FieldSelector() {
@@ -276,6 +277,9 @@
      }
    };

+    // merge field values
+    final FieldsWriter fieldsWriter = new FieldsWriter(directory, segment, fieldInfos);
+
    try {
      for (int i = 0; i < readers.size(); i++) {
        final IndexReader reader = (IndexReader) readers.elementAt(i);
@@ -302,10 +306,14 @@
            IndexInput stream = matchingFieldsReader.rawDocs(rawDocLengths, start, numDocs);
            fieldsWriter.addRawDocuments(stream, rawDocLengths, numDocs);
            docCount += numDocs;
+            if (checkAbort != null)
+              checkAbort.work(300*numDocs);
          } else {
            fieldsWriter.addDocument(reader.document(j, fieldSelectorMerge));
            j++;
            docCount++;
+            if (checkAbort != null)
+              checkAbort.work(300);
          }
        } else
          j++;
@@ -342,6 +350,8 @@
          if (reader.isDeleted(docNum))
            continue;
          termVectorsWriter.addAllDocVectors(reader.getTermFreqVectors(docNum));
+          if (checkAbort != null)
+            checkAbort.work(300);
        }
      }
    } finally {
@@ -405,8 +415,11 @@
        top = (SegmentMergeInfo) queue.top();
      }

-      mergeTermInfo(match, matchSize);		  // add new TermInfo
+      final int df = mergeTermInfo(match, matchSize);	  // add new TermInfo

+      if (checkAbort != null)
+        checkAbort.work(df/3.0);
+
      while (matchSize > 0) {
        SegmentMergeInfo smi = match[--matchSize];
        if (smi.next())
@@ -428,7 +441,7 @@
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */
-  private final void mergeTermInfo(SegmentMergeInfo[] smis, int n)
+  private final int mergeTermInfo(SegmentMergeInfo[] smis, int n)
          throws CorruptIndexException, IOException {
    long freqPointer = freqOutput.getFilePointer();
    long proxPointer = proxOutput.getFilePointer();
@@ -442,6 +455,8 @@
      termInfo.set(df, freqPointer, proxPointer, (int) (skipPointer - freqPointer));
      termInfosWriter.add(smis[0].term, termInfo);
    }
+
+    return df;
  }

  private byte[] payloadBuffer = null;
@@ -562,6 +577,8 @@
          }
        }
      }
+      if (checkAbort != null)
+        checkAbort.work(maxDoc);
    }
  }
@@ -572,4 +589,20 @@
    }
  }

+  final static class CheckAbort {
+    private double workCount;
+    private MergePolicy.OneMerge merge;
+    private Directory dir;
+    public CheckAbort(MergePolicy.OneMerge merge, Directory dir) {
+      this.merge = merge;
+      this.dir = dir;
+    }
+    public void work(double count) throws MergePolicy.MergeAbortedException {
+      workCount += count;
+      if (workCount >= 10000.0) {
+        merge.checkAborted(dir);
+        workCount = 0;
+      }
+    }
+  }
 }
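CheckAbort, added above, amortizes the cost of abort polling: each copy step credits work units, and only once roughly 10000 units accumulate does work() take the OneMerge lock via checkAborted(). At 300 units per document for stored fields and term vectors, the abort flag is consulted about every 33 documents; term merging credits df/3.0 units, i.e. one check per roughly 30000 postings. An illustrative caller (the loop and copyDocument are hypothetical; CheckAbort is the class from the hunk above):

    SegmentMerger.CheckAbort checkAbort = new SegmentMerger.CheckAbort(merge, directory);
    for (int docNum = 0; docNum < maxDoc; docNum++) {
      if (reader.isDeleted(docNum))
        continue;
      copyDocument(docNum);       // hypothetical per-document copy step
      checkAbort.work(300);       // about every 33 docs this calls merge.checkAborted(dir),
                                  // which throws MergeAbortedException once abort() has run
    }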
Index: src/java/org/apache/lucene/index/MergePolicy.java
===================================================================
--- src/java/org/apache/lucene/index/MergePolicy.java	(revision 606041)
+++ src/java/org/apache/lucene/index/MergePolicy.java	(working copy)
@@ -86,29 +86,34 @@

    /** Record that an exception occurred while executing
     *  this merge */
-    public synchronized void setException(Throwable error) {
+    synchronized void setException(Throwable error) {
      this.error = error;
    }

    /** Retrieve previous exception set by {@link
     *  #setException}. */
-    public synchronized Throwable getException() {
+    synchronized Throwable getException() {
      return error;
    }

    /** Mark this merge as aborted.  If this is called
     *  before the merge is committed then the merge will
     *  not be committed. */
-    public synchronized void abort() {
+    synchronized void abort() {
      aborted = true;
    }

    /** Returns true if this merge was aborted. */
-    public synchronized boolean isAborted() {
+    synchronized boolean isAborted() {
      return aborted;
    }

-    public String segString(Directory dir) {
+    synchronized void checkAborted(Directory dir) throws MergeAbortedException {
+      if (aborted)
+        throw new MergeAbortedException("merge is aborted: " + segString(dir));
+    }
+
+    String segString(Directory dir) {
      StringBuffer b = new StringBuffer();
      final int numSegments = segments.size();
      for(int i=0;i<numSegments;i++) {
Index: src/java/org/apache/lucene/index/CompoundFileWriter.java
===================================================================
--- src/java/org/apache/lucene/index/CompoundFileWriter.java	(revision 606041)
+++ src/java/org/apache/lucene/index/CompoundFileWriter.java	(working copy)
@@ -55,11 +55,15 @@
   *  @throws NullPointerException if <code>dir</code> or <code>name</code> is null
   */
  public CompoundFileWriter(Directory dir, String name) {
+    this(dir, name, null);
+  }
+
+  CompoundFileWriter(Directory dir, String name, SegmentMerger.CheckAbort checkAbort) {
    if (dir == null)
      throw new NullPointerException("directory cannot be null");
    if (name == null)
      throw new NullPointerException("name cannot be null");
-
+    this.checkAbort = checkAbort;
    directory = dir;
    fileName = name;
    ids = new HashSet();
@@ -211,6 +215,10 @@
        is.readBytes(buffer, 0, len);
        os.writeBytes(buffer, len);
        remainder -= len;
+        if (checkAbort != null)
+          // Roughly every 2 MB we will check if
+          // it's time to abort
+          checkAbort.work(80);
      }

      // Verify that remainder is 0
Index: src/java/org/apache/lucene/index/DocumentsWriter.java
===================================================================
--- src/java/org/apache/lucene/index/DocumentsWriter.java	(revision 606041)
+++ src/java/org/apache/lucene/index/DocumentsWriter.java	(working copy)
@@ -27,6 +27,7 @@
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.RAMOutputStream;
+import org.apache.lucene.store.AlreadyClosedException;

 import java.io.IOException;
 import java.io.PrintStream;
@@ -148,6 +149,8 @@
  // non-zero we will flush by RAM usage instead.
  private int maxBufferedDocs = IndexWriter.DEFAULT_MAX_BUFFERED_DOCS;

+  private boolean closed;
+
  // Coarse estimates used to measure RAM usage of buffered deletes
  private static int OBJECT_HEADER_BYTES = 12;
  private static int OBJECT_POINTER_BYTES = 4;    // TODO: should be 8 on 64-bit platform
@@ -2168,6 +2171,10 @@
    }
  }

+  synchronized void close() {
+    closed = true;
+  }
+
  /** Returns a free (idle) ThreadState that may be used for
   * indexing this one document.  This call also pauses if a
   * flush is pending.  If delTerm is non-null then we
@@ -2211,6 +2218,9 @@
        Thread.currentThread().interrupt();
      }

+      if (closed)
+        throw new AlreadyClosedException("this IndexWriter is closed");
+
      if (segment == null)
        segment = writer.newSegmentName();
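With the closed flag in place, the shutdown ordering becomes: IndexWriter.close(false) calls docWriter.close() before flushing (see the IndexWriter hunk below), so a thread still inside addDocument fails fast rather than buffering into a writer that is shutting down. The caller-side pattern, as exercised by testNoWaitClose (sketch only):

    try {
      writer.addDocument(doc);
    } catch (AlreadyClosedException ace) {
      // another thread called close(); stop indexing
    }

A side note on the "Roughly every 2 MB" comment in the CompoundFileWriter hunk: with work(80) credited per copied buffer and the 10000-unit threshold in CheckAbort, an abort check fires every 125 buffers, which works out to 2 MB only if the copy buffer is about 16 KB.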
Index: src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- src/java/org/apache/lucene/index/IndexWriter.java	(revision 606041)
+++ src/java/org/apache/lucene/index/IndexWriter.java	(working copy)
@@ -291,6 +291,7 @@
  private Set runningMerges = new HashSet();
  private List mergeExceptions = new ArrayList();
  private long mergeGen;
+  private boolean stopMerges;

  /**
   * Used internally to throw an {@link
@@ -1150,8 +1151,10 @@
   * using a MergeScheduler that runs merges in background
   * threads.
   * @param waitForMerges if true, this call will block
-   * until all merges complete; else, it will abort all
-   * running merges and return right away
+   * until all merges complete; else, it will ask all
+   * running merges to abort, wait until those merges have
+   * finished (which should be at most a few seconds), and
+   * then return.
   */
  public void close(boolean waitForMerges) throws CorruptIndexException, IOException {
    boolean doClose;
@@ -1186,6 +1189,8 @@
      if (infoStream != null)
        message("now flush at close");

+      docWriter.close();
+
      // Only allow a new merge to be triggered if we are
      // going to wait for merges:
      flush(waitForMerges, true);
@@ -1196,33 +1201,33 @@

      mergeScheduler.close();

-      if (commitPending) {
-        boolean success = false;
-        try {
-          segmentInfos.write(directory);         // now commit changes
-          success = true;
-        } finally {
-          if (!success) {
-            if (infoStream != null)
-              message("hit exception committing segments file during close");
-            deletePartialSegmentsFile();
+      synchronized(this) {
+        if (commitPending) {
+          boolean success = false;
+          try {
+            segmentInfos.write(directory);         // now commit changes
+            success = true;
+          } finally {
+            if (!success) {
+              if (infoStream != null)
+                message("hit exception committing segments file during close");
+              deletePartialSegmentsFile();
+            }
          }
-        }
-        if (infoStream != null)
-          message("close: wrote segments file \"" + segmentInfos.getCurrentSegmentFileName() + "\"");

-        synchronized(this) {
+          if (infoStream != null)
+            message("close: wrote segments file \"" + segmentInfos.getCurrentSegmentFileName() + "\"");
+
          deleter.checkpoint(segmentInfos, true);
+
+          commitPending = false;
+          rollbackSegmentInfos = null;
        }

-        commitPending = false;
-        rollbackSegmentInfos = null;
-      }
-
-      if (infoStream != null)
-        message("at close: " + segString());
+        if (infoStream != null)
+          message("at close: " + segString());

-      docWriter = null;
+        docWriter = null;

-      synchronized(this) {
        deleter.close();
      }
@@ -1440,9 +1445,11 @@
      synchronized (this) {
        // If docWriter has some aborted files that were
        // never incref'd, then we clean them up here
-        final List files = docWriter.abortedFiles();
-        if (files != null)
-          deleter.deleteNewFiles(files);
+        if (docWriter != null) {
+          final List files = docWriter.abortedFiles();
+          if (files != null)
+            deleter.deleteNewFiles(files);
+        }
      }
    }
  }
@@ -1799,6 +1806,9 @@
    throws CorruptIndexException, IOException {
    assert !optimize || maxNumSegmentsOptimize > 0;

+    if (stopMerges)
+      return;
+
    final MergePolicy.MergeSpecification spec;
    if (optimize) {
      spec = mergePolicy.findMergesForOptimize(segmentInfos, this, maxNumSegmentsOptimize, segmentsToOptimize);
@@ -1861,6 +1871,7 @@
    localRollbackSegmentInfos = (SegmentInfos) segmentInfos.clone();

    localAutoCommit = autoCommit;
+
    if (localAutoCommit) {

      if (infoStream != null)
@@ -1905,6 +1916,7 @@
      deleter.refresh();

      finishMerges(false);
+      stopMerges = false;
    }

  /*
@@ -1995,7 +2007,6 @@
      // them:
      deleter.checkpoint(segmentInfos, false);
      deleter.refresh();
-      finishMerges(false);
    }

    commitPending = false;
@@ -2004,8 +2015,11 @@
    waitForClose();
  }

-  private synchronized void finishMerges(boolean waitForMerges) {
+  private synchronized void finishMerges(boolean waitForMerges) throws IOException {
    if (!waitForMerges) {
+
+      stopMerges = true;
+
      // Abort all pending & running merges:
      Iterator it = pendingMerges.iterator();
      while(it.hasNext()) {
@@ -2013,9 +2027,10 @@
        if (infoStream != null)
          message("now abort pending merge " + merge.segString(directory));
        merge.abort();
+        mergeFinish(merge);
      }
      pendingMerges.clear();
-
+
      it = runningMerges.iterator();
      while(it.hasNext()) {
        final MergePolicy.OneMerge merge = (MergePolicy.OneMerge) it.next();
@@ -2023,10 +2038,27 @@
        if (infoStream != null)
          message("now abort running merge " + merge.segString(directory));
        merge.abort();
      }

-      runningMerges.clear();
-      mergingSegments.clear();
-      notifyAll();
+      // These merges periodically check whether they have
+      // been aborted, and stop if so.  We wait here to make
+      // sure they all stop.  It should not take very long
+      // because the merge threads periodically check if
+      // they are aborted.
+      while(runningMerges.size() > 0) {
+        if (infoStream != null)
+          message("now wait for " + runningMerges.size() + " running merge to abort");
+        try {
+          wait();
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        }
+      }
+
+      assert 0 == mergingSegments.size();
+
+      if (infoStream != null)
+        message("all running merges have aborted");
+
    } else {
      while(pendingMerges.size() > 0 || runningMerges.size() > 0) {
        try {
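The wait() loop just above has no matching notifyAll() in this hunk; it relies on the existing merge-completion path (mergeFinish, outside this patch's hunks) notifying the IndexWriter monitor as each merge leaves runningMerges, the same notification that already wakes the waitForMerges branch. The handshake, compressed into a comment sketch under that assumption:

    // closing thread, finishMerges(false):    merge thread:
    //   merge.abort();                          checkAbort.work(...) crosses the
    //   while (runningMerges.size() > 0)        threshold; checkAborted() throws
    //     wait();                               MergeAbortedException; the merge
    //       ^                                   unwinds and is removed from
    //       +------ notifyAll() <-------------- runningMerges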
have + // been aborted, and stop if so. We wait here to make + // sure they all stop. It should not take very long + // because the merge threads periodically check if + // they are aborted. + while(runningMerges.size() > 0) { + if (infoStream != null) + message("now wait for " + runningMerges.size() + " running merge to abort"); + try { + wait(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + + assert 0 == mergingSegments.size(); + + if (infoStream != null) + message("all running merges have aborted"); + } else { while(pendingMerges.size() > 0 || runningMerges.size() > 0) { try { @@ -2263,7 +2295,7 @@ optimize(); // start with zero or 1 seg final String mergedName = newSegmentName(); - SegmentMerger merger = new SegmentMerger(this, mergedName); + SegmentMerger merger = new SegmentMerger(this, mergedName, null); SegmentInfo info; @@ -2684,10 +2716,12 @@ deletes.set(docUpto); docUpto++; } - + } else // No deletes before or after docUpto += currentInfo.docCount; + + merge.checkAborted(directory); } if (deletes != null) { @@ -2783,15 +2817,26 @@ try { - if (merge.info == null) - mergeInit(merge); + try { + if (merge.info == null) + mergeInit(merge); - if (infoStream != null) - message("now merge\n merge=" + merge.segString(directory) + "\n index=" + segString()); + if (infoStream != null) + message("now merge\n merge=" + merge.segString(directory) + "\n index=" + segString()); - mergeMiddle(merge); - - success = true; + mergeMiddle(merge); + success = true; + } catch (MergePolicy.MergeAbortedException e) { + merge.setException(e); + addMergeException(merge); + // We can ignore this exception, unless the merge + // involves segments from external directories, in + // which case we must throw it so, for example, the + // rollbackTransaction code in addIndexes* is + // executed. + if (merge.isExternal) + throw e; + } } finally { synchronized(this) { try { @@ -2863,11 +2908,11 @@ * the synchronized lock on IndexWriter instance. */ final synchronized void mergeInit(MergePolicy.OneMerge merge) throws IOException { + assert merge.registerDone; + if (merge.isAborted()) - throw new IOException("merge is aborted"); + return; - assert merge.registerDone; - final SegmentInfos sourceSegments = merge.segments; final int end = sourceSegments.size(); @@ -3010,6 +3055,8 @@ * instance */ final private int mergeMiddle(MergePolicy.OneMerge merge) throws CorruptIndexException, IOException { + + merge.checkAborted(directory); final String mergedName = merge.info.name; @@ -3024,8 +3071,8 @@ if (infoStream != null) message("merging " + merge.segString(directory)); - merger = new SegmentMerger(this, mergedName); - + merger = new SegmentMerger(this, mergedName, merge); + // This is try/finally to make sure merger's readers are // closed: @@ -3044,8 +3091,7 @@ message("merge: total "+totDocCount+" docs"); } - if (merge.isAborted()) - throw new IOException("merge is aborted"); + merge.checkAborted(directory); mergedDocCount = merge.info.docCount = merger.merge(merge.mergeDocStores);