Index: src/test/org/apache/lucene/store/MockRAMOutputStream.java
===================================================================
--- src/test/org/apache/lucene/store/MockRAMOutputStream.java (revision 611411)
+++ src/test/org/apache/lucene/store/MockRAMOutputStream.java (working copy)
@@ -18,7 +18,6 @@
  */
 
 import java.io.IOException;
-import java.util.Iterator;
 
 /**
  * Used by MockRAMDirectory to create an output stream that
@@ -50,6 +49,11 @@
     }
   }
 
+  public void flush() throws IOException {
+    dir.maybeThrowDeterministicException();
+    super.flush();
+  }
+
   public void writeByte(byte b) throws IOException {
     singleByte[0] = b;
     writeBytes(singleByte, 0, 1);
@@ -80,6 +84,8 @@
       super.writeBytes(b, offset, len);
     }
 
+    dir.maybeThrowDeterministicException();
+
     if (first) {
       // Maybe throw random exception; only do this on first
       // write to a new file:
Index: src/test/org/apache/lucene/store/MockRAMDirectory.java
===================================================================
--- src/test/org/apache/lucene/store/MockRAMDirectory.java (revision 611411)
+++ src/test/org/apache/lucene/store/MockRAMDirectory.java (working copy)
@@ -117,7 +117,6 @@
   }
 
   void maybeThrowIOException() throws IOException {
-    maybeThrowDeterministicException();
    if (randomIOExceptionRate > 0.0) {
      int number = Math.abs(randomState.nextInt() % 1000);
      if (number < randomIOExceptionRate*1000) {
@@ -198,7 +197,7 @@
   * RAMOutputStream.BUFFER_SIZE (now 1024) bytes.
   */
 
-  final synchronized long getRecomputedActualSizeInBytes() {
+  public final synchronized long getRecomputedActualSizeInBytes() {
    long size = 0;
    Iterator it = fileMap.values().iterator();
    while (it.hasNext())
@@ -245,6 +244,16 @@
     * mock.failOn(failure.reset())
     */
    public Failure reset() { return this; }
+
+    protected boolean doFail;
+
+    public void setDoFail() {
+      doFail = true;
+    }
+
+    public void clearDoFail() {
+      doFail = false;
+    }
  }
 
  ArrayList failures;
@@ -253,7 +262,7 @@
   * add a Failure object to the list of objects to be evaluated
   * at every potential failure point
   */
-  public void failOn(Failure fail) {
+  synchronized public void failOn(Failure fail) {
    if (failures == null) {
      failures = new ArrayList();
    }
@@ -261,10 +270,10 @@
  }
 
  /**
-   * Itterate through the failures list, giving each object a
+   * Iterate through the failures list, giving each object a
   * chance to throw an IOE
   */
-  void maybeThrowDeterministicException() throws IOException {
+  synchronized void maybeThrowDeterministicException() throws IOException {
    if (failures != null) {
      for(int i = 0; i < failures.size(); i++) {
        ((Failure)failures.get(i)).eval(this);
Index: src/test/org/apache/lucene/index/TestIndexWriterDelete.java
===================================================================
--- src/test/org/apache/lucene/index/TestIndexWriterDelete.java (revision 611411)
+++ src/test/org/apache/lucene/index/TestIndexWriterDelete.java (working copy)
@@ -19,7 +19,6 @@
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.lang.StackTraceElement;
 
 import org.apache.lucene.util.LuceneTestCase;
 
@@ -454,7 +453,7 @@
    String[] startFiles = dir.list();
    SegmentInfos infos = new SegmentInfos();
    infos.read(dir);
-    IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null);
+    new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null);
    String[] endFiles = dir.list();
 
    Arrays.sort(startFiles);
@@ -560,8 +559,19 @@
    }
    public void eval(MockRAMDirectory dir)  throws IOException {
      if (sawMaybe && !failed) {
-        failed = true;
-        throw new IOException("fail after applyDeletes");
IOException("fail after applyDeletes"); + boolean seen = false; + StackTraceElement[] trace = new Exception().getStackTrace(); + for (int i = 0; i < trace.length; i++) { + if ("applyDeletes".equals(trace[i].getMethodName())) { + seen = true; + break; + } + } + if (!seen) { + // Only fail once we are no longer in applyDeletes + failed = true; + throw new IOException("fail after applyDeletes"); + } } if (!failed) { StackTraceElement[] trace = new Exception().getStackTrace(); @@ -740,7 +750,7 @@ String[] startFiles = dir.list(); SegmentInfos infos = new SegmentInfos(); infos.read(dir); - IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null); + new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null); String[] endFiles = dir.list(); if (!Arrays.equals(startFiles, endFiles)) { Index: src/test/org/apache/lucene/index/TestIndexWriter.java =================================================================== --- src/test/org/apache/lucene/index/TestIndexWriter.java (revision 611411) +++ src/test/org/apache/lucene/index/TestIndexWriter.java (working copy) @@ -1887,6 +1887,11 @@ throw new IOException("I'm experiencing problems"); return input.next(result); } + + public void reset() throws IOException { + super.reset(); + count = 0; + } } public void testDocumentsWriterExceptions() throws IOException { @@ -1969,6 +1974,122 @@ } } + public void testDocumentsWriterExceptionThreads() throws IOException { + Analyzer analyzer = new Analyzer() { + public TokenStream tokenStream(String fieldName, Reader reader) { + return new CrashingFilter(fieldName, new WhitespaceTokenizer(reader)); + } + }; + + final int NUM_THREAD = 3; + final int NUM_ITER = 100; + + for(int i=0;i<2;i++) { + MockRAMDirectory dir = new MockRAMDirectory(); + + { + final IndexWriter writer = new IndexWriter(dir, analyzer); + + final int finalI = i; + + Thread[] threads = new Thread[NUM_THREAD]; + for(int t=0;t= 5) + break; + } else { + if (noErrors) { + System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected IOException:"); + ioe.printStackTrace(System.out); + error = ioe; + } + break; + } + } catch (Throwable t) { + if (noErrors) { + System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected Throwable:"); + t.printStackTrace(System.out); + error = t; + } + break; + } + } + } + } + + // LUCENE-1130: make sure we can close() even while + // threads are trying to add documents. 
+  // speaking, this isn't valid use of Lucene's APIs, but we
+  // still want to be robust to this case:
+  public void testCloseWithThreads() throws IOException {
+    int NUM_THREADS = 3;
+
+    for(int iter=0;iter<50;iter++) {
+      MockRAMDirectory dir = new MockRAMDirectory();
+      IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
+      ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
+
+      writer.setMergeScheduler(cms);
+      writer.setMaxBufferedDocs(10);
+      writer.setMergeFactor(4);
+
+      IndexerThread[] threads = new IndexerThread[NUM_THREADS];
+      boolean diskFull = false;
+
+      for(int i=0;i
 0);
+      reader.close();
+
+      dir.close();
+    }
+  }
+
+  // LUCENE-1130: make sure immediate disk full on creating
+  // an IndexWriter (hit during DW.ThreadState.init()) is
+  // OK:
+  public void testImmediateDiskFull() throws IOException {
+    MockRAMDirectory dir = new MockRAMDirectory();
+    IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
+    dir.setMaxSizeInBytes(dir.getRecomputedActualSizeInBytes());
+    writer.setMaxBufferedDocs(2);
+    final Document doc = new Document();
+    doc.add(new Field("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+    try {
+      writer.addDocument(doc);
+      fail("did not hit disk full");
+    } catch (IOException ioe) {
+    }
+    // Without fix for LUCENE-1130: this call will hang:
+    try {
+      writer.addDocument(doc);
+      fail("did not hit disk full");
+    } catch (IOException ioe) {
+    }
+    try {
+      writer.close(false);
+      fail("did not hit disk full");
+    } catch (IOException ioe) {
+    }
+  }
+
+  // LUCENE-1130: make sure immediate disk full on creating
+  // an IndexWriter (hit during DW.ThreadState.init()), with
+  // multiple threads, is OK:
+  public void testImmediateDiskFullWithThreads() throws IOException {
+
+    int NUM_THREADS = 3;
+
+    for(int iter=0;iter<10;iter++) {
+      MockRAMDirectory dir = new MockRAMDirectory();
+      IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
+      ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
+      // We expect disk full exceptions in the merge threads
+      cms.setSuppressExceptions();
+      writer.setMergeScheduler(cms);
+      writer.setMaxBufferedDocs(2);
+      writer.setMergeFactor(4);
+      dir.setMaxSizeInBytes(4*1024+20*iter);
+
+      IndexerThread[] threads = new IndexerThread[NUM_THREADS];
+      boolean diskFull = false;
+
+      for(int i=0;i
 0;
@@ -474,6 +523,13 @@
 
    try {
 
+      if (closeDocStore) {
+        assert docStoreSegment != null;
+        assert docStoreSegment.equals(segment);
+        newFiles.addAll(files());
+        closeDocStore();
+      }
+
      fieldInfos.write(directory, segment + ".fnm");
 
      docCount = numDocsInRAM;
 
@@ -484,7 +540,7 @@
 
    } finally {
      if (!success)
-        abort();
+        abort(null);
    }
 
    return docCount;
 
@@ -553,7 +609,6 @@
      // doc has one
      boolean doFlushAfter;
 
-      boolean abortOnExc;
 
      public ThreadState() {
        fieldDataArray = new FieldData[8];
@@ -574,6 +629,7 @@
        localFieldsWriter.close();
        localFieldsWriter = null;
      }
+      fieldGen = 0;
      maxPostingsVectors = 0;
      doFlushAfter = false;
      postingsPool.reset();
@@ -589,52 +645,56 @@
 
    /** Move all per-document state that was accumulated in
     *  the ThreadState into the "real" stores. */
-    public void writeDocument() throws IOException {
+    public void writeDocument() throws IOException, AbortException {
 
      // If we hit an exception while appending to the
      // stored fields or term vectors files, we have to
      // abort all documents since we last flushed because
      // it means those files are possibly inconsistent.
-      abortOnExc = true;
+      try {
 
-      // Append stored fields to the real FieldsWriter:
-      fieldsWriter.flushDocument(numStoredFields, fdtLocal);
-      fdtLocal.reset();
-      numStoredFields = 0;
+        // Append stored fields to the real FieldsWriter:
+        fieldsWriter.flushDocument(numStoredFields, fdtLocal);
+        fdtLocal.reset();
 
-      // Append term vectors to the real outputs:
-      if (tvx != null) {
-        tvx.writeLong(tvd.getFilePointer());
-        tvd.writeVInt(numVectorFields);
-        if (numVectorFields > 0) {
-          for(int i=0;i
 0) {
+          for(int i=0;i
 0 && ((float) fp.numPostings) / fp.postingsHashSize < 0.2) {
@@ -996,7 +1079,7 @@
 
    /** Tokenizes the fields of a document into Postings */
    void processDocument(Analyzer analyzer)
-      throws IOException {
+      throws IOException, AbortException {
 
      final int numFields = numFieldData;
 
@@ -1215,7 +1298,7 @@
      int fieldCount;
      Fieldable[] docFields = new Fieldable[1];
 
-      int lastDocID = -1;
+      int lastGen = -1;
      FieldData next;
 
      boolean doNorms;
@@ -1284,7 +1367,7 @@
      }
 
      /** Process all occurrences of one field in the document. */
-      public void processField(Analyzer analyzer) throws IOException {
+      public void processField(Analyzer analyzer) throws IOException, AbortException {
        length = 0;
        position = 0;
        offset = 0;
@@ -1316,10 +1399,8 @@
            // contents of fdtLocal can be corrupt, so
            // we must discard all stored fields for
            // this document:
-            if (!success) {
-              numStoredFields = 0;
+            if (!success)
              fdtLocal.reset();
-            }
          }
        }
 
@@ -1354,7 +1435,7 @@
      Token localToken = new Token();
 
      /* Invert one occurrence of one field in the document */
-      public void invertField(Fieldable field, Analyzer analyzer, final int maxFieldLength) throws IOException {
+      public void invertField(Fieldable field, Analyzer analyzer, final int maxFieldLength) throws IOException, AbortException {
 
        if (length>0)
          position += analyzer.getPositionIncrementGap(fieldInfo.name);
@@ -1475,7 +1556,7 @@
     *  for every term of every document.  Its job is to *
     *  update the postings byte stream (Postings hash) *
     *  based on the occurence of a single term. */
-    private void addPosition(Token token) {
+    private void addPosition(Token token) throws AbortException {
 
      final Payload payload = token.getPayload();
 
@@ -1519,167 +1600,168 @@
      // partially written and thus inconsistent if
      // flushed, so we have to abort all documents
      // since the last flush:
-      abortOnExc = true;
 
-      if (p != null) {       // term seen since last flush
+      try {
 
-        if (docID != p.lastDocID) { // term not yet seen in this doc
+        if (p != null) {       // term seen since last flush
+
+          if (docID != p.lastDocID) { // term not yet seen in this doc
 
-          // System.out.println("  seen before (new docID=" + docID + ") freqUpto=" + p.freqUpto +" proxUpto=" + p.proxUpto);
+            // System.out.println("  seen before (new docID=" + docID + ") freqUpto=" + p.freqUpto +" proxUpto=" + p.proxUpto);
 
-          assert p.docFreq > 0;
+            assert p.docFreq > 0;
 
-          // Now that we know doc freq for previous doc,
-          // write it & lastDocCode
-          freqUpto = p.freqUpto & BYTE_BLOCK_MASK;
-          freq = postingsPool.buffers[p.freqUpto >> BYTE_BLOCK_SHIFT];
-          if (1 == p.docFreq)
-            writeFreqVInt(p.lastDocCode|1);
-          else {
-            writeFreqVInt(p.lastDocCode);
-            writeFreqVInt(p.docFreq);
-          }
-          p.freqUpto = freqUpto + (p.freqUpto & BYTE_BLOCK_NOT_MASK);
+            // Now that we know doc freq for previous doc,
+            // write it & lastDocCode
+            freqUpto = p.freqUpto & BYTE_BLOCK_MASK;
+            freq = postingsPool.buffers[p.freqUpto >> BYTE_BLOCK_SHIFT];
+            if (1 == p.docFreq)
+              writeFreqVInt(p.lastDocCode|1);
+            else {
+              writeFreqVInt(p.lastDocCode);
+              writeFreqVInt(p.docFreq);
+            }
+            p.freqUpto = freqUpto + (p.freqUpto & BYTE_BLOCK_NOT_MASK);
 
-          if (doVectors) {
-            vector = addNewVector();
-            if (doVectorOffsets) {
-              offsetStartCode = offsetStart = offset + token.startOffset();
-              offsetEnd = offset + token.endOffset();
+            if (doVectors) {
+              vector = addNewVector();
+              if (doVectorOffsets) {
+                offsetStartCode = offsetStart = offset + token.startOffset();
+                offsetEnd = offset + token.endOffset();
+              }
            }
-          }
 
-          proxCode = position;
+            proxCode = position;
 
-          p.docFreq = 1;
+            p.docFreq = 1;
 
-          // Store code so we can write this after we're
-          // done with this new doc
-          p.lastDocCode = (docID-p.lastDocID) << 1;
-          p.lastDocID = docID;
+            // Store code so we can write this after we're
+            // done with this new doc
+            p.lastDocCode = (docID-p.lastDocID) << 1;
+            p.lastDocID = docID;
 
-        } else {                                // term already seen in this doc
-          // System.out.println("  seen before (same docID=" + docID + ") proxUpto=" + p.proxUpto);
-          p.docFreq++;
+          } else {                                // term already seen in this doc
+            // System.out.println("  seen before (same docID=" + docID + ") proxUpto=" + p.proxUpto);
+            p.docFreq++;
 
-          proxCode = position-p.lastPosition;
+            proxCode = position-p.lastPosition;
 
-          if (doVectors) {
-            vector = p.vector;
-            if (vector == null)
-              vector = addNewVector();
-            if (doVectorOffsets) {
-              offsetStart = offset + token.startOffset();
-              offsetEnd = offset + token.endOffset();
-              offsetStartCode = offsetStart-vector.lastOffset;
+            if (doVectors) {
+              vector = p.vector;
+              if (vector == null)
+                vector = addNewVector();
+              if (doVectorOffsets) {
+                offsetStart = offset + token.startOffset();
+                offsetEnd = offset + token.endOffset();
+                offsetStartCode = offsetStart-vector.lastOffset;
+              }
            }
          }
-        }
 
-      } else {                                  // term not seen before
-        // System.out.println("  never seen docID=" + docID);
+        } else {                                  // term not seen before
+          // System.out.println("  never seen docID=" + docID);
 
-        // Refill?
-        if (0 == postingsFreeCount) {
-          postingsFreeCount = postingsFreeList.length;
-          getPostings(postingsFreeList);
-        }
+          // Refill?
+          if (0 == postingsFreeCount) {
+            postingsFreeCount = postingsFreeList.length;
+            getPostings(postingsFreeList);
+          }
 
-        final int textLen1 = 1+tokenTextLen;
-        if (textLen1 + charPool.byteUpto > CHAR_BLOCK_SIZE) {
-          if (textLen1 > CHAR_BLOCK_SIZE) {
-            // Just skip this term, to remain as robust as
-            // possible during indexing.  A TokenFilter
-            // can be inserted into the analyzer chain if
-            // other behavior is wanted (pruning the term
-            // to a prefix, throwing an exception, etc).
-            abortOnExc = false;
-            if (maxTermPrefix == null)
-              maxTermPrefix = new String(tokenText, 0, 30);
+          final int textLen1 = 1+tokenTextLen;
+          if (textLen1 + charPool.byteUpto > CHAR_BLOCK_SIZE) {
+            if (textLen1 > CHAR_BLOCK_SIZE) {
+              // Just skip this term, to remain as robust as
+              // possible during indexing.  A TokenFilter
+              // can be inserted into the analyzer chain if
+              // other behavior is wanted (pruning the term
+              // to a prefix, throwing an exception, etc).
+              if (maxTermPrefix == null)
+                maxTermPrefix = new String(tokenText, 0, 30);
 
-            // Still increment position:
-            position++;
-            return;
+              // Still increment position:
+              position++;
+              return;
+            }
+            charPool.nextBuffer();
          }
-          charPool.nextBuffer();
-        }
 
-        final char[] text = charPool.buffer;
-        final int textUpto = charPool.byteUpto;
+          final char[] text = charPool.buffer;
+          final int textUpto = charPool.byteUpto;
 
-        // Pull next free Posting from free list
-        p = postingsFreeList[--postingsFreeCount];
+          // Pull next free Posting from free list
+          p = postingsFreeList[--postingsFreeCount];
 
-        p.textStart = textUpto + charPool.byteOffset;
-        charPool.byteUpto += textLen1;
+          p.textStart = textUpto + charPool.byteOffset;
+          charPool.byteUpto += textLen1;
 
-        System.arraycopy(tokenText, 0, text, textUpto, tokenTextLen);
+          System.arraycopy(tokenText, 0, text, textUpto, tokenTextLen);
 
-        text[textUpto+tokenTextLen] = 0xffff;
+          text[textUpto+tokenTextLen] = 0xffff;
 
-        assert postingsHash[hashPos] == null;
+          assert postingsHash[hashPos] == null;
 
-        postingsHash[hashPos] = p;
-        numPostings++;
+          postingsHash[hashPos] = p;
+          numPostings++;
 
-        if (numPostings == postingsHashHalfSize)
-          rehashPostings(2*postingsHashSize);
+          if (numPostings == postingsHashHalfSize)
+            rehashPostings(2*postingsHashSize);
 
-        // Init first slice for freq & prox streams
-        final int firstSize = levelSizeArray[0];
+          // Init first slice for freq & prox streams
+          final int firstSize = levelSizeArray[0];
 
-        final int upto1 = postingsPool.newSlice(firstSize);
-        p.freqStart = p.freqUpto = postingsPool.byteOffset + upto1;
+          final int upto1 = postingsPool.newSlice(firstSize);
+          p.freqStart = p.freqUpto = postingsPool.byteOffset + upto1;
 
-        final int upto2 = postingsPool.newSlice(firstSize);
-        p.proxStart = p.proxUpto = postingsPool.byteOffset + upto2;
+          final int upto2 = postingsPool.newSlice(firstSize);
+          p.proxStart = p.proxUpto = postingsPool.byteOffset + upto2;
 
-        p.lastDocCode = docID << 1;
-        p.lastDocID = docID;
-        p.docFreq = 1;
+          p.lastDocCode = docID << 1;
+          p.lastDocID = docID;
+          p.docFreq = 1;
 
-        if (doVectors) {
-          vector = addNewVector();
-          if (doVectorOffsets) {
-            offsetStart = offsetStartCode = offset + token.startOffset();
-            offsetEnd = offset + token.endOffset();
+          if (doVectors) {
+            vector = addNewVector();
+            if (doVectorOffsets) {
+              offsetStart = offsetStartCode = offset + token.startOffset();
+              offsetEnd = offset + token.endOffset();
+            }
          }
+
+          proxCode = position;
        }
 
-        proxCode = position;
-      }
+        proxUpto = p.proxUpto & BYTE_BLOCK_MASK;
+        prox = postingsPool.buffers[p.proxUpto >> BYTE_BLOCK_SHIFT];
+        assert prox != null;
-      proxUpto = p.proxUpto & BYTE_BLOCK_MASK;
-      prox = postingsPool.buffers[p.proxUpto >> BYTE_BLOCK_SHIFT];
-      assert prox != null;
+        if (payload != null && payload.length > 0) {
+          writeProxVInt((proxCode<<1)|1);
+          writeProxVInt(payload.length);
+          writeProxBytes(payload.data, payload.offset, payload.length);
+          fieldInfo.storePayloads = true;
+        } else
+          writeProxVInt(proxCode<<1);
 
-      if (payload != null && payload.length > 0) {
-        writeProxVInt((proxCode<<1)|1);
-        writeProxVInt(payload.length);
-        writeProxBytes(payload.data, payload.offset, payload.length);
-        fieldInfo.storePayloads = true;
-      } else
-        writeProxVInt(proxCode<<1);
+        p.proxUpto = proxUpto + (p.proxUpto & BYTE_BLOCK_NOT_MASK);
 
-      p.proxUpto = proxUpto + (p.proxUpto & BYTE_BLOCK_NOT_MASK);
+        p.lastPosition = position++;
 
-      p.lastPosition = position++;
+        if (doVectorPositions) {
+          posUpto = vector.posUpto & BYTE_BLOCK_MASK;
+          pos = vectorsPool.buffers[vector.posUpto >> BYTE_BLOCK_SHIFT];
+          writePosVInt(proxCode);
+          vector.posUpto = posUpto + (vector.posUpto & BYTE_BLOCK_NOT_MASK);
+        }
 
-      if (doVectorPositions) {
-        posUpto = vector.posUpto & BYTE_BLOCK_MASK;
-        pos = vectorsPool.buffers[vector.posUpto >> BYTE_BLOCK_SHIFT];
-        writePosVInt(proxCode);
-        vector.posUpto = posUpto + (vector.posUpto & BYTE_BLOCK_NOT_MASK);
+        if (doVectorOffsets) {
+          offsetUpto = vector.offsetUpto & BYTE_BLOCK_MASK;
+          offsets = vectorsPool.buffers[vector.offsetUpto >> BYTE_BLOCK_SHIFT];
+          writeOffsetVInt(offsetStartCode);
+          writeOffsetVInt(offsetEnd-offsetStart);
+          vector.lastOffset = offsetEnd;
+          vector.offsetUpto = offsetUpto + (vector.offsetUpto & BYTE_BLOCK_NOT_MASK);
+        }
+      } catch (Throwable t) {
+        throw new AbortException(t, DocumentsWriter.this);
      }
-
-      if (doVectorOffsets) {
-        offsetUpto = vector.offsetUpto & BYTE_BLOCK_MASK;
-        offsets = vectorsPool.buffers[vector.offsetUpto >> BYTE_BLOCK_SHIFT];
-        writeOffsetVInt(offsetStartCode);
-        writeOffsetVInt(offsetEnd-offsetStart);
-        vector.lastOffset = offsetEnd;
-        vector.offsetUpto = offsetUpto + (vector.offsetUpto & BYTE_BLOCK_NOT_MASK);
-      }
-
-      abortOnExc = false;
    }
 
    /** Called when postings hash is too small (> 50%
@@ -2209,6 +2291,7 @@
 
  synchronized void close() {
    closed = true;
+    notifyAll();
  }
 
  /** Returns a free (idle) ThreadState that may be used for
@@ -2247,7 +2330,7 @@
    // Next, wait until my thread state is idle (in case
    // it's shared with other threads) and for threads to
    // not be paused nor a flush pending:
-    while(!state.isIdle || pauseThreads != 0 || flushPending)
+    while(!closed && (!state.isIdle || pauseThreads != 0 || flushPending || aborting))
      try {
        wait();
      } catch (InterruptedException e) {
@@ -2275,28 +2358,31 @@
 
    state.isIdle = false;
 
-    boolean success = false;
    try {
-      state.init(doc, nextDocID++);
-
-      if (delTerm != null) {
-        addDeleteTerm(delTerm, state.docID);
-        if (!state.doFlushAfter)
-          state.doFlushAfter = timeToFlushDeletes();
-      }
-
-      success = true;
-    } finally {
-      if (!success) {
-        synchronized(this) {
+      boolean success = false;
+      try {
+        state.init(doc, nextDocID);
+        if (delTerm != null) {
+          addDeleteTerm(delTerm, state.docID);
+          if (!state.doFlushAfter)
+            state.doFlushAfter = timeToFlushDeletes();
+        }
+        // Only increment nextDocID on successful init
+        nextDocID++;
+        success = true;
+      } finally {
+        if (!success) {
+          // Forcefully idle this ThreadState:
          state.isIdle = true;
+          notifyAll();
          if (state.doFlushAfter) {
            state.doFlushAfter = false;
            flushPending = false;
          }
-          notifyAll();
        }
      }
+    } catch (AbortException ae) {
+      abort(ae);
    }
 
    return state;
@@ -2319,32 +2405,30 @@
 
    // This call is synchronized but fast
    final ThreadState state = getThreadState(doc, delTerm);
 
-    boolean success = false;
    try {
+      boolean success = false;
      try {
-        // This call is not synchronized and does all the work
-        state.processDocument(analyzer);
+        try {
+          // This call is not synchronized and does all the work
+          state.processDocument(analyzer);
+        } finally {
+          // This call is synchronized but fast
+          finishDocument(state);
+        }
+        success = true;
      } finally {
-        // This call is synchronized but fast
-        finishDocument(state);
-      }
-      success = true;
-    } finally {
-      if (!success) {
-        synchronized(this) {
-          state.isIdle = true;
-          if (state.abortOnExc)
-            // Abort all buffered docs since last flush
-            abort();
-          else
+        if (!success) {
+          synchronized(this) {
            // Immediately mark this document as deleted
            // since likely it was partially added.  This
            // keeps indexing as "all or none" (atomic) when
            // adding a document:
            addDeleteDocID(state.docID);
-            notifyAll();
+          }
        }
      }
+    } catch (AbortException ae) {
+      abort(ae);
    }
 
    return state.doFlushAfter || timeToFlushDeletes();
@@ -2467,51 +2551,57 @@
 
  /** Does the synchronized work to finish/flush the
   *  inverted document. */
-  private synchronized void finishDocument(ThreadState state) throws IOException {
+  private synchronized void finishDocument(ThreadState state) throws IOException, AbortException {
+    if (aborting) {
+      // Forcefully idle this threadstate -- its state will
+      // be reset by abort()
+      state.isIdle = true;
+      notifyAll();
+      return;
+    }
 
    // Now write the indexed document to the real files.
-
    if (nextWriteDocID == state.docID) {
      // It's my turn, so write everything now:
-      state.isIdle = true;
      nextWriteDocID++;
      state.writeDocument();
+      state.isIdle = true;
+      notifyAll();
 
      // If any states were waiting on me, sweep through and
      // flush those that are enabled by my write.
      if (numWaiting > 0) {
-        while(true) {
-          int upto = 0;
-          for(int i=0;i
 i+1)
+              // Swap in the last waiting state to fill in
+              // the hole we just created.  It's important
+              // to do this as-we-go and not at the end of
+              // the loop, because if we hit an aborting
+              // exception in one of the s.writeDocument
+              // calls (above), it leaves this array in an
+              // inconsistent state:
+              waitingThreadStates[i] = waitingThreadStates[numWaiting-1];
+              numWaiting--;
+            } else {
+              assert !s.isIdle;
+              i++;
+            }
          }
-          if (upto == numWaiting)
-            break;
-          numWaiting = upto;
        }
      }
-
-      // Now notify any incoming calls to addDocument
-      // (above) that are waiting on our line to
-      // shrink
-      notifyAll();
-
    } else {
      // Another thread got a docID before me, but, it
      // hasn't finished its processing.  So add myself to
      // the line but don't hold up this thread.
-      if (numWaiting == waitingThreadStates.length) {
-        ThreadState[] newWaiting = new ThreadState[2*waitingThreadStates.length];
-        System.arraycopy(waitingThreadStates, 0, newWaiting, 0, numWaiting);
-        waitingThreadStates = newWaiting;
-      }
      waitingThreadStates[numWaiting++] = state;
    }
  }
@@ -3137,3 +3227,12 @@
    int posUpto;                     // Next write address for positions
  }
 }
+
+// Used only internally to DW to call abort "up the stack"
+class AbortException extends IOException {
+  public AbortException(Throwable cause, DocumentsWriter docWriter) {
+    super();
+    initCause(cause);
+    docWriter.setAborting();
+  }
+}
Index: src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- src/java/org/apache/lucene/index/IndexWriter.java (revision 611407)
+++ src/java/org/apache/lucene/index/IndexWriter.java (working copy)
@@ -1278,7 +1278,7 @@
        if (!success) {
          if (infoStream != null)
            message("hit exception closing doc store segment");
-          docWriter.abort();
+          docWriter.abort(null);
        }
      }
 
@@ -1999,7 +1999,7 @@
      segmentInfos.clear();
      segmentInfos.addAll(rollbackSegmentInfos);
 
-      docWriter.abort();
+      docWriter.abort(null);
 
      // Ask deleter to locate unreferenced files & remove
      // them:
@@ -2401,8 +2401,14 @@
  private synchronized final boolean doFlush(boolean flushDocStores) throws CorruptIndexException, IOException {
 
    // Make sure no threads are actively adding a document
-    docWriter.pauseAllThreads();
+    // Returns true if docWriter is currently aborting, in
+    // which case we skip flushing this segment
+    if (docWriter.pauseAllThreads()) {
+      docWriter.resumeAllThreads();
+      return false;
+    }
+
    try {
 
      SegmentInfo newSegment = null;
@@ -2536,7 +2542,7 @@
          segmentInfos.remove(segmentInfos.size()-1);
        }
        if (flushDocs)
-          docWriter.abort();
+          docWriter.abort(null);
 
        deletePartialSegmentsFile();
        deleter.checkpoint(segmentInfos, false);
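
Note (not part of the patch): a minimal sketch of how a test might use the Failure
hooks added to MockRAMDirectory above (failOn/setDoFail/clearDoFail). The class name
FailWhenEnabled, the exception message, and the usage comments are illustrative
assumptions, not code from this patch; only failOn(), reset(), setDoFail(),
clearDoFail(), the protected doFail field, and eval(MockRAMDirectory) come from the
changes shown above.

  // Illustrative only: eval() is reached via
  // MockRAMDirectory.maybeThrowDeterministicException(), which the patched
  // MockRAMOutputStream now calls from flush() and writeBytes().
  public static class FailWhenEnabled extends MockRAMDirectory.Failure {
    public void eval(MockRAMDirectory dir) throws IOException {
      if (doFail)                        // toggled by setDoFail()/clearDoFail()
        throw new IOException("fail on purpose (test)");
    }
  }

  // Usage in a test (illustrative):
  //   MockRAMDirectory dir = new MockRAMDirectory();
  //   FailWhenEnabled failure = new FailWhenEnabled();
  //   dir.failOn(failure.reset());
  //   ... index a few documents ...
  //   failure.setDoFail();    // writes through dir now throw IOException
  //   ... verify IndexWriter aborts/recovers cleanly ...
  //   failure.clearDoFail();  // writes succeed again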