Index: lucene/src/test/org/apache/lucene/index/TestIndexWriter.java =================================================================== --- lucene/src/test/org/apache/lucene/index/TestIndexWriter.java (revision 978806) +++ lucene/src/test/org/apache/lucene/index/TestIndexWriter.java (working copy) @@ -2351,7 +2351,7 @@ StackTraceElement[] trace = new Exception().getStackTrace(); for (int i = 0; i < trace.length; i++) { if ("abort".equals(trace[i].getMethodName()) || - "flushDocument".equals(trace[i].getMethodName())) { + "finishDocument".equals(trace[i].getMethodName())) { if (onlyOnce) doFail = false; //System.out.println(Thread.currentThread().getName() + ": now fail"); @@ -2419,7 +2419,7 @@ for(int i=0;i 5); + assertTrue("flush should have occurred and files created", dir.listAll().length > 0); // After rollback, IW should remove all files writer.rollback(); Index: lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java =================================================================== --- lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java (revision 978806) +++ lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java (working copy) @@ -189,8 +189,9 @@ threads[i].join(); for(int i=0;i threadsIterator) throws IOException; } + public static abstract class AbortTask { + abstract void abort() throws IOException; + } + protected abstract static class ThreadState { private DocumentsWriterPerThread perThread; private boolean isIdle = true; @@ -95,11 +116,29 @@ return true; } - void abort() throws IOException { - pauseAllThreads(); - aborting = true; - for (ThreadState state : allThreadStates) { - state.perThread.abort(); + void abort(AbortTask task) throws IOException { + lock.lock(); + try { + if (!aborting) { + aborting = true; + pauseAllThreads(); + for (ThreadState state : allThreadStates) { + state.perThread.aborting = true; + } + + try { + for (ThreadState state : allThreadStates) { + state.perThread.abort(); + } + + task.abort(); + } finally { + aborting = false; + resumeAllThreads(); + } + } + } finally { + lock.unlock(); } } @@ -108,7 +147,7 @@ resumeAllThreads(); } - public T executeAllThreads(AllThreadsTask task) throws IOException { + public T executeAllThreads(DocumentsWriter documentsWriter, AllThreadsTask task) throws IOException { T result = null; lock.lock(); @@ -120,19 +159,21 @@ } catch (InterruptedException ie) { throw new ThreadInterruptedException(ie); } + + assert !globalLock; + globalLock = true; + + pauseAllThreads(); - pauseAllThreads(); - globalLock = true; } finally { lock.unlock(); } + final ThreadState[] localAllThreads = allThreadStates; // all threads are idle now - + boolean success = false; try { - final ThreadState[] localAllThreads = allThreadStates; - result = task.process(new Iterator() { int i = 0; @@ -151,8 +192,18 @@ throw new UnsupportedOperationException("remove() not supported."); } }); + success = true; return result; } finally { + boolean abort = false; + if (!success) { + for (ThreadState state : localAllThreads) { + if (state.perThread.aborting) { + abort = true; + } + } + } + lock.lock(); try { try { @@ -168,6 +219,10 @@ lock.unlock(); } + if (!aborting && abort) { + documentsWriter.abort(); + } + } } @@ -182,13 +237,12 @@ } finally { boolean abort = false; if (!success && state.perThread.aborting) { - state.perThread.aborting = false; abort = true; } returnDocumentsWriterPerThread(state, task.doClearThreadBindings()); - if (abort) { + if (!aborting && abort) { documentsWriter.abort(); } } @@ 
-222,13 +276,15 @@ ThreadState threadState = selectThreadState(Thread.currentThread(), documentsWriter, doc); try { - while (!threadState.isIdle || globalLock || aborting) { + while (!threadState.isIdle || globalLock || aborting || threadState.perThread.aborting) { threadStateAvailable.await(); } } catch (InterruptedException ie) { throw new ThreadInterruptedException(ie); } + assert threadState.isIdle; + threadState.isIdle = false; threadState.start(); Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java =================================================================== --- lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (revision 978806) +++ lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (working copy) @@ -1,15 +1,32 @@ package org.apache.lucene.index; +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + import java.io.IOException; import java.io.PrintStream; -import java.util.ArrayList; -import java.util.List; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.document.Document; import org.apache.lucene.index.codecs.Codec; import org.apache.lucene.search.Similarity; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.ArrayUtil; public class DocumentsWriterPerThread { @@ -84,6 +101,7 @@ * currently buffered docs. This resets our state, * discarding any docs added since last flush. 
*/ void abort() throws IOException { + assert aborting; try { if (infoStream != null) { message("docWriter: now abort"); @@ -124,11 +142,11 @@ SegmentWriteState flushState; long[] sequenceIDs = new long[8]; - final List closedFiles = new ArrayList(); long numBytesUsed; public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, IndexingChain indexingChain) { + parent.indexWriter.testPoint("DocumentsWriterPerThread.init start"); this.directory = directory; this.parent = parent; this.writer = parent.indexWriter; @@ -141,7 +159,6 @@ if (consumer instanceof DocFieldProcessor) { docFieldProcessor = (DocFieldProcessor) consumer; } - } void setAborting() { @@ -288,33 +305,11 @@ return segment; } - @SuppressWarnings("unchecked") - List closedFiles() { - return (List) ((ArrayList) closedFiles).clone(); - } - - - void addOpenFile(String name) { - synchronized(parent.openFiles) { - assert !parent.openFiles.contains(name); - parent.openFiles.add(name); - } - } - - void removeOpenFile(String name) { - synchronized(parent.openFiles) { - assert parent.openFiles.contains(name); - parent.openFiles.remove(name); - } - closedFiles.add(name); - } - void bytesUsed(long numBytes) { ramAllocator.bytesUsed(numBytes); } void message(String message) { - if (infoStream != null) - writer.message("DW: " + message); + writer.message("DW: " + message); } } Index: lucene/src/java/org/apache/lucene/index/TermsHash.java =================================================================== --- lucene/src/java/org/apache/lucene/index/TermsHash.java (revision 978806) +++ lucene/src/java/org/apache/lucene/index/TermsHash.java (working copy) @@ -82,9 +82,12 @@ @Override public void abort() { reset(); - consumer.abort(); - if (nextTermsHash != null) { - nextTermsHash.abort(); + try { + consumer.abort(); + } finally { + if (nextTermsHash != null) { + nextTermsHash.abort(); + } } } Index: lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java =================================================================== --- lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java (revision 978806) +++ lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java (working copy) @@ -79,9 +79,6 @@ state.flushedFiles.add(fieldsName); state.flushedFiles.add(fieldsIdxName); - docWriter.removeOpenFile(fieldsName); - docWriter.removeOpenFile(fieldsIdxName); - if (4+((long) state.numDocs)*8 != state.directory.fileLength(fieldsIdxName)) { throw new RuntimeException("after flush: fdx size mismatch: " + state.numDocs + " docs vs " + state.directory.fileLength(fieldsIdxName) + " length in bytes of " + fieldsIdxName + " file exists?=" + state.directory.fileExists(fieldsIdxName)); } @@ -96,8 +93,6 @@ fieldsWriter = new FieldsWriter(docWriter.directory, segment, fieldInfos); - docWriter.addOpenFile(IndexFileNames.segmentFileName(segment, "", IndexFileNames.FIELDS_EXTENSION)); - docWriter.addOpenFile(IndexFileNames.segmentFileName(segment, "", IndexFileNames.FIELDS_INDEX_EXTENSION)); lastDocID = 0; } } Index: lucene/src/java/org/apache/lucene/index/IndexReaderPool.java =================================================================== --- lucene/src/java/org/apache/lucene/index/IndexReaderPool.java (revision 0) +++ lucene/src/java/org/apache/lucene/index/IndexReaderPool.java (revision 0) @@ -0,0 +1,263 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import org.apache.lucene.store.BufferedIndexInput; +import org.apache.lucene.store.Directory; + +/** Holds shared SegmentReader instances. IndexWriter uses + * SegmentReaders for 1) applying deletes, 2) doing + * merges, 3) handing out a real-time reader. This pool + * reuses instances of the SegmentReaders in all these + * places if it is in "near real-time mode" (getReader() + * has been called on this instance). */ +public class IndexReaderPool { + + private final Map readerMap = new HashMap(); + + private final Directory directory; + private final IndexWriterConfig config; + private final IndexWriter writer; + + public IndexReaderPool(IndexWriter writer, Directory directory, IndexWriterConfig config) { + this.directory = directory; + this.config = config; + this.writer = writer; + } + + /** Forcefully clear changes for the specified segments, + * and remove from the pool. This is called on successful merge. */ + synchronized void clear(SegmentInfos infos) throws IOException { + if (infos == null) { + for (Map.Entry ent: readerMap.entrySet()) { + ent.getValue().hasChanges = false; + } + } else { + for (final SegmentInfo info: infos) { + if (readerMap.containsKey(info)) { + readerMap.get(info).hasChanges = false; + } + } + } + } + + /** + * Release the segment reader (i.e. decRef it and close if there + * are no more references. + * @param sr + * @throws IOException + */ + public synchronized void release(SegmentReader sr) throws IOException { + release(sr, false); + } + + /** + * Release the segment reader (i.e. decRef it and close if there + * are no more references. + * @param sr + * @throws IOException + */ + public synchronized void release(SegmentReader sr, boolean drop) throws IOException { + + final boolean pooled = readerMap.containsKey(sr.getSegmentInfo()); + + assert !pooled | readerMap.get(sr.getSegmentInfo()) == sr; + + // Drop caller's ref; for an external reader (not + // pooled), this decRef will close it + sr.decRef(); + + if (pooled && (drop || (!writer.poolReaders && sr.getRefCount() == 1))) { + + // We are the last ref to this reader; since we're + // not pooling readers, we release it: + readerMap.remove(sr.getSegmentInfo()); + + // nocommit + //assert !sr.hasChanges || Thread.holdsLock(IndexWriter.this); + + // Drop our ref -- this will commit any pending + // changes to the dir + boolean success = false; + try { + sr.close(); + success = true; + } finally { + if (!success && sr.hasChanges) { + // Abandon the changes & retry closing: + sr.hasChanges = false; + try { + sr.close(); + } catch (Throwable ignore) { + // Keep throwing original exception + } + } + } + } + } + + /** Remove all our references to readers, and commits + * any pending changes. 
*/ + synchronized void close() throws IOException { + Iterator> iter = readerMap.entrySet().iterator(); + while (iter.hasNext()) { + + Map.Entry ent = iter.next(); + + SegmentReader sr = ent.getValue(); + if (sr.hasChanges) { + assert writer.infoIsLive(sr.getSegmentInfo()); + sr.startCommit(); + boolean success = false; + try { + sr.doCommit(null); + success = true; + } finally { + if (!success) { + sr.rollbackCommit(); + } + } + } + + iter.remove(); + + // NOTE: it is allowed that this decRef does not + // actually close the SR; this can happen when a + // near real-time reader is kept open after the + // IndexWriter instance is closed + sr.decRef(); + } + } + + /** + * Commit all segment reader in the pool. + * @throws IOException + */ + synchronized void commit() throws IOException { + for (Map.Entry ent : readerMap.entrySet()) { + + SegmentReader sr = ent.getValue(); + if (sr.hasChanges) { + assert writer.infoIsLive(sr.getSegmentInfo()); + sr.startCommit(); + boolean success = false; + try { + sr.doCommit(null); + success = true; + } finally { + if (!success) { + sr.rollbackCommit(); + } + } + } + } + } + + /** + * Returns a ref to a clone. NOTE: this clone is not + * enrolled in the pool, so you should simply close() + * it when you're done (ie, do not call release()). + */ + public synchronized SegmentReader getReadOnlyClone(SegmentInfo info, boolean doOpenStores, int termInfosIndexDivisor) throws IOException { + SegmentReader sr = get(info, doOpenStores, BufferedIndexInput.BUFFER_SIZE, termInfosIndexDivisor); + try { + return (SegmentReader) sr.clone(true); + } finally { + sr.decRef(); + } + } + + /** + * Obtain a SegmentReader from the readerPool. The reader + * must be returned by calling {@link #release(SegmentReader)} + * @see #release(SegmentReader) + * @param info + * @param doOpenStores + * @throws IOException + */ + public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores) throws IOException { + return get(info, doOpenStores, BufferedIndexInput.BUFFER_SIZE, config.getReaderTermsIndexDivisor()); + } + + /** + * Obtain a SegmentReader from the readerPool. The reader + * must be returned by calling {@link #release(SegmentReader)} + * + * @see #release(SegmentReader) + * @param info + * @param doOpenStores + * @param readBufferSize + * @param termsIndexDivisor + * @throws IOException + */ + public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores, int readBufferSize, int termsIndexDivisor) throws IOException { + + if (writer.poolReaders) { + readBufferSize = BufferedIndexInput.BUFFER_SIZE; + } + + SegmentReader sr = readerMap.get(info); + if (sr == null) { + // TODO: we may want to avoid doing this while + // synchronized + // Returns a ref, which we xfer to readerMap: + sr = SegmentReader.get(false, info.dir, info, readBufferSize, doOpenStores, termsIndexDivisor, config.getCodecProvider()); + + if (info.dir == directory) { + // Only pool if reader is not external + readerMap.put(info, sr); + } + } else { + if (doOpenStores) { + sr.openDocStores(); + } + if (termsIndexDivisor != -1) { + // If this reader was originally opened because we + // needed to merge it, we didn't load the terms + // index. But now, if the caller wants the terms + // index (eg because it's doing deletes, or an NRT + // reader is being opened) we ask the reader to + // load its terms index. 
+ sr.loadTermsIndex(termsIndexDivisor); + } + } + + // Return a ref to our caller + if (info.dir == directory) { + // Only incRef if we pooled (reader is not external) + sr.incRef(); + } + return sr; + } + + // Returns a ref + public synchronized SegmentReader getIfExists(SegmentInfo info) throws IOException { + SegmentReader sr = readerMap.get(info); + if (sr != null) { + sr.incRef(); + } + return sr; + } +} + + Index: lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java =================================================================== --- lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java (revision 978806) +++ lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java (working copy) @@ -75,10 +75,6 @@ state.flushedFiles.add(fldName); state.flushedFiles.add(docName); - docWriter.removeOpenFile(idxName); - docWriter.removeOpenFile(fldName); - docWriter.removeOpenFile(docName); - lastDocID = 0; } @@ -105,7 +101,7 @@ } } - void initTermVectorsWriter() throws IOException { + private final void initTermVectorsWriter() throws IOException { if (tvx == null) { final String segment = docWriter.getSegment(); @@ -128,10 +124,6 @@ tvd.writeInt(TermVectorsReader.FORMAT_CURRENT); tvf.writeInt(TermVectorsReader.FORMAT_CURRENT); - docWriter.addOpenFile(idxName); - docWriter.addOpenFile(fldName); - docWriter.addOpenFile(docName); - lastDocID = 0; } } @@ -146,7 +138,8 @@ fill(docState.docID); // Append term vectors to the real outputs: - tvx.writeLong(tvd.getFilePointer()); + long pointer = tvd.getFilePointer(); + tvx.writeLong(pointer); tvx.writeLong(tvf.getFilePointer()); tvd.writeVInt(numVectorFields); if (numVectorFields > 0) { @@ -198,7 +191,7 @@ } lastDocID = 0; - + reset(); } int numVectorFields; Index: lucene/src/java/org/apache/lucene/index/DocumentsWriter.java =================================================================== --- lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (revision 978806) +++ lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (working copy) @@ -2,13 +2,13 @@ import java.io.IOException; import java.io.PrintStream; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; @@ -24,6 +24,8 @@ import org.apache.lucene.search.Weight; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.BytesRef; /** @@ -51,7 +53,7 @@ private final DocumentsWriterThreadPool threadPool; private final Lock sequenceIDLock = new ReentrantLock(); - private final Directory directory; + private final Directory openFilesTrackingDirectory; final IndexWriter indexWriter; final IndexWriterConfig config; @@ -69,7 +71,14 @@ private Map minSequenceIDsPerThread = new HashMap(); public DocumentsWriter(Directory directory, IndexWriter indexWriter, IndexWriterConfig config) { - this.directory = directory; + this.openFilesTrackingDirectory = new FilterDirectory(directory) { + @Override public IndexOutput createOutput(final String name) throws IOException { + addOpenFile(name); + return super.createOutput(name); + } + }; + + //this.openFilesTrackingDirectory = directory; this.indexWriter = indexWriter; 
this.config = config; this.maxBufferedDocs = config.getMaxBufferedDocs(); @@ -111,7 +120,7 @@ } DocumentsWriterPerThread newDocumentsWriterPerThread() { - DocumentsWriterPerThread perThread = new DocumentsWriterPerThread(directory, this, config + DocumentsWriterPerThread perThread = new DocumentsWriterPerThread(openFilesTrackingDirectory, this, config .getIndexingChain()); sequenceIDLock.lock(); try { @@ -127,13 +136,23 @@ return updateDocument(null, doc, analyzer); } + private final static class UpdateResult { + long sequenceID; + boolean flushed; + + UpdateResult(long sequenceID) { + this.sequenceID = sequenceID; + flushed = false; + } + } + long updateDocument(final Term delTerm, final Document doc, final Analyzer analyzer) throws CorruptIndexException, IOException { - long seqID = threadPool.executePerThread(this, doc, - new DocumentsWriterThreadPool.PerThreadTask() { + UpdateResult result = threadPool.executePerThread(this, doc, + new DocumentsWriterThreadPool.PerThreadTask() { @Override - public Long process(final DocumentsWriterPerThread perThread) throws IOException { + public UpdateResult process(final DocumentsWriterPerThread perThread) throws IOException { long perThreadRAMUsedBeforeAdd = perThread.numBytesUsed; perThread.addDocument(doc, analyzer); @@ -154,16 +173,23 @@ sequenceIDLock.unlock(); } + UpdateResult result = new UpdateResult(sequenceID); if (finishAddDocument(perThread, perThreadRAMUsedBeforeAdd)) { + result.flushed = true; super.clearThreadBindings(); } - return sequenceID; + return result; } }); + + if (result == null) { + return -1; + } - indexWriter.maybeMerge(); - - return seqID; + if (result.flushed) { + indexWriter.maybeMerge(); + } + return result.sequenceID; } private final boolean finishAddDocument(DocumentsWriterPerThread perThread, @@ -257,17 +283,12 @@ final boolean flushAllThreads(final boolean flushDeletes) throws IOException { - return threadPool.executeAllThreads(new DocumentsWriterThreadPool.AllThreadsTask() { + + return threadPool.executeAllThreads(this, new DocumentsWriterThreadPool.AllThreadsTask() { @Override public Boolean process(Iterator threadsIterator) throws IOException { boolean anythingFlushed = false; - if (flushDeletes) { - if (applyDeletes(indexWriter.segmentInfos)) { - indexWriter.checkpoint(); - } - } - while (threadsIterator.hasNext()) { DocumentsWriterPerThread perThread = threadsIterator.next(); final int numDocs = perThread.getNumDocsInRAM(); @@ -282,6 +303,7 @@ if (flushDocs) { SegmentInfo newSegment = perThread.flush(); + newSegment.dir = indexWriter.getDirectory(); if (newSegment != null) { anythingFlushed = true; @@ -315,10 +337,8 @@ } /** Build compound file for the segment we just flushed */ - void createCompoundFile(String segment, DocumentsWriterPerThread perThread) throws IOException { - - CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, - IndexFileNames.segmentFileName(segment, "", IndexFileNames.COMPOUND_FILE_EXTENSION)); + void createCompoundFile(String compoundFileName, DocumentsWriterPerThread perThread) throws IOException { + CompoundFileWriter cfsWriter = new CompoundFileWriter(openFilesTrackingDirectory, compoundFileName); for(String fileName : perThread.flushState.flushedFiles) { cfsWriter.addFile(fileName); } @@ -327,49 +347,53 @@ cfsWriter.close(); } - // nocommit void finishFlushedSegment(SegmentInfo newSegment, DocumentsWriterPerThread perThread) throws IOException { - synchronized(indexWriter) { - indexWriter.segmentInfos.add(newSegment); - indexWriter.checkpoint(); + SegmentReader reader 
= indexWriter.readerPool.get(newSegment, false); + try { + applyDeletes(reader, newSegment.getMinSequenceID(), newSegment.getMaxSequenceID(), perThread.sequenceIDs); + } finally { + indexWriter.readerPool.release(reader); + } - SegmentReader reader = indexWriter.readerPool.get(newSegment, false); - boolean any = false; + if (indexWriter.useCompoundFile(newSegment)) { + String compoundFileName = IndexFileNames.segmentFileName(newSegment.name, "", IndexFileNames.COMPOUND_FILE_EXTENSION); + message("creating compound file " + compoundFileName); + // Now build compound file + boolean success = false; try { - any = applyDeletes(reader, newSegment.getMinSequenceID(), newSegment.getMaxSequenceID(), perThread.sequenceIDs); + createCompoundFile(compoundFileName, perThread); + success = true; } finally { - indexWriter.readerPool.release(reader); - } - if (any) { - indexWriter.checkpoint(); - } - - if (indexWriter.mergePolicy.useCompoundFile(indexWriter.segmentInfos, newSegment)) { - // Now build compound file - boolean success = false; - try { - createCompoundFile(newSegment.name, perThread); - success = true; - } finally { - if (!success) { - if (infoStream != null) { - message("hit exception " + - "reating compound file for newly flushed segment " + newSegment.name); - } - indexWriter.deleter.deleteFile(IndexFileNames.segmentFileName(newSegment.name, "", - IndexFileNames.COMPOUND_FILE_EXTENSION)); + if (!success) { + if (infoStream != null) { + message("hit exception " + + "reating compound file for newly flushed segment " + newSegment.name); } + indexWriter.deleter.deleteFile(IndexFileNames.segmentFileName(newSegment.name, "", + IndexFileNames.COMPOUND_FILE_EXTENSION)); + for (String file : perThread.flushState.flushedFiles) { + indexWriter.deleter.deleteFile(file); + } + } - - synchronized(indexWriter) { - newSegment.setUseCompoundFile(true); - indexWriter.checkpoint(); - // In case the files we just merged into a CFS were - // not previously checkpointed: - indexWriter.deleter.deleteNewFiles(perThread.closedFiles()); - } } + + for (String file : perThread.flushState.flushedFiles) { + indexWriter.deleter.deleteFile(file); + } + + newSegment.setUseCompoundFile(true); + + synchronized(openFiles) { + openFiles.remove(compoundFileName); + } } + + synchronized(openFiles) { + openFiles.removeAll(perThread.flushState.flushedFiles); + } + + indexWriter.addNewSegment(newSegment); } // Returns true if an abort is in progress @@ -400,6 +424,7 @@ if (perThread.getNumDocsInRAM() == maxBufferedDocs) { flushSegment(perThread); assert perThread.getNumDocsInRAM() == 0; + return true; } @@ -413,48 +438,57 @@ } SegmentInfo newSegment = perThread.flush(); + newSegment.dir = indexWriter.getDirectory(); - if (newSegment != null) { - finishFlushedSegment(newSegment, perThread); - return true; - } - return false; + finishFlushedSegment(newSegment, perThread); + return true; } void abort() throws IOException { - threadPool.abort(); - try { - try { - abortedFiles = openFiles(); - } catch (Throwable t) { - abortedFiles = null; + threadPool.abort(new DocumentsWriterThreadPool.AbortTask() { + + @Override + void abort() throws IOException { + try { + abortedFiles = openFiles(); + } catch (Throwable t) { + abortedFiles = null; + } + + deletesInRAM.clear(); + // nocommit + // deletesFlushed.clear(); + + openFiles.clear(); + deletesInRAM.clear(); } - - deletesInRAM.clear(); - // nocommit - // deletesFlushed.clear(); - - openFiles.clear(); - } finally { - threadPool.finishAbort(); - } - + }); } - final List openFiles = new 
ArrayList(); + final Set openFiles = new HashSet(); private Collection abortedFiles; // List of files that were written before last abort() - /* * Returns Collection of files in use by this instance, * including any flushed segments. */ @SuppressWarnings("unchecked") - List openFiles() { + private Collection openFiles() { synchronized(openFiles) { - return (List) ((ArrayList) openFiles).clone(); + return (Set) ((HashSet) openFiles).clone(); } } + void addOpenFile(String file) { + synchronized(openFiles) { + openFiles.add(file); + } + } + + void removeOpenFile(String file) { + synchronized(openFiles) { + openFiles.remove(file); + } + } Collection abortedFiles() { return abortedFiles; @@ -480,27 +514,29 @@ // } boolean applyDeletes(SegmentInfos infos) throws IOException { - synchronized(indexWriter) { - if (!hasDeletes()) - return false; - - final long t0 = System.currentTimeMillis(); - - if (infoStream != null) { - message("apply " + deletesInRAM.getNumDeletes() + " buffered deletes on " + - +infos.size() + " segments."); - } - - final int infosEnd = infos.size(); - - boolean any = false; - for (int i = 0; i < infosEnd; i++) { - - // Make sure we never attempt to apply deletes to - // segment in external dir - assert infos.info(i).dir == directory; - - SegmentInfo si = infos.info(i); + if (!hasDeletes()) + return false; + + final long t0 = System.currentTimeMillis(); + + if (infoStream != null) { + message("apply " + deletesInRAM.getNumDeletes() + " buffered deletes on " + + +infos.size() + " segments."); + } + + final int infosEnd = infos.size(); + + boolean any = false; + for (int i = 0; i < infosEnd; i++) { + + // Make sure we never attempt to apply deletes to + // segment in external dir + assert infos.info(i).dir == indexWriter.getDirectory(); + + SegmentInfo si = infos.info(i); + // we have to synchronize here, because we need a write lock on + // the segment in order to apply deletes + synchronized (indexWriter) { SegmentReader reader = indexWriter.readerPool.get(si, false); try { any |= applyDeletes(reader, si.getMinSequenceID(), si.getMaxSequenceID(), null); @@ -508,13 +544,13 @@ indexWriter.readerPool.release(reader); } } - - if (infoStream != null) { - message("apply deletes took " + (System.currentTimeMillis() - t0) + " msec"); - } - - return any; } + + if (infoStream != null) { + message("apply deletes took " + (System.currentTimeMillis() - t0) + " msec"); + } + + return any; } // Apply buffered delete terms, queries and docIDs to the @@ -642,9 +678,6 @@ } void message(String message) { - if (infoStream != null) { - indexWriter.message("DW: " + message); - } + indexWriter.message("DW: " + message); } - } Index: lucene/src/java/org/apache/lucene/index/IndexWriter.java =================================================================== --- lucene/src/java/org/apache/lucene/index/IndexWriter.java (revision 978806) +++ lucene/src/java/org/apache/lucene/index/IndexWriter.java (working copy) @@ -271,8 +271,7 @@ volatile SegmentInfos pendingCommit; // set when a commit is pending (after prepareCommit() & before commit()) volatile long pendingCommitChangeCount; - // nocommit - private - SegmentInfos segmentInfos = new SegmentInfos(); // the segments + private final SegmentInfos segmentInfos = new SegmentInfos(); // the segments private DocumentsWriter docWriter; //nocommit - private @@ -304,7 +303,7 @@ private int flushCount; private int flushDeletesCount; - final ReaderPool readerPool = new ReaderPool(); + final IndexReaderPool readerPool; // This is a "write once" variable (like the 
organic dye // on a DVD-R that may or may not be heated by a laser and @@ -315,7 +314,7 @@ // reuse SegmentReader instances internally for applying // deletes, doing merges, and reopening near real-time // readers. - private volatile boolean poolReaders; + volatile boolean poolReaders; // The instance that was passed to the constructor. It is saved only in order // to allow users to query an IndexWriter settings. @@ -427,246 +426,21 @@ return r; } } - - /** Holds shared SegmentReader instances. IndexWriter uses - * SegmentReaders for 1) applying deletes, 2) doing - * merges, 3) handing out a real-time reader. This pool - * reuses instances of the SegmentReaders in all these - * places if it is in "near real-time mode" (getReader() - * has been called on this instance). */ - - class ReaderPool { - - private final Map readerMap = new HashMap(); - - /** Forcefully clear changes for the specified segments, - * and remove from the pool. This is called on successful merge. */ - synchronized void clear(SegmentInfos infos) throws IOException { - if (infos == null) { - for (Map.Entry ent: readerMap.entrySet()) { - ent.getValue().hasChanges = false; - } - } else { - for (final SegmentInfo info: infos) { - if (readerMap.containsKey(info)) { - readerMap.get(info).hasChanges = false; - } - } - } + + // used only by asserts + public synchronized boolean infoIsLive(SegmentInfo info) { + int idx = segmentInfos.indexOf(info); + assert idx != -1; + assert segmentInfos.get(idx) == info; + return true; + } + + public synchronized SegmentInfo mapToLive(SegmentInfo info) { + int idx = segmentInfos.indexOf(info); + if (idx != -1) { + info = segmentInfos.get(idx); } - - // used only by asserts - public synchronized boolean infoIsLive(SegmentInfo info) { - int idx = segmentInfos.indexOf(info); - assert idx != -1; - assert segmentInfos.get(idx) == info; - return true; - } - - public synchronized SegmentInfo mapToLive(SegmentInfo info) { - int idx = segmentInfos.indexOf(info); - if (idx != -1) { - info = segmentInfos.get(idx); - } - return info; - } - - /** - * Release the segment reader (i.e. decRef it and close if there - * are no more references. - * @param sr - * @throws IOException - */ - public synchronized void release(SegmentReader sr) throws IOException { - release(sr, false); - } - - /** - * Release the segment reader (i.e. decRef it and close if there - * are no more references. - * @param sr - * @throws IOException - */ - public synchronized void release(SegmentReader sr, boolean drop) throws IOException { - - final boolean pooled = readerMap.containsKey(sr.getSegmentInfo()); - - assert !pooled | readerMap.get(sr.getSegmentInfo()) == sr; - - // Drop caller's ref; for an external reader (not - // pooled), this decRef will close it - sr.decRef(); - - if (pooled && (drop || (!poolReaders && sr.getRefCount() == 1))) { - - // We are the last ref to this reader; since we're - // not pooling readers, we release it: - readerMap.remove(sr.getSegmentInfo()); - - assert !sr.hasChanges || Thread.holdsLock(IndexWriter.this); - - // Drop our ref -- this will commit any pending - // changes to the dir - boolean success = false; - try { - sr.close(); - success = true; - } finally { - if (!success && sr.hasChanges) { - // Abandon the changes & retry closing: - sr.hasChanges = false; - try { - sr.close(); - } catch (Throwable ignore) { - // Keep throwing original exception - } - } - } - } - } - - /** Remove all our references to readers, and commits - * any pending changes. 
*/ - synchronized void close() throws IOException { - Iterator> iter = readerMap.entrySet().iterator(); - while (iter.hasNext()) { - - Map.Entry ent = iter.next(); - - SegmentReader sr = ent.getValue(); - if (sr.hasChanges) { - assert infoIsLive(sr.getSegmentInfo()); - sr.startCommit(); - boolean success = false; - try { - sr.doCommit(null); - success = true; - } finally { - if (!success) { - sr.rollbackCommit(); - } - } - } - - iter.remove(); - - // NOTE: it is allowed that this decRef does not - // actually close the SR; this can happen when a - // near real-time reader is kept open after the - // IndexWriter instance is closed - sr.decRef(); - } - } - - /** - * Commit all segment reader in the pool. - * @throws IOException - */ - synchronized void commit() throws IOException { - for (Map.Entry ent : readerMap.entrySet()) { - - SegmentReader sr = ent.getValue(); - if (sr.hasChanges) { - assert infoIsLive(sr.getSegmentInfo()); - sr.startCommit(); - boolean success = false; - try { - sr.doCommit(null); - success = true; - } finally { - if (!success) { - sr.rollbackCommit(); - } - } - } - } - } - - /** - * Returns a ref to a clone. NOTE: this clone is not - * enrolled in the pool, so you should simply close() - * it when you're done (ie, do not call release()). - */ - public synchronized SegmentReader getReadOnlyClone(SegmentInfo info, boolean doOpenStores, int termInfosIndexDivisor) throws IOException { - SegmentReader sr = get(info, doOpenStores, BufferedIndexInput.BUFFER_SIZE, termInfosIndexDivisor); - try { - return (SegmentReader) sr.clone(true); - } finally { - sr.decRef(); - } - } - - /** - * Obtain a SegmentReader from the readerPool. The reader - * must be returned by calling {@link #release(SegmentReader)} - * @see #release(SegmentReader) - * @param info - * @param doOpenStores - * @throws IOException - */ - public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores) throws IOException { - return get(info, doOpenStores, BufferedIndexInput.BUFFER_SIZE, config.getReaderTermsIndexDivisor()); - } - - /** - * Obtain a SegmentReader from the readerPool. The reader - * must be returned by calling {@link #release(SegmentReader)} - * - * @see #release(SegmentReader) - * @param info - * @param doOpenStores - * @param readBufferSize - * @param termsIndexDivisor - * @throws IOException - */ - public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores, int readBufferSize, int termsIndexDivisor) throws IOException { - - if (poolReaders) { - readBufferSize = BufferedIndexInput.BUFFER_SIZE; - } - - SegmentReader sr = readerMap.get(info); - if (sr == null) { - // TODO: we may want to avoid doing this while - // synchronized - // Returns a ref, which we xfer to readerMap: - sr = SegmentReader.get(false, info.dir, info, readBufferSize, doOpenStores, termsIndexDivisor, codecs); - - if (info.dir == directory) { - // Only pool if reader is not external - readerMap.put(info, sr); - } - } else { - if (doOpenStores) { - sr.openDocStores(); - } - if (termsIndexDivisor != -1) { - // If this reader was originally opened because we - // needed to merge it, we didn't load the terms - // index. But now, if the caller wants the terms - // index (eg because it's doing deletes, or an NRT - // reader is being opened) we ask the reader to - // load its terms index. 
- sr.loadTermsIndex(termsIndexDivisor); - } - } - - // Return a ref to our caller - if (info.dir == directory) { - // Only incRef if we pooled (reader is not external) - sr.incRef(); - } - return sr; - } - - // Returns a ref - public synchronized SegmentReader getIfExists(SegmentInfo info) throws IOException { - SegmentReader sr = readerMap.get(info); - if (sr != null) { - sr.incRef(); - } - return sr; - } + return info; } /** @@ -934,6 +708,8 @@ poolReaders = conf.getReaderPooling(); + this.readerPool = new IndexReaderPool(this, directory, config); + OpenMode mode = conf.getOpenMode(); boolean create; if (mode == OpenMode.CREATE) { @@ -1784,7 +1560,7 @@ if (infoStream != null) { message("hit exception updating document"); } - + synchronized (this) { // If docWriter has some aborted files that were // never incref'd, then we clean them up here @@ -2419,6 +2195,15 @@ deleter.checkpoint(segmentInfos, false); } + synchronized void addNewSegment(SegmentInfo newSegment) throws IOException { + segmentInfos.add(newSegment); + checkpoint(); + } + + boolean useCompoundFile(SegmentInfo segmentInfo) { + return mergePolicy.useCompoundFile(segmentInfos, segmentInfo); + } + private synchronized void resetMergeExceptions() { mergeExceptions = new ArrayList(); mergeGen++; @@ -2793,7 +2578,7 @@ if (pendingCommit != null) { try { if (infoStream != null) - message("commit: pendingCommit != null"); + message("commit: pendingCommit != null"); pendingCommit.finishCommit(directory); if (infoStream != null) message("commit: wrote segments file \"" + pendingCommit.getCurrentSegmentFileName() + "\""); @@ -2828,27 +2613,39 @@ protected final void flush(boolean triggerMerge, boolean flushDeletes) throws CorruptIndexException, IOException { // We can be called during close, when closing==true, so we must pass false to ensureOpen: ensureOpen(false); - if (doFlush(flushDeletes) && triggerMerge) + + doBeforeFlush(); + + if (flushDeletes) { + if (applyDeletes()) { + checkpoint(); + } + } + boolean maybeMerge = false; + boolean success = false; + try { + maybeMerge = docWriter.flushAllThreads(flushDeletes) && triggerMerge; + success = true; + } finally { + if (!success) { + synchronized (this) { + // If docWriter has some aborted files that were + // never incref'd, then we clean them up here + final Collection files = docWriter.abortedFiles(); + if (files != null) { + deleter.deleteNewFiles(files); + } + } + } + } + + doAfterFlush(); + + if (maybeMerge) { maybeMerge(); + } } - // TODO: this method should not have to be entirely - // synchronized, ie, merges should be allowed to commit - // even while a flush is happening - private synchronized final boolean doFlush(boolean flushDeletes) throws CorruptIndexException, IOException { - return docWriter.flushAllThreads(flushDeletes); - // nocommit -// try { -// try { -// return doFlushInternal(flushDocStores, flushDeletes); -// } finally { -// docWriter.balanceRAM(); -// } -// } finally { -// docWriter.clearFlushPending(); -// } - } - /** Expert: Return the total size of all index files currently cached in memory. 
* Useful for size management with flushRamDocs() */ Index: lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java =================================================================== --- lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java (revision 978806) +++ lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java (working copy) @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Set; import java.util.List; import java.util.Map; @@ -422,7 +423,7 @@ public void checkpoint(SegmentInfos segmentInfos, boolean isCommit) throws IOException { if (infoStream != null) { - message("now checkpoint \"" + segmentInfos.getCurrentSegmentFileName() + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]"); + message("now checkpoint \"" + segmentInfos + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]"); } // Try again now to delete any previously un-deletable @@ -442,18 +443,6 @@ // Decref files for commits that were deleted by the policy: deleteCommits(); } else { - - final List docWriterFiles; - if (docWriter != null) { - docWriterFiles = docWriter.openFiles(); - if (docWriterFiles != null) - // We must incRef these files before decRef'ing - // last files to make sure we don't accidentally - // delete them: - incRef(docWriterFiles); - } else - docWriterFiles = null; - // DecRef old files from the last checkpoint, if any: int size = lastFiles.size(); if (size > 0) { @@ -465,8 +454,6 @@ // Save files so we can decr on next checkpoint/commit: lastFiles.add(segmentInfos.files(directory, false)); - if (docWriterFiles != null) - lastFiles.add(docWriterFiles); } } Index: lucene/src/java/org/apache/lucene/store/FilterDirectory.java =================================================================== --- lucene/src/java/org/apache/lucene/store/FilterDirectory.java (revision 0) +++ lucene/src/java/org/apache/lucene/store/FilterDirectory.java (revision 0) @@ -0,0 +1,111 @@ +package org.apache.lucene.store; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.IOException; +import java.util.Collection; + +public abstract class FilterDirectory extends Directory { + private final Directory delegate; + + public FilterDirectory(Directory delegate) { + this.delegate = delegate; + } + + @Override + public String[] listAll() throws IOException { + return delegate.listAll(); + } + + @Override + public boolean fileExists(String name) throws IOException { + return delegate.fileExists(name); + } + + @Override + public long fileModified(String name) throws IOException { + return delegate.fileModified(name); + } + + @Override + public void touchFile(String name) throws IOException { + delegate.touchFile(name); + } + + @Override + public void deleteFile(String name) throws IOException { + delegate.deleteFile(name); + } + + @Override + public long fileLength(String name) throws IOException { + return delegate.fileLength(name); + } + + @Override + public IndexOutput createOutput(String name) throws IOException { + return delegate.createOutput(name); + } + + @Override + public IndexInput openInput(String name) throws IOException { + return delegate.openInput(name); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Deprecated @Override + public void sync(String name) throws IOException { // TODO 4.0 kill me + delegate.sync(name); + } + + public void sync(Collection names) throws IOException { // TODO 4.0 make me abstract + delegate.sync(names); + } + + public IndexInput openInput(String name, int bufferSize) throws IOException { + return delegate.openInput(name, bufferSize); + } + + public Lock makeLock(String name) { + return delegate.makeLock(name); + } + + public void clearLock(String name) throws IOException { + delegate.clearLock(name); + } + + public void setLockFactory(LockFactory lockFactory) { + delegate.setLockFactory(lockFactory); + } + + public LockFactory getLockFactory() { + return delegate.getLockFactory(); + } + + public String getLockID() { + return delegate.getLockID(); + } + + public void copy(Directory to, String src, String dest) throws IOException { + delegate.copy(to, src, dest); + } +}
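
Illustrative note (not part of the patch): DocumentsWriter above wraps its real Directory in an anonymous subclass of the new FilterDirectory whose createOutput records each file name before delegating (the openFilesTrackingDirectory field). A minimal standalone sketch of that same pattern, built only on the FilterDirectory class added by this patch, is shown below; the class name CreatedFilesTrackingDirectory, the use of RAMDirectory, and the "_0.test" file name are illustrative assumptions, not code from the patch.

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RAMDirectory;

// Sketch only: records every file created through this Directory, mirroring
// the anonymous openFilesTrackingDirectory wrapper in DocumentsWriter.
public class CreatedFilesTrackingDirectory extends FilterDirectory {
  private final Set<String> createdFiles =
      Collections.synchronizedSet(new HashSet<String>());

  public CreatedFilesTrackingDirectory(Directory delegate) {
    super(delegate);
  }

  @Override
  public IndexOutput createOutput(String name) throws IOException {
    createdFiles.add(name);            // remember the file name first
    return super.createOutput(name);   // then delegate the actual creation
  }

  public Set<String> getCreatedFiles() {
    return createdFiles;
  }

  public static void main(String[] args) throws IOException {
    // "_0.test" is an arbitrary demo file name, not a real index file.
    CreatedFilesTrackingDirectory dir =
        new CreatedFilesTrackingDirectory(new RAMDirectory());
    IndexOutput out = dir.createOutput("_0.test");
    out.writeInt(42);
    out.close();
    System.out.println("created: " + dir.getCreatedFiles()); // prints [_0.test]
  }
}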
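
Likewise illustrative: the IndexReaderPool extracted from IndexWriter keeps the documented contract that every get() must be balanced by release(), even when work against the reader throws — the same try/finally discipline the patch itself uses in DocumentsWriter.finishFlushedSegment and applyDeletes. A hedged sketch of that calling pattern, assuming the pool and SegmentInfo come from an IndexWriter opened in near-real-time mode:

import java.io.IOException;

import org.apache.lucene.index.IndexReaderPool;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.SegmentReader;

// Sketch only: 'pool' and 'info' are assumed to be obtained from a live
// IndexWriter; the helper just shows the mandatory get()/release() pairing.
public final class ReaderPoolUsage {
  private ReaderPoolUsage() {}

  static int countDeletedDocs(IndexReaderPool pool, SegmentInfo info) throws IOException {
    SegmentReader reader = pool.get(info, false); // ref'd reader, doc stores not opened
    try {
      return reader.numDeletedDocs();             // any work with the reader goes here
    } finally {
      pool.release(reader);                       // decRef; closes it if no one else holds a ref
    }
  }
}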