Index: lucene/src/java/org/apache/lucene/index/BufferedDeletes.java --- lucene/src/java/org/apache/lucene/index/BufferedDeletes.java Thu Dec 09 05:37:58 2010 -0500 +++ lucene/src/java/org/apache/lucene/index/BufferedDeletes.java Thu Dec 09 05:56:34 2010 -0500 @@ -17,153 +17,415 @@ * limitations under the License. */ +import java.io.IOException; +import java.io.PrintStream; import java.util.HashMap; +import java.util.Date; +import java.util.Map.Entry; import java.util.Map; -import java.util.TreeMap; -import java.util.ArrayList; -import java.util.List; -import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Weight; -/** Holds buffered deletes, by docID, term or query. We - * hold two instances of this class: one for the deletes - * prior to the last flush, the other for deletes after - * the last flush. This is so if we need to abort - * (discard all buffered docs) we can also discard the - * buffered deletes yet keep the deletes done during - * previously flushed segments. */ +/** Holds a {@link SegmentDeletes} for each segment in the + * index. */ + class BufferedDeletes { - int numTerms; - Map terms; - Map queries = new HashMap(); - List docIDs = new ArrayList(); - long bytesUsed; - private final boolean doTermSort; - public BufferedDeletes(boolean doTermSort) { - this.doTermSort = doTermSort; - if (doTermSort) { - terms = new TreeMap(); + // Deletes for all flushed/merged segments: + private final Map deletesMap = new HashMap(); + + // used only by assert + private Term lastDeleteTerm; + + private PrintStream infoStream; + private final AtomicLong bytesUsed = new AtomicLong(); + private final AtomicInteger numTerms = new AtomicInteger(); + private final int messageID; + + public BufferedDeletes(int messageID) { + this.messageID = messageID; + } + + private synchronized void message(String message) { + if (infoStream != null) { + infoStream.println("BD " + messageID + " [" + new Date() + "; " + Thread.currentThread().getName() + "]: BD " + message); + } + } + + public synchronized void setInfoStream(PrintStream infoStream) { + this.infoStream = infoStream; + } + + public synchronized void pushDeletes(SegmentDeletes newDeletes, SegmentInfo info) { + pushDeletes(newDeletes, info, false); + } + + // Moves all pending deletes onto the provided segment, + // then clears the pending deletes + public synchronized void pushDeletes(SegmentDeletes newDeletes, SegmentInfo info, boolean noLimit) { + assert newDeletes.any(); + numTerms.addAndGet(newDeletes.numTermDeletes.get()); + + if (!noLimit) { + assert !deletesMap.containsKey(info); + assert info != null; + deletesMap.put(info, newDeletes); + bytesUsed.addAndGet(newDeletes.bytesUsed.get()); } else { - terms = new HashMap(); + final SegmentDeletes deletes = getDeletes(info); + bytesUsed.addAndGet(-deletes.bytesUsed.get()); + deletes.update(newDeletes, noLimit); + bytesUsed.addAndGet(deletes.bytesUsed.get()); + } + if (infoStream != null) { + message("push deletes seg=" + info + " dels=" + getDeletes(info)); + } + assert checkDeleteStats(); + } + + public synchronized void clear() { + deletesMap.clear(); + numTerms.set(0); + bytesUsed.set(0); + } + + synchronized boolean any() { + return bytesUsed.get() != 0; + } + + public int numTerms() { + return numTerms.get(); + } + + public long bytesUsed() { + return 
bytesUsed.get(); + } + + // IW calls this on finishing a merge. While the merge + // was running, it's possible new deletes were pushed onto + // our last (and only our last) segment. In this case we + // must carry forward those deletes onto the merged + // segment. + synchronized void commitMerge(MergePolicy.OneMerge merge) { + assert checkDeleteStats(); + if (infoStream != null) { + message("commitMerge merge.info=" + merge.info + " merge.segments=" + merge.segments); + } + final SegmentInfo lastInfo = merge.segments.lastElement(); + final SegmentDeletes lastDeletes = deletesMap.get(lastInfo); + if (lastDeletes != null) { + deletesMap.remove(lastInfo); + assert !deletesMap.containsKey(merge.info); + deletesMap.put(merge.info, lastDeletes); + // don't need to update numTerms/bytesUsed since we + // are just moving the deletes from one info to + // another + if (infoStream != null) { + message("commitMerge done: new deletions=" + lastDeletes); + } + } else if (infoStream != null) { + message("commitMerge done: no new deletions"); + } + assert !anyDeletes(merge.segments.range(0, merge.segments.size()-1)); + assert checkDeleteStats(); + } + + synchronized void clear(SegmentDeletes deletes) { + deletes.clear(); + } + + public synchronized boolean applyDeletes(IndexWriter.ReaderPool readerPool, SegmentInfos segmentInfos, SegmentInfos applyInfos) throws IOException { + if (!any()) { + return false; + } + final long t0 = System.currentTimeMillis(); + + if (infoStream != null) { + message("applyDeletes: applyInfos=" + applyInfos + "; index=" + segmentInfos); + } + + assert checkDeleteStats(); + + assert applyInfos.size() > 0; + + boolean any = false; + + final SegmentInfo lastApplyInfo = applyInfos.lastElement(); + final int lastIdx = segmentInfos.indexOf(lastApplyInfo); + + final SegmentInfo firstInfo = applyInfos.firstElement(); + final int firstIdx = segmentInfos.indexOf(firstInfo); + + // applyInfos must be a slice of segmentInfos + assert lastIdx - firstIdx + 1 == applyInfos.size(); + + // iterate over all segment infos backwards + // coalesceing deletes along the way + // when we're at or below the last of the + // segments to apply to, start applying the deletes + // we traverse up to the first apply infos + SegmentDeletes coalescedDeletes = null; + boolean hasDeletes = false; + for (int segIdx=segmentInfos.size()-1; segIdx >= firstIdx; segIdx--) { + final SegmentInfo info = segmentInfos.info(segIdx); + final SegmentDeletes deletes = deletesMap.get(info); + assert deletes == null || deletes.any(); + + if (deletes == null && coalescedDeletes == null) { + continue; + } + + if (infoStream != null) { + message("applyDeletes: seg=" + info + " segment's deletes=[" + (deletes == null ? "null" : deletes) + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "]"); + } + + hasDeletes |= deletes != null; + + if (segIdx <= lastIdx && hasDeletes) { + + any |= applyDeletes(readerPool, info, coalescedDeletes, deletes); + + if (deletes != null) { + // we've applied doc ids, and they're only applied + // on the current segment + bytesUsed.addAndGet(-deletes.docIDs.size() * SegmentDeletes.BYTES_PER_DEL_DOCID); + deletes.clearDocIDs(); + } + } + + // now coalesce at the max limit + if (deletes != null) { + if (coalescedDeletes == null) { + coalescedDeletes = new SegmentDeletes(); + } + // TODO: we could make this single pass (coalesce as + // we apply the deletes + coalescedDeletes.update(deletes, true); + } + } + + // move all deletes to segment just before our merge. 
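The backward walk above gives each segment's own deletes a per-segment docID limit, while folding them into a running coalesced set that must delete matching docs in every earlier segment regardless of limit, since those docs were all flushed before the delete was buffered. A minimal standalone sketch of that coalescing rule, using simplified java.util types of my own rather than Lucene's SegmentDeletes:

    import java.util.Map;
    import java.util.TreeMap;

    // Simplified illustration only (not Lucene's SegmentDeletes): buffered
    // delete terms for one segment, each mapping to the docID limit
    // ("docIDUpto") below which the delete applies in that segment.
    class PendingTermDeletes {
        final Map<String, Integer> terms = new TreeMap<String, Integer>();

        void addTerm(String term, int docIDUpto) {
            Integer current = terms.get(term);
            // Only record a larger limit (see the Num.setNum comment later
            // in this patch for why the limit must never shrink).
            if (current == null || docIDUpto > current) {
                terms.put(term, docIDUpto);
            }
        }

        // Fold a later segment's deletes into this coalesced set. For
        // earlier segments the per-segment limit is irrelevant, so the
        // coalesced copy applies to every doc, which is the role the
        // noLimit=true update plays in the code above.
        void coalesce(PendingTermDeletes later) {
            for (Map.Entry<String, Integer> e : later.terms.entrySet()) {
                terms.put(e.getKey(), Integer.MAX_VALUE);
            }
        }
    }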
+ if (firstIdx > 0) { + + SegmentDeletes mergedDeletes = null; + // TODO: we could also make this single pass + for (SegmentInfo info : applyInfos) { + final SegmentDeletes deletes = deletesMap.get(info); + if (deletes != null) { + assert deletes.any(); + if (mergedDeletes == null) { + mergedDeletes = getDeletes(segmentInfos.info(firstIdx-1)); + numTerms.addAndGet(-mergedDeletes.numTermDeletes.get()); + bytesUsed.addAndGet(-mergedDeletes.bytesUsed.get()); + } + + mergedDeletes.update(deletes, true); + } + } + + if (mergedDeletes != null) { + numTerms.addAndGet(mergedDeletes.numTermDeletes.get()); + bytesUsed.addAndGet(mergedDeletes.bytesUsed.get()); + } + + if (infoStream != null) { + if (mergedDeletes != null) { + message("applyDeletes: merge all deletes into seg=" + segmentInfos.info(firstIdx-1) + ": " + mergedDeletes); + } else { + message("applyDeletes: no deletes to merge"); + } + } + } else { + // We drop the deletes in this case, because we've + // applied them to segment infos starting w/ the first + // segment. There are no prior segments so there's no + // reason to keep them around. When the applyInfos == + // segmentInfos this means all deletes have been + // removed: + } + remove(applyInfos); + + assert checkDeleteStats(); + assert applyInfos != segmentInfos || !any(); + + if (infoStream != null) { + message("applyDeletes took " + (System.currentTimeMillis()-t0) + " msec"); + } + return any; + } + + private synchronized boolean applyDeletes(IndexWriter.ReaderPool readerPool, + SegmentInfo info, + SegmentDeletes coalescedDeletes, + SegmentDeletes segmentDeletes) throws IOException { + assert readerPool.infoIsLive(info); + + assert coalescedDeletes == null || coalescedDeletes.docIDs.size() == 0; + + boolean any = false; + + // Lock order: IW -> BD -> RP + SegmentReader reader = readerPool.get(info, false); + try { + if (coalescedDeletes != null) { + any |= applyDeletes(coalescedDeletes, reader); + } + if (segmentDeletes != null) { + any |= applyDeletes(segmentDeletes, reader); + } + } finally { + readerPool.release(reader); + } + return any; + } + + private synchronized boolean applyDeletes(SegmentDeletes deletes, SegmentReader reader) throws IOException { + boolean any = false; + + assert checkDeleteTerm(null); + + if (deletes.terms.size() > 0) { + Fields fields = reader.fields(); + if (fields == null) { + // This reader has no postings + return false; + } + + TermsEnum termsEnum = null; + + String currentField = null; + DocsEnum docs = null; + + for (Entry entry: deletes.terms.entrySet()) { + Term term = entry.getKey(); + // Since we visit terms sorted, we gain performance + // by re-using the same TermsEnum and seeking only + // forwards + if (term.field() != currentField) { + assert currentField == null || currentField.compareTo(term.field()) < 0; + currentField = term.field(); + Terms terms = fields.terms(currentField); + if (terms != null) { + termsEnum = terms.iterator(); + } else { + termsEnum = null; + } + } + + if (termsEnum == null) { + continue; + } + assert checkDeleteTerm(term); + + if (termsEnum.seek(term.bytes(), false) == TermsEnum.SeekStatus.FOUND) { + DocsEnum docsEnum = termsEnum.docs(reader.getDeletedDocs(), docs); + + if (docsEnum != null) { + docs = docsEnum; + final int limit = entry.getValue(); + while (true) { + final int docID = docs.nextDoc(); + if (docID == DocsEnum.NO_MORE_DOCS || docID >= limit) { + break; + } + reader.deleteDocument(docID); + any = true; + } + } + } + } + } + + // Delete by docID + for (Integer docIdInt : deletes.docIDs) { + int 
docID = docIdInt.intValue(); + reader.deleteDocument(docID); + any = true; + } + + // Delete by query + if (deletes.queries.size() > 0) { + IndexSearcher searcher = new IndexSearcher(reader); + try { + for (Entry entry : deletes.queries.entrySet()) { + Query query = entry.getKey(); + int limit = entry.getValue().intValue(); + Weight weight = query.weight(searcher); + Scorer scorer = weight.scorer(reader, true, false); + if (scorer != null) { + while(true) { + int doc = scorer.nextDoc(); + if (doc >= limit) + break; + reader.deleteDocument(doc); + any = true; + } + } + } + } finally { + searcher.close(); + } + } + return any; + } + + public synchronized SegmentDeletes getDeletes(SegmentInfo info) { + SegmentDeletes deletes = deletesMap.get(info); + if (deletes == null) { + deletes = new SegmentDeletes(); + deletesMap.put(info, deletes); + } + return deletes; + } + + public synchronized void remove(SegmentInfos infos) { + assert infos.size() > 0; + for (SegmentInfo info : infos) { + SegmentDeletes deletes = deletesMap.get(info); + if (deletes != null) { + bytesUsed.addAndGet(-deletes.bytesUsed.get()); + assert bytesUsed.get() >= 0: "bytesUsed=" + bytesUsed; + numTerms.addAndGet(-deletes.numTermDeletes.get()); + assert numTerms.get() >= 0: "numTerms=" + numTerms; + deletesMap.remove(info); + } } } - // Number of documents a delete term applies to. - final static class Num { - private int num; - - Num(int num) { - this.num = num; + // used only by assert + private boolean anyDeletes(SegmentInfos infos) { + for(SegmentInfo info : infos) { + if (deletesMap.containsKey(info)) { + return true; + } } - - int getNum() { - return num; - } - - void setNum(int num) { - // Only record the new number if it's greater than the - // current one. This is important because if multiple - // threads are replacing the same doc at nearly the - // same time, it's possible that one thread that got a - // higher docID is scheduled before the other - // threads. 
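The reasoning in the comment above is easiest to see with concrete numbers. Below is a small self-contained walk-through, with hypothetical docIDs and plain java.util types rather than Lucene classes, of why the recorded docID limit for a delete term must only ever grow:

    import java.util.HashMap;
    import java.util.Map;

    public class DocIdUptoDemo {
        public static void main(String[] args) {
            Map<String, Integer> termLimits = new HashMap<String, Integer>();

            // Two threads call updateDocument for the same term "id:42".
            // Thread A's replacement doc gets docID 7, thread B's gets
            // docID 5, but B records its delete last.
            record(termLimits, "id:42", 7); // A: delete matching docs with docID < 7
            record(termLimits, "id:42", 5); // B: arrives late with a smaller limit

            // Keeping the maximum (7) deletes the older doc 5 and leaves
            // only doc 7 alive; blindly overwriting with 5 would delete
            // neither, leaving two live docs for the same term.
            System.out.println("limit for id:42 = " + termLimits.get("id:42"));
        }

        static void record(Map<String, Integer> limits, String term, int docIDUpto) {
            Integer current = limits.get(term);
            if (current == null || docIDUpto > current) {
                limits.put(term, docIDUpto);
            }
        }
    }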
- if (num > this.num) - this.num = num; - } + return false; } - int size() { - // We use numTerms not terms.size() intentionally, so - // that deletes by the same term multiple times "count", - // ie if you ask to flush every 1000 deletes then even - // dup'd terms are counted towards that 1000 - return numTerms + queries.size() + docIDs.size(); + // used only by assert + private boolean checkDeleteTerm(Term term) { + if (term != null) { + assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) > 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term; + } + lastDeleteTerm = term; + return true; } - - void update(BufferedDeletes in) { - numTerms += in.numTerms; - bytesUsed += in.bytesUsed; - terms.putAll(in.terms); - queries.putAll(in.queries); - docIDs.addAll(in.docIDs); - in.clear(); + + // only for assert + private boolean checkDeleteStats() { + int numTerms2 = 0; + long bytesUsed2 = 0; + for(SegmentDeletes deletes : deletesMap.values()) { + numTerms2 += deletes.numTermDeletes.get(); + bytesUsed2 += deletes.bytesUsed.get(); + } + assert numTerms2 == numTerms.get(): "numTerms2=" + numTerms2 + " vs " + numTerms.get(); + assert bytesUsed2 == bytesUsed.get(): "bytesUsed2=" + bytesUsed2 + " vs " + bytesUsed; + return true; } - - void clear() { - terms.clear(); - queries.clear(); - docIDs.clear(); - numTerms = 0; - bytesUsed = 0; - } - - void addBytesUsed(long b) { - bytesUsed += b; - } - - boolean any() { - return terms.size() > 0 || docIDs.size() > 0 || queries.size() > 0; - } - - // Remaps all buffered deletes based on a completed - // merge - synchronized void remap(MergeDocIDRemapper mapper, - SegmentInfos infos, - int[][] docMaps, - int[] delCounts, - MergePolicy.OneMerge merge, - int mergeDocCount) { - - final Map newDeleteTerms; - - // Remap delete-by-term - if (terms.size() > 0) { - if (doTermSort) { - newDeleteTerms = new TreeMap(); - } else { - newDeleteTerms = new HashMap(); - } - for(Entry entry : terms.entrySet()) { - Num num = entry.getValue(); - newDeleteTerms.put(entry.getKey(), - new Num(mapper.remap(num.getNum()))); - } - } else - newDeleteTerms = null; - - - // Remap delete-by-docID - final List newDeleteDocIDs; - - if (docIDs.size() > 0) { - newDeleteDocIDs = new ArrayList(docIDs.size()); - for (Integer num : docIDs) { - newDeleteDocIDs.add(Integer.valueOf(mapper.remap(num.intValue()))); - } - } else - newDeleteDocIDs = null; - - - // Remap delete-by-query - final HashMap newDeleteQueries; - - if (queries.size() > 0) { - newDeleteQueries = new HashMap(queries.size()); - for(Entry entry: queries.entrySet()) { - Integer num = entry.getValue(); - newDeleteQueries.put(entry.getKey(), - Integer.valueOf(mapper.remap(num.intValue()))); - } - } else - newDeleteQueries = null; - - if (newDeleteTerms != null) - terms = newDeleteTerms; - if (newDeleteDocIDs != null) - docIDs = newDeleteDocIDs; - if (newDeleteQueries != null) - queries = newDeleteQueries; - } -} \ No newline at end of file +} Index: lucene/src/java/org/apache/lucene/index/CompoundFileReader.java --- lucene/src/java/org/apache/lucene/index/CompoundFileReader.java Thu Dec 09 05:37:58 2010 -0500 +++ lucene/src/java/org/apache/lucene/index/CompoundFileReader.java Thu Dec 09 05:56:34 2010 -0500 @@ -160,7 +160,7 @@ id = IndexFileNames.stripSegmentName(id); FileEntry entry = entries.get(id); if (entry == null) - throw new IOException("No sub-file with id " + id + " found"); + throw new IOException("No sub-file with id " + id + " found (files: " + entries.keySet() + ")"); return new CSIndexInput(stream, entry.offset, 
entry.length, readBufferSize); } Index: lucene/src/java/org/apache/lucene/index/DocumentsWriter.java --- lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Thu Dec 09 05:37:58 2010 -0500 +++ lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Thu Dec 09 05:56:34 2010 -0500 @@ -23,24 +23,18 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.Map; import java.util.HashSet; import java.util.List; -import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicLong; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.document.Document; -import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; -import org.apache.lucene.search.Scorer; import org.apache.lucene.search.Similarity; -import org.apache.lucene.search.Weight; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.RAMFile; import org.apache.lucene.util.ArrayUtil; -import org.apache.lucene.util.Constants; import org.apache.lucene.util.RecyclingByteBlockAllocator; import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.RamUsageEstimator; @@ -115,7 +109,6 @@ */ final class DocumentsWriter { - final AtomicLong bytesUsed = new AtomicLong(0); IndexWriter writer; Directory directory; @@ -133,9 +126,6 @@ private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0]; private final HashMap threadBindings = new HashMap(); - private int pauseThreads; // Non-zero when we need all threads to - // pause (eg to flush) - boolean flushPending; // True when a thread has decided to flush boolean bufferIsFull; // True when it's time to write segment private boolean aborting; // True if an abort is pending @@ -151,6 +141,9 @@ List newFiles; + // Deletes for our still-in-RAM (to be flushed next) segment + private SegmentDeletes pendingDeletes = new SegmentDeletes(); + static class DocState { DocumentsWriter docWriter; Analyzer analyzer; @@ -276,18 +269,6 @@ final DocConsumer consumer; - // Deletes done after the last flush; these are discarded - // on abort - private BufferedDeletes deletesInRAM = new BufferedDeletes(false); - - // Deletes done before the last flush; these are still - // kept on abort - private BufferedDeletes deletesFlushed = new BufferedDeletes(true); - - // The max number of delete terms that can be buffered before - // they must be flushed to disk. - private int maxBufferedDeleteTerms = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DELETE_TERMS; - // How much RAM we can use before flushing. This is 0 if // we are flushing by doc count instead. private long ramBufferSize = (long) (IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024); @@ -302,28 +283,20 @@ // non-zero we will flush by RAM usage instead. 
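The two fields around this point configure alternative flush triggers: a RAM budget (ramBufferSize) or a buffered-document count (maxBufferedDocs), with the unused one disabled. A rough sketch of that decision, using a sentinel constant of my own in place of IndexWriterConfig.DISABLE_AUTO_FLUSH:

    // Simplified flush-trigger check, not the actual DocumentsWriter logic:
    // flush when buffered bytes (postings plus buffered deletes) reach the
    // RAM budget, or when the buffered doc count reaches maxBufferedDocs.
    final class FlushTrigger {
        static final int DISABLED = -1; // stand-in for the disable sentinel

        private final long ramBufferSizeBytes; // DISABLED or > 0
        private final int maxBufferedDocs;     // DISABLED or > 0

        FlushTrigger(long ramBufferSizeBytes, int maxBufferedDocs) {
            this.ramBufferSizeBytes = ramBufferSizeBytes;
            this.maxBufferedDocs = maxBufferedDocs;
        }

        boolean shouldFlush(long postingsBytes, long deleteBytes, int docsInRAM) {
            if (ramBufferSizeBytes != DISABLED
                && postingsBytes + deleteBytes >= ramBufferSizeBytes) {
                return true; // RAM-based trigger
            }
            return maxBufferedDocs != DISABLED && docsInRAM >= maxBufferedDocs;
        }
    }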
private int maxBufferedDocs = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS; - private int flushedDocCount; // How many docs already flushed to index - - synchronized void updateFlushedDocCount(int n) { - flushedDocCount += n; - } - synchronized int getFlushedDocCount() { - return flushedDocCount; - } - synchronized void setFlushedDocCount(int n) { - flushedDocCount = n; - } - private boolean closed; private final FieldInfos fieldInfos; - DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain indexingChain, int maxThreadStates, FieldInfos fieldInfos) throws IOException { + private final BufferedDeletes bufferedDeletes; + private final IndexWriter.FlushControl flushControl; + + DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain indexingChain, int maxThreadStates, FieldInfos fieldInfos, BufferedDeletes bufferedDeletes) throws IOException { this.directory = directory; this.writer = writer; this.similarity = writer.getConfig().getSimilarity(); this.maxThreadStates = maxThreadStates; - flushedDocCount = writer.maxDoc(); this.fieldInfos = fieldInfos; + this.bufferedDeletes = bufferedDeletes; + flushControl = writer.flushControl; consumer = indexingChain.getChain(this); if (consumer instanceof DocFieldProcessor) { @@ -331,6 +304,57 @@ } } + // Buffer a specific docID for deletion. Currently only + // used when we hit a exception when adding a document + synchronized void deleteDocID(int docIDUpto) { + pendingDeletes.addDocID(docIDUpto); + // NOTE: we do not trigger flush here. This is + // potentially a RAM leak, if you have an app that tries + // to add docs but every single doc always hits a + // non-aborting exception. Allowing a flush here gets + // very messy because we are only invoked when handling + // exceptions so to do this properly, while handling an + // exception we'd have to go off and flush new deletes + // which is risky (likely would hit some other + // confounding exception). + } + + boolean deleteQueries(Query... queries) { + final boolean doFlush = flushControl.waitUpdate(0, queries.length); + synchronized(this) { + for (Query query : queries) { + pendingDeletes.addQuery(query, numDocsInRAM); + } + } + return doFlush; + } + + boolean deleteQuery(Query query) { + final boolean doFlush = flushControl.waitUpdate(0, 1); + synchronized(this) { + pendingDeletes.addQuery(query, numDocsInRAM); + } + return doFlush; + } + + boolean deleteTerms(Term... terms) { + final boolean doFlush = flushControl.waitUpdate(0, terms.length); + synchronized(this) { + for (Term term : terms) { + pendingDeletes.addTerm(term, numDocsInRAM); + } + } + return doFlush; + } + + boolean deleteTerm(Term term, boolean skipWait) { + final boolean doFlush = flushControl.waitUpdate(0, 1, skipWait); + synchronized(this) { + pendingDeletes.addTerm(term, numDocsInRAM); + } + return doFlush; + } + public FieldInfos getFieldInfos() { return fieldInfos; } @@ -395,12 +419,12 @@ } /** Get current segment name we are writing. */ - String getSegment() { + synchronized String getSegment() { return segment; } /** Returns how many docs are currently buffered in RAM. */ - int getNumDocsInRAM() { + synchronized int getNumDocsInRAM() { return numDocsInRAM; } @@ -412,46 +436,86 @@ /** Returns the doc offset into the shared doc store for * the current buffered docs. */ - int getDocStoreOffset() { + synchronized int getDocStoreOffset() { return docStoreOffset; } - /** Closes the current open doc stores an returns the doc - * store segment name. 
This returns null if there are * - * no buffered documents. */ - synchronized String closeDocStore() throws IOException { + /** Closes the current open doc stores an sets the + * docStoreSegment and docStoreUseCFS on the provided + * SegmentInfo. */ + synchronized void closeDocStore(SegmentWriteState flushState, IndexWriter writer, IndexFileDeleter deleter, SegmentInfo newSegment, MergePolicy mergePolicy, SegmentInfos segmentInfos) throws IOException { - assert allThreadsIdle(); + final boolean isSeparate = numDocsInRAM == 0 || !segment.equals(docStoreSegment); - if (infoStream != null) - message("closeDocStore: " + openFiles.size() + " files to flush to segment " + docStoreSegment + " numDocs=" + numDocsInStore); - - boolean success = false; + assert docStoreSegment != null; - try { - initFlushState(true); - closedFiles.clear(); + if (infoStream != null) { + message("closeDocStore: files=" + openFiles + "; segment=" + docStoreSegment + "; docStoreOffset=" + docStoreOffset + "; numDocsInStore=" + numDocsInStore + "; isSeparate=" + isSeparate); + } - consumer.closeDocStore(flushState); - assert 0 == openFiles.size(); + closedFiles.clear(); + consumer.closeDocStore(flushState); + flushState.numDocsInStore = 0; + assert 0 == openFiles.size(); - String s = docStoreSegment; - docStoreSegment = null; - docStoreOffset = 0; - numDocsInStore = 0; - success = true; - return s; - } finally { - if (!success) { - abort(); + if (isSeparate) { + flushState.flushedFiles.clear(); + + if (mergePolicy.useCompoundDocStore(segmentInfos)) { + + final String compoundFileName = IndexFileNames.segmentFileName(docStoreSegment, "", IndexFileNames.COMPOUND_FILE_STORE_EXTENSION); + + if (infoStream != null) { + message("closeDocStore: create compound file " + compoundFileName); + } + + boolean success = false; + try { + + CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName); + for (final String file : closedFiles) { + cfsWriter.addFile(file); + } + + // Perform the merge + cfsWriter.close(); + + success = true; + } finally { + if (!success) { + deleter.deleteFile(compoundFileName); + } + } + + // In case the files we just merged into a CFS were + // not registered w/ IFD: + deleter.deleteNewFiles(closedFiles); + + final int numSegments = segmentInfos.size(); + for(int i=0;i closedFiles() { - return (List) ((ArrayList) closedFiles).clone(); - } - synchronized void addOpenFile(String name) { assert !openFiles.contains(name); openFiles.add(name); @@ -488,6 +547,9 @@ } synchronized void setAborting() { + if (infoStream != null) { + message("setAborting"); + } aborting = true; } @@ -497,61 +559,62 @@ * discarding any docs added since last flush. 
*/ synchronized void abort() throws IOException { + if (infoStream != null) { + message("docWriter: abort"); + } + + boolean success = false; + try { - if (infoStream != null) { - message("docWriter: now abort"); - } // Forcefully remove waiting ThreadStates from line waitQueue.abort(); // Wait for all other threads to finish with // DocumentsWriter: - pauseAllThreads(); + waitIdle(); + + if (infoStream != null) { + message("docWriter: abort waitIdle done"); + } + + assert 0 == waitQueue.numWaiting: "waitQueue.numWaiting=" + waitQueue.numWaiting; + + waitQueue.waitingBytes = 0; try { + abortedFiles = openFiles(); + } catch (Throwable t) { + abortedFiles = null; + } - assert 0 == waitQueue.numWaiting; + pendingDeletes.clear(); + + openFiles.clear(); - waitQueue.waitingBytes = 0; - + for(int i=0;i 0) { + if (infoStream != null) { + message("flush: push buffered deletes to previously flushed segment " + segmentInfos.lastElement()); + } + bufferedDeletes.pushDeletes(pendingDeletes, segmentInfos.lastElement(), true); + } else { + if (infoStream != null) { + message("flush: drop buffered deletes: no segments"); + } + // We can safely discard these deletes: since + // there are no segments, the deletions cannot + // affect anything. + } + pendingDeletes = new SegmentDeletes(); + } } - + + public boolean anyDeletions() { + return pendingDeletes.any(); + } + /** Flush all pending docs to a new segment */ - synchronized int flush(boolean closeDocStore) throws IOException { + // Lock order: IW -> DW + synchronized SegmentInfo flush(IndexWriter writer, boolean closeDocStore, IndexFileDeleter deleter, MergePolicy mergePolicy, SegmentInfos segmentInfos) throws IOException { - assert allThreadsIdle(); + // We change writer's segmentInfos: + assert Thread.holdsLock(writer); - assert numDocsInRAM > 0; + waitIdle(); - assert nextDocID == numDocsInRAM; - assert waitQueue.numWaiting == 0; - assert waitQueue.waitingBytes == 0; + if (numDocsInRAM == 0 && numDocsInStore == 0) { + // nothing to do! 
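The pushDeletes helper above routes whatever deletes were buffered in RAM once a flush completes: onto the newly flushed segment if one was written, otherwise onto the last already-flushed segment, otherwise they are dropped because an empty index has nothing they could match. The sketch below restates that three-way decision with placeholder types rather than the actual SegmentInfo/SegmentInfos plumbing:

    import java.util.List;

    // Simplified restatement of the flush-time routing of buffered deletes.
    final class DeleteRouter {
        enum Target { NEW_SEGMENT, LAST_EXISTING_SEGMENT, DROP }

        static Target route(boolean anyPendingDeletes,
                            boolean flushWroteSegment,
                            List<String> existingSegments) {
            if (!anyPendingDeletes) {
                return Target.DROP;                  // nothing was buffered
            }
            if (flushWroteSegment) {
                return Target.NEW_SEGMENT;           // deletes travel with the new segment
            }
            if (!existingSegments.isEmpty()) {
                return Target.LAST_EXISTING_SEGMENT; // no new docs: pin to last segment (noLimit)
            }
            return Target.DROP;                      // empty index: deletes cannot match anything
        }
    }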
+ if (infoStream != null) { + message("flush: no docs; skipping"); + } + // Lock order: IW -> DW -> BD + pushDeletes(null, segmentInfos); + return null; + } - initFlushState(false); + if (aborting) { + if (infoStream != null) { + message("flush: skip because aborting is set"); + } + return null; + } - docStoreOffset = numDocsInStore; + boolean success = false; - if (infoStream != null) - message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM); - - boolean success = false; + SegmentInfo newSegment; try { - if (closeDocStore) { - assert flushState.docStoreSegmentName != null; - assert flushState.docStoreSegmentName.equals(flushState.segmentName); - closeDocStore(); - flushState.numDocsInStore = 0; + assert waitQueue.waitingBytes == 0; + + assert docStoreSegment != null || numDocsInRAM == 0: "dss=" + docStoreSegment + " numDocsInRAM=" + numDocsInRAM; + + assert numDocsInStore >= numDocsInRAM: "numDocsInStore=" + numDocsInStore + " numDocsInRAM=" + numDocsInRAM; + + final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos, + docStoreSegment, numDocsInRAM, numDocsInStore, writer.getConfig().getTermIndexInterval(), + SegmentCodecs.build(fieldInfos, writer.codecs)); + + newSegment = new SegmentInfo(segment, numDocsInRAM, directory, false, -1, null, false, hasProx(), flushState.segmentCodecs); + + if (!closeDocStore || docStoreOffset != 0) { + newSegment.setDocStoreSegment(docStoreSegment); + newSegment.setDocStoreOffset(docStoreOffset); } - Collection threads = new HashSet(); - for(int i=0;i 0) { - doAfterFlush(); + assert nextDocID == numDocsInRAM; + assert waitQueue.numWaiting == 0; + assert waitQueue.waitingBytes == 0; + + if (infoStream != null) { + message("flush postings as segment " + segment + " numDocs=" + numDocsInRAM); + } + + final Collection threads = new HashSet(); + for(int i=0;i DW -> BD + pushDeletes(newSegment, segmentInfos); - Collection getFlushedFiles() { - return flushState.flushedFiles; - } + docStoreOffset = numDocsInStore; - /** Build compound file for the segment we just flushed */ - void createCompoundFile(String segment) throws IOException { - - CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, - IndexFileNames.segmentFileName(segment, "", IndexFileNames.COMPOUND_FILE_EXTENSION)); - for(String fileName : flushState.flushedFiles) { - cfsWriter.addFile(fileName); - } - - // Perform the merge - cfsWriter.close(); - } - - /** Set flushPending if it is not already set and returns - * whether it was set. This is used by IndexWriter to - * trigger a single flush even when multiple threads are - * trying to do so. */ - synchronized boolean setFlushPending() { - if (flushPending) - return false; - else { - flushPending = true; - return true; - } - } - - synchronized void clearFlushPending() { - bufferIsFull = false; - flushPending = false; - } - - synchronized void pushDeletes() { - deletesFlushed.update(deletesInRAM); + return newSegment; } synchronized void close() { @@ -746,6 +834,7 @@ synchronized DocumentsWriterThreadState getThreadState(Document doc, Term delTerm) throws IOException { final Thread currentThread = Thread.currentThread(); + assert !Thread.holdsLock(writer); // First, find a thread state. 
If this thread already // has affinity to a specific ThreadState, use that one @@ -776,73 +865,35 @@ } // Next, wait until my thread state is idle (in case - // it's shared with other threads) and for threads to - // not be paused nor a flush pending: + // it's shared with other threads), and no flush/abort + // pending waitReady(state); // Allocate segment name if this is the first doc since // last flush: initSegmentName(false); - state.isIdle = false; + state.docState.docID = nextDocID++; - boolean success = false; - try { - state.docState.docID = nextDocID; - - assert writer.testPoint("DocumentsWriter.ThreadState.init start"); - - if (delTerm != null) { - addDeleteTerm(delTerm, state.docState.docID); - state.doFlushAfter = timeToFlushDeletes(); - } - - assert writer.testPoint("DocumentsWriter.ThreadState.init after delTerm"); - - nextDocID++; - numDocsInRAM++; - - // We must at this point commit to flushing to ensure we - // always get N docs when we flush by doc count, even if - // > 1 thread is adding documents: - if (!flushPending && - maxBufferedDocs != IndexWriterConfig.DISABLE_AUTO_FLUSH - && numDocsInRAM >= maxBufferedDocs) { - flushPending = true; - state.doFlushAfter = true; - } - - success = true; - } finally { - if (!success) { - // Forcefully idle this ThreadState: - state.isIdle = true; - notifyAll(); - if (state.doFlushAfter) { - state.doFlushAfter = false; - flushPending = false; - } - } + if (delTerm != null) { + pendingDeletes.addTerm(delTerm, state.docState.docID); } + numDocsInRAM++; + state.isIdle = false; return state; } - - /** Returns true if the caller (IndexWriter) should now - * flush. */ - boolean addDocument(Document doc, Analyzer analyzer) - throws CorruptIndexException, IOException { + + boolean addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException { return updateDocument(doc, analyzer, null); } - - boolean updateDocument(Term t, Document doc, Analyzer analyzer) - throws CorruptIndexException, IOException { - return updateDocument(doc, analyzer, t); - } - + boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm) throws CorruptIndexException, IOException { - + + // Possibly trigger a flush, or wait until any running flush completes: + boolean doFlush = flushControl.waitUpdate(1, delTerm != null ? 1 : 0); + // This call is synchronized but fast final DocumentsWriterThreadState state = getThreadState(doc, delTerm); @@ -867,11 +918,23 @@ success = true; } finally { if (!success) { + + // If this thread state had decided to flush, we + // must clear it so another thread can flush + if (doFlush) { + flushControl.clearFlushPending(); + } + + if (infoStream != null) { + message("exception in updateDocument aborting=" + aborting); + } + synchronized(this) { + state.isIdle = true; + notifyAll(); + if (aborting) { - state.isIdle = true; - notifyAll(); abort(); } else { skipDocWriter.docID = docState.docID; @@ -881,61 +944,38 @@ success2 = true; } finally { if (!success2) { - state.isIdle = true; - notifyAll(); abort(); return false; } } - state.isIdle = true; - notifyAll(); - - // If this thread state had decided to flush, we - // must clear it so another thread can flush - if (state.doFlushAfter) { - state.doFlushAfter = false; - flushPending = false; - notifyAll(); - } - // Immediately mark this document as deleted // since likely it was partially added. 
This // keeps indexing as "all or none" (atomic) when // adding a document: - addDeleteDocID(state.docState.docID); + deleteDocID(state.docState.docID); } } } } - return state.doFlushAfter || timeToFlushDeletes(); + doFlush |= flushControl.flushByRAMUsage("new document"); + + return doFlush; } - // for testing - synchronized int getNumBufferedDeleteTerms() { - return deletesInRAM.numTerms; + public synchronized void waitIdle() { + while (!allThreadsIdle()) { + try { + wait(); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + } } - // for testing - synchronized Map getBufferedDeleteTerms() { - return deletesInRAM.terms; - } - - /** Called whenever a merge has completed and the merged segments had deletions */ - synchronized void remapDeletes(SegmentInfos infos, int[][] docMaps, int[] delCounts, MergePolicy.OneMerge merge, int mergeDocCount) { - if (docMaps == null) - // The merged segments had no deletes so docIDs did not change and we have nothing to do - return; - MergeDocIDRemapper mapper = new MergeDocIDRemapper(infos, docMaps, delCounts, merge, mergeDocCount); - deletesInRAM.remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount); - deletesFlushed.remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount); - flushedDocCount -= mapper.docShift; - } - - synchronized private void waitReady(DocumentsWriterThreadState state) { - - while (!closed && ((state != null && !state.isIdle) || pauseThreads != 0 || flushPending || aborting)) { + synchronized void waitReady(DocumentsWriterThreadState state) { + while (!closed && (!state.isIdle || aborting)) { try { wait(); } catch (InterruptedException ie) { @@ -943,261 +983,9 @@ } } - if (closed) + if (closed) { throw new AlreadyClosedException("this IndexWriter is closed"); - } - - boolean bufferDeleteTerms(Term[] terms) throws IOException { - synchronized(this) { - waitReady(null); - for (int i = 0; i < terms.length; i++) - addDeleteTerm(terms[i], numDocsInRAM); } - return timeToFlushDeletes(); - } - - boolean bufferDeleteTerm(Term term) throws IOException { - synchronized(this) { - waitReady(null); - addDeleteTerm(term, numDocsInRAM); - } - return timeToFlushDeletes(); - } - - boolean bufferDeleteQueries(Query[] queries) throws IOException { - synchronized(this) { - waitReady(null); - for (int i = 0; i < queries.length; i++) - addDeleteQuery(queries[i], numDocsInRAM); - } - return timeToFlushDeletes(); - } - - boolean bufferDeleteQuery(Query query) throws IOException { - synchronized(this) { - waitReady(null); - addDeleteQuery(query, numDocsInRAM); - } - return timeToFlushDeletes(); - } - - synchronized boolean deletesFull() { - return (ramBufferSize != IndexWriterConfig.DISABLE_AUTO_FLUSH && - (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed + bytesUsed()) >= ramBufferSize) || - (maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH && - ((deletesInRAM.size() + deletesFlushed.size()) >= maxBufferedDeleteTerms)); - } - - synchronized boolean doApplyDeletes() { - // Very similar to deletesFull(), except we don't count - // numBytesUsed, because we are checking whether - // deletes (alone) are consuming too many resources now - // and thus should be applied. We apply deletes if RAM - // usage is > 1/2 of our allowed RAM buffer, to prevent - // too-frequent flushing of a long tail of tiny segments - // when merges (which always apply deletes) are - // infrequent. 
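Read as numbers, the heuristic above is simple: if the configured RAM buffer is 16 MB, buffered deletes get applied on their own once they exceed 8 MB, even if no segment flush is otherwise due. A tiny illustrative check (the figures are made up, not taken from the patch):

    // Worked example of the "apply deletes at half the RAM budget" idea.
    public class ApplyDeletesThreshold {
        public static void main(String[] args) {
            long ramBufferSize = 16L * 1024 * 1024;   // assumed 16 MB budget
            long deleteBytesUsed = 9L * 1024 * 1024;  // deletes alone hold 9 MB

            // Apply deletes once they alone consume more than half the
            // budget, to avoid dragging a long tail of tiny segments along.
            boolean applyNow = deleteBytesUsed >= ramBufferSize / 2;
            System.out.println("apply deletes now? " + applyNow); // true: 9 MB >= 8 MB
        }
    }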
- return (ramBufferSize != IndexWriterConfig.DISABLE_AUTO_FLUSH && - (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed) >= ramBufferSize/2) || - (maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH && - ((deletesInRAM.size() + deletesFlushed.size()) >= maxBufferedDeleteTerms)); - } - - private boolean timeToFlushDeletes() { - balanceRAM(); - synchronized(this) { - return (bufferIsFull || deletesFull()) && setFlushPending(); - } - } - - void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) { - this.maxBufferedDeleteTerms = maxBufferedDeleteTerms; - } - - int getMaxBufferedDeleteTerms() { - return maxBufferedDeleteTerms; - } - - synchronized boolean hasDeletes() { - return deletesFlushed.any(); - } - - synchronized boolean applyDeletes(SegmentInfos infos) throws IOException { - - if (!hasDeletes()) - return false; - - final long t0 = System.currentTimeMillis(); - - if (infoStream != null) - message("apply " + deletesFlushed.numTerms + " buffered deleted terms and " + - deletesFlushed.docIDs.size() + " deleted docIDs and " + - deletesFlushed.queries.size() + " deleted queries on " + - + infos.size() + " segments."); - - final int infosEnd = infos.size(); - - int docStart = 0; - boolean any = false; - for (int i = 0; i < infosEnd; i++) { - - // Make sure we never attempt to apply deletes to - // segment in external dir - assert infos.info(i).dir == directory; - - SegmentReader reader = writer.readerPool.get(infos.info(i), false); - try { - any |= applyDeletes(reader, docStart); - docStart += reader.maxDoc(); - } finally { - writer.readerPool.release(reader); - } - } - - deletesFlushed.clear(); - if (infoStream != null) { - message("apply deletes took " + (System.currentTimeMillis()-t0) + " msec"); - } - - return any; - } - - // used only by assert - private Term lastDeleteTerm; - - // used only by assert - private boolean checkDeleteTerm(Term term) { - if (term != null) { - assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) > 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term; - } - lastDeleteTerm = term; - return true; - } - - // Apply buffered delete terms, queries and docIDs to the - // provided reader - private final synchronized boolean applyDeletes(IndexReader reader, int docIDStart) - throws CorruptIndexException, IOException { - - final int docEnd = docIDStart + reader.maxDoc(); - boolean any = false; - - assert checkDeleteTerm(null); - - // Delete by term - if (deletesFlushed.terms.size() > 0) { - Fields fields = reader.fields(); - if (fields == null) { - // This reader has no postings - return false; - } - - TermsEnum termsEnum = null; - - String currentField = null; - DocsEnum docs = null; - - for (Entry entry: deletesFlushed.terms.entrySet()) { - Term term = entry.getKey(); - // Since we visit terms sorted, we gain performance - // by re-using the same TermsEnum and seeking only - // forwards - if (term.field() != currentField) { - assert currentField == null || currentField.compareTo(term.field()) < 0; - currentField = term.field(); - Terms terms = fields.terms(currentField); - if (terms != null) { - termsEnum = terms.iterator(); - } else { - termsEnum = null; - } - } - - if (termsEnum == null) { - continue; - } - assert checkDeleteTerm(term); - - if (termsEnum.seek(term.bytes(), false) == TermsEnum.SeekStatus.FOUND) { - DocsEnum docsEnum = termsEnum.docs(reader.getDeletedDocs(), docs); - - if (docsEnum != null) { - docs = docsEnum; - int limit = entry.getValue().getNum(); - while (true) { - final int docID = docs.nextDoc(); - if (docID == 
DocsEnum.NO_MORE_DOCS || docIDStart+docID >= limit) { - break; - } - reader.deleteDocument(docID); - any = true; - } - } - } - } - } - - // Delete by docID - for (Integer docIdInt : deletesFlushed.docIDs) { - int docID = docIdInt.intValue(); - if (docID >= docIDStart && docID < docEnd) { - reader.deleteDocument(docID-docIDStart); - any = true; - } - } - - // Delete by query - if (deletesFlushed.queries.size() > 0) { - IndexSearcher searcher = new IndexSearcher(reader); - try { - for (Entry entry : deletesFlushed.queries.entrySet()) { - Query query = entry.getKey(); - int limit = entry.getValue().intValue(); - Weight weight = query.weight(searcher); - Scorer scorer = weight.scorer(reader, true, false); - if (scorer != null) { - while(true) { - int doc = scorer.nextDoc(); - if (((long) docIDStart) + doc >= limit) - break; - reader.deleteDocument(doc); - any = true; - } - } - } - } finally { - searcher.close(); - } - } - return any; - } - - // Buffer a term in bufferedDeleteTerms, which records the - // current number of documents buffered in ram so that the - // delete term will be applied to those documents as well - // as the disk segments. - synchronized private void addDeleteTerm(Term term, int docCount) { - BufferedDeletes.Num num = deletesInRAM.terms.get(term); - final int docIDUpto = flushedDocCount + docCount; - if (num == null) - deletesInRAM.terms.put(term, new BufferedDeletes.Num(docIDUpto)); - else - num.setNum(docIDUpto); - deletesInRAM.numTerms++; - - deletesInRAM.addBytesUsed(BYTES_PER_DEL_TERM + term.bytes.length); - } - - // Buffer a specific docID for deletion. Currently only - // used when we hit a exception when adding a document - synchronized private void addDeleteDocID(int docID) { - deletesInRAM.docIDs.add(Integer.valueOf(flushedDocCount+docID)); - deletesInRAM.addBytesUsed(BYTES_PER_DEL_DOCID); - } - - synchronized private void addDeleteQuery(Query query, int docID) { - deletesInRAM.queries.put(query, Integer.valueOf(flushedDocCount + docID)); - deletesInRAM.addBytesUsed(BYTES_PER_DEL_QUERY); } /** Does the synchronized work to finish/flush the @@ -1218,14 +1006,18 @@ // waiting for me to become idle. We just forcefully // idle this threadState; it will be fully reset by // abort() - if (docWriter != null) + if (docWriter != null) { try { docWriter.abort(); } catch (Throwable t) { } + } perThread.isIdle = true; + + // wakes up any threads waiting on the wait queue notifyAll(); + return; } @@ -1241,12 +1033,9 @@ if (doPause) waitForWaitQueue(); - if (bufferIsFull && !flushPending) { - flushPending = true; - perThread.doFlushAfter = true; - } + perThread.isIdle = true; - perThread.isIdle = true; + // wakes up any threads waiting on the wait queue notifyAll(); } } @@ -1275,42 +1064,8 @@ } final SkipDocWriter skipDocWriter = new SkipDocWriter(); - long getRAMUsed() { - return bytesUsed() + deletesInRAM.bytesUsed + deletesFlushed.bytesUsed; - } - NumberFormat nf = NumberFormat.getInstance(); - // Coarse estimates used to measure RAM usage of buffered deletes - final static int OBJECT_HEADER_BYTES = 8; - final static int POINTER_NUM_BYTE = Constants.JRE_IS_64BIT ? 8 : 4; - final static int INT_NUM_BYTE = 4; - final static int CHAR_NUM_BYTE = 2; - - /* Rough logic: HashMap has an array[Entry] w/ varying - load factor (say 2 * POINTER). Entry is object w/ Term - key, BufferedDeletes.Num val, int hash, Entry next - (OBJ_HEADER + 3*POINTER + INT). Term is object w/ - String field and String text (OBJ_HEADER + 2*POINTER). - We don't count Term's field since it's interned. 
- Term's text is String (OBJ_HEADER + 4*INT + POINTER + - OBJ_HEADER + string.length*CHAR). BufferedDeletes.num is - OBJ_HEADER + INT. */ - - final static int BYTES_PER_DEL_TERM = 8*POINTER_NUM_BYTE + 5*OBJECT_HEADER_BYTES + 6*INT_NUM_BYTE; - - /* Rough logic: del docIDs are List. Say list - allocates ~2X size (2*POINTER). Integer is OBJ_HEADER - + int */ - final static int BYTES_PER_DEL_DOCID = 2*POINTER_NUM_BYTE + OBJECT_HEADER_BYTES + INT_NUM_BYTE; - - /* Rough logic: HashMap has an array[Entry] w/ varying - load factor (say 2 * POINTER). Entry is object w/ - Query key, Integer val, int hash, Entry next - (OBJ_HEADER + 3*POINTER + INT). Query we often - undercount (say 24 bytes). Integer is OBJ_HEADER + INT. */ - final static int BYTES_PER_DEL_QUERY = 5*POINTER_NUM_BYTE + 2*OBJECT_HEADER_BYTES + 2*INT_NUM_BYTE + 24; - /* Initial chunks size of the shared byte[] blocks used to store postings data */ final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK; @@ -1333,14 +1088,14 @@ final int[] b; if (0 == size) { b = new int[INT_BLOCK_SIZE]; - bytesUsed.addAndGet(INT_BLOCK_SIZE*INT_NUM_BYTE); + bytesUsed.addAndGet(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT); } else b = freeIntBlocks.remove(size-1); return b; } - private long bytesUsed() { - return bytesUsed.get(); + long bytesUsed() { + return bytesUsed.get() + pendingDeletes.bytesUsed.get(); } /* Return int[]s to the pool */ @@ -1376,19 +1131,20 @@ final boolean doBalance; final long deletesRAMUsed; + deletesRAMUsed = bufferedDeletes.bytesUsed(); + synchronized(this) { if (ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH || bufferIsFull) { return; } - deletesRAMUsed = deletesInRAM.bytesUsed+deletesFlushed.bytesUsed; - doBalance = bytesUsed() +deletesRAMUsed >= ramBufferSize; + doBalance = bytesUsed() + deletesRAMUsed >= ramBufferSize; } if (doBalance) { if (infoStream != null) - message(" RAM: now balance allocations: usedMB=" + toMB(bytesUsed()) + + message(" RAM: balance allocations: usedMB=" + toMB(bytesUsed()) + " vs trigger=" + toMB(ramBufferSize) + " deletesMB=" + toMB(deletesRAMUsed) + " byteBlockFree=" + toMB(byteBlockAllocator.bytesUsed()) + @@ -1414,7 +1170,7 @@ bufferIsFull = bytesUsed()+deletesRAMUsed > ramBufferSize; if (infoStream != null) { if (bytesUsed()+deletesRAMUsed > ramBufferSize) - message(" nothing to free; now set bufferIsFull"); + message(" nothing to free; set bufferIsFull"); else message(" nothing to free"); } @@ -1426,7 +1182,7 @@ } if ((1 == iter % 4) && freeIntBlocks.size() > 0) { freeIntBlocks.remove(freeIntBlocks.size()-1); - bytesUsed.addAndGet(-INT_BLOCK_SIZE * INT_NUM_BYTE); + bytesUsed.addAndGet(-INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT); } if ((2 == iter % 4) && perDocAllocator.numBufferedBlocks() > 0) { perDocAllocator.freeBlocks(32); // Remove upwards of 32 blocks (each block is 1K) @@ -1501,8 +1257,9 @@ nextWriteLoc = 0; success = true; } finally { - if (!success) + if (!success) { setAborting(); + } } } @@ -1519,8 +1276,9 @@ waiting[nextWriteLoc] = null; waitingBytes -= doc.sizeInBytes(); writeDocument(doc); - } else + } else { break; + } } } else { Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadState.java --- lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadState.java Thu Dec 09 05:37:58 2010 -0500 +++ lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadState.java Thu Dec 09 05:56:34 2010 -0500 @@ -27,7 +27,6 @@ boolean isIdle = true; // false if this is currently in use by a thread int numThreads = 1; // Number of threads that share this 
instance - boolean doFlushAfter; // true if we should flush after processing current doc final DocConsumerPerThread consumer; final DocumentsWriter.DocState docState; @@ -45,6 +44,5 @@ void doAfterFlush() { numThreads = 0; - doFlushAfter = false; } } Index: lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java --- lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java Thu Dec 09 05:37:58 2010 -0500 +++ lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java Thu Dec 09 05:56:34 2010 -0500 @@ -21,6 +21,7 @@ import org.apache.lucene.analysis.tokenattributes.PayloadAttribute; import org.apache.lucene.document.Fieldable; +import org.apache.lucene.util.RamUsageEstimator; // TODO: break into separate freq and prox writers as // codecs; make separate container (tii/tis/skip/*) that can @@ -88,7 +89,7 @@ } } - final void writeProx(final int termID, int proxCode) { + void writeProx(final int termID, int proxCode) { final Payload payload; if (payloadAttribute == null) { payload = null; @@ -110,7 +111,7 @@ } @Override - final void newTerm(final int termID) { + void newTerm(final int termID) { // First time we're seeing this term since the last // flush assert docState.testPoint("FreqProxTermsWriterPerField.newTerm start"); @@ -127,7 +128,7 @@ } @Override - final void addTerm(final int termID) { + void addTerm(final int termID) { assert docState.testPoint("FreqProxTermsWriterPerField.addTerm start"); @@ -205,7 +206,7 @@ @Override int bytesPerPosting() { - return ParallelPostingsArray.BYTES_PER_POSTING + 4 * DocumentsWriter.INT_NUM_BYTE; + return ParallelPostingsArray.BYTES_PER_POSTING + 4 * RamUsageEstimator.NUM_BYTES_INT; } } Index: lucene/src/java/org/apache/lucene/index/IndexWriter.java --- lucene/src/java/org/apache/lucene/index/IndexWriter.java Thu Dec 09 05:37:58 2010 -0500 +++ lucene/src/java/org/apache/lucene/index/IndexWriter.java Thu Dec 09 05:56:34 2010 -0500 @@ -35,6 +35,7 @@ import java.io.IOException; import java.io.Closeable; import java.io.PrintStream; +import java.util.concurrent.atomic.AtomicInteger; import java.util.List; import java.util.Collection; import java.util.ArrayList; @@ -201,9 +202,8 @@ private final static int MERGE_READ_BUFFER_SIZE = 4096; // Used for printing messages - private static Object MESSAGE_ID_LOCK = new Object(); - private static int MESSAGE_ID = 0; - private int messageID = -1; + private static final AtomicInteger MESSAGE_ID = new AtomicInteger(); + private int messageID = MESSAGE_ID.getAndIncrement(); volatile private boolean hitOOM; private final Directory directory; // where this index resides @@ -218,7 +218,7 @@ volatile SegmentInfos pendingCommit; // set when a commit is pending (after prepareCommit() & before commit()) volatile long pendingCommitChangeCount; - private final SegmentInfos segmentInfos; // the segments + final SegmentInfos segmentInfos; // the segments private DocumentsWriter docWriter; private IndexFileDeleter deleter; @@ -245,10 +245,11 @@ private long mergeGen; private boolean stopMerges; - private int flushCount; - private int flushDeletesCount; + private final AtomicInteger flushCount = new AtomicInteger(); + private final AtomicInteger flushDeletesCount = new AtomicInteger(); final ReaderPool readerPool = new ReaderPool(); + final BufferedDeletes bufferedDeletes; // This is a "write once" variable (like the organic dye // on a DVD-R that may or may not be heated by a laser and @@ -402,20 +403,26 @@ /** * Release the segment reader (i.e. 
decRef it and close if there * are no more references. + * @return true if this release altered the index (eg + * the SegmentReader had pending changes to del docs and + * was closed). Caller must call checkpoint() if so. * @param sr * @throws IOException */ - public synchronized void release(SegmentReader sr) throws IOException { - release(sr, false); + public synchronized boolean release(SegmentReader sr) throws IOException { + return release(sr, false); } /** * Release the segment reader (i.e. decRef it and close if there * are no more references. + * @return true if this release altered the index (eg + * the SegmentReader had pending changes to del docs and + * was closed). Caller must call checkpoint() if so. * @param sr * @throws IOException */ - public synchronized void release(SegmentReader sr, boolean drop) throws IOException { + public synchronized boolean release(SegmentReader sr, boolean drop) throws IOException { final boolean pooled = readerMap.containsKey(sr.getSegmentInfo()); @@ -446,13 +453,10 @@ // not pooling readers, we release it: readerMap.remove(sr.getSegmentInfo()); - if (hasChanges) { - // Must checkpoint w/ deleter, because this - // segment reader will have created new _X_N.del - // file. - deleter.checkpoint(segmentInfos, false); - } + return hasChanges; } + + return false; } /** Remove all our references to readers, and commits @@ -600,6 +604,8 @@ } } + + /** * Obtain the number of deleted docs for a pooled reader. * If the reader isn't being pooled, the segmentInfo's @@ -646,15 +652,6 @@ infoStream.println("IW " + messageID + " [" + new Date() + "; " + Thread.currentThread().getName() + "]: " + message); } - private synchronized void setMessageID(PrintStream infoStream) { - if (infoStream != null && messageID == -1) { - synchronized(MESSAGE_ID_LOCK) { - messageID = MESSAGE_ID++; - } - } - this.infoStream = infoStream; - } - CodecProvider codecs; /** @@ -690,7 +687,7 @@ config = (IndexWriterConfig) conf.clone(); directory = d; analyzer = conf.getAnalyzer(); - setMessageID(defaultInfoStream); + infoStream = defaultInfoStream; maxFieldLength = conf.getMaxFieldLength(); termIndexInterval = conf.getTermIndexInterval(); mergePolicy = conf.getMergePolicy(); @@ -699,6 +696,8 @@ mergedSegmentWarmer = conf.getMergedSegmentWarmer(); codecs = conf.getCodecProvider(); + bufferedDeletes = new BufferedDeletes(messageID); + bufferedDeletes.setInfoStream(infoStream); poolReaders = conf.getReaderPooling(); OpenMode mode = conf.getOpenMode(); @@ -766,7 +765,7 @@ setRollbackSegmentInfos(segmentInfos); - docWriter = new DocumentsWriter(directory, this, conf.getIndexingChain(), conf.getMaxThreadStates(), getCurrentFieldInfos()); + docWriter = new DocumentsWriter(directory, this, conf.getIndexingChain(), conf.getMaxThreadStates(), getCurrentFieldInfos(), bufferedDeletes); docWriter.setInfoStream(infoStream); docWriter.setMaxFieldLength(maxFieldLength); @@ -785,7 +784,6 @@ segmentInfos.changed(); } - docWriter.setMaxBufferedDeleteTerms(conf.getMaxBufferedDeleteTerms()); docWriter.setRAMBufferSizeMB(conf.getRAMBufferSizeMB()); docWriter.setMaxBufferedDocs(conf.getMaxBufferedDocs()); pushMaxBufferedDocs(); @@ -896,9 +894,10 @@ */ public void setInfoStream(PrintStream infoStream) { ensureOpen(); - setMessageID(infoStream); + this.infoStream = infoStream; docWriter.setInfoStream(infoStream); deleter.setInfoStream(infoStream); + bufferedDeletes.setInfoStream(infoStream); if (infoStream != null) messageState(); } @@ -1029,8 +1028,6 @@ private void closeInternal(boolean waitForMerges) 
throws CorruptIndexException, IOException { - docWriter.pauseAllThreads(); - try { if (infoStream != null) message("now flush at close"); @@ -1085,8 +1082,6 @@ closing = false; notifyAll(); if (!closed) { - if (docWriter != null) - docWriter.resumeAllThreads(); if (infoStream != null) message("hit exception while closing"); } @@ -1094,87 +1089,6 @@ } } - /** Tells the docWriter to close its currently open shared - * doc stores (stored fields & vectors files). - * Return value specifices whether new doc store files are compound or not. - */ - private synchronized boolean flushDocStores() throws IOException { - - if (infoStream != null) { - message("flushDocStores segment=" + docWriter.getDocStoreSegment()); - } - - boolean useCompoundDocStore = false; - if (infoStream != null) { - message("closeDocStores segment=" + docWriter.getDocStoreSegment()); - } - - String docStoreSegment; - - boolean success = false; - try { - docStoreSegment = docWriter.closeDocStore(); - success = true; - } finally { - if (!success && infoStream != null) { - message("hit exception closing doc store segment"); - } - } - - if (infoStream != null) { - message("flushDocStores files=" + docWriter.closedFiles()); - } - - useCompoundDocStore = mergePolicy.useCompoundDocStore(segmentInfos); - - if (useCompoundDocStore && docStoreSegment != null && docWriter.closedFiles().size() != 0) { - // Now build compound doc store file - - if (infoStream != null) { - message("create compound file " + IndexFileNames.segmentFileName(docStoreSegment, "", IndexFileNames.COMPOUND_FILE_STORE_EXTENSION)); - } - - success = false; - - final int numSegments = segmentInfos.size(); - final String compoundFileName = IndexFileNames.segmentFileName(docStoreSegment, "", IndexFileNames.COMPOUND_FILE_STORE_EXTENSION); - - try { - CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName); - for (final String file : docWriter.closedFiles() ) { - cfsWriter.addFile(file); - } - - // Perform the merge - cfsWriter.close(); - success = true; - - } finally { - if (!success) { - if (infoStream != null) - message("hit exception building compound file doc store for segment " + docStoreSegment); - deleter.deleteFile(compoundFileName); - docWriter.abort(); - } - } - - for(int i=0;i files = docWriter.abortedFiles(); - if (files != null) - deleter.deleteNewFiles(files); + if (docWriter != null) { + final Collection files = docWriter.abortedFiles(); + if (files != null) { + deleter.deleteNewFiles(files); + } + } } } } - if (doFlush) + if (doFlush) { flush(true, false, false); + } } catch (OutOfMemoryError oom) { handleOOM(oom, "updateDocument"); } @@ -1522,13 +1454,13 @@ } // for test purpose - final synchronized int getFlushCount() { - return flushCount; + final int getFlushCount() { + return flushCount.get(); } // for test purpose - final synchronized int getFlushDeletesCount() { - return flushDeletesCount; + final int getFlushDeletesCount() { + return flushDeletesCount.get(); } final String newSegmentName() { @@ -1660,8 +1592,10 @@ if (maxNumSegments < 1) throw new IllegalArgumentException("maxNumSegments must be >= 1; got " + maxNumSegments); - if (infoStream != null) + if (infoStream != null) { message("optimize: index now " + segString()); + message("now flush at optimize"); + } flush(true, false, true); @@ -1944,8 +1878,6 @@ message("rollback"); } - docWriter.pauseAllThreads(); - try { finishMerges(false); @@ -1955,6 +1887,8 @@ mergePolicy.close(); mergeScheduler.close(); + bufferedDeletes.clear(); + synchronized(this) { if 
(pendingCommit != null) { @@ -1993,7 +1927,6 @@ } finally { synchronized(this) { if (!success) { - docWriter.resumeAllThreads(); closing = false; notifyAll(); if (infoStream != null) @@ -2021,7 +1954,6 @@ * will receive {@link MergePolicy.MergeAbortedException}s. */ public synchronized void deleteAll() throws IOException { - docWriter.pauseAllThreads(); try { // Abort any running merges @@ -2029,7 +1961,6 @@ // Remove any buffered docs docWriter.abort(); - docWriter.setFlushedDocCount(0); // Remove all segments segmentInfos.clear(); @@ -2047,7 +1978,6 @@ } catch (OutOfMemoryError oom) { handleOOM(oom, "deleteAll"); } finally { - docWriter.resumeAllThreads(); if (infoStream != null) { message("hit exception during deleteAll"); } @@ -2123,7 +2053,7 @@ * the index files referenced exist (correctly) in the * index directory. */ - private synchronized void checkpoint() throws IOException { + synchronized void checkpoint() throws IOException { changeCount++; segmentInfos.changed(); deleter.checkpoint(segmentInfos, false); @@ -2259,9 +2189,6 @@ synchronized (this) { ensureOpen(); segmentInfos.addAll(infos); - // Notify DocumentsWriter that the flushed count just increased - docWriter.updateFlushedDocCount(docCount); - checkpoint(); } @@ -2324,10 +2251,6 @@ // Register the new segment synchronized(this) { segmentInfos.add(info); - - // Notify DocumentsWriter that the flushed count just increased - docWriter.updateFlushedDocCount(docCount); - checkpoint(); } } catch (OutOfMemoryError oom) { @@ -2535,196 +2458,92 @@ // We can be called during close, when closing==true, so we must pass false to ensureOpen: ensureOpen(false); - if (doFlush(flushDocStores, flushDeletes) && triggerMerge) + if (doFlush(flushDocStores, flushDeletes) && triggerMerge) { maybeMerge(); + } } // TODO: this method should not have to be entirely // synchronized, ie, merges should be allowed to commit // even while a flush is happening - private synchronized final boolean doFlush(boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException { - try { - try { - return doFlushInternal(flushDocStores, flushDeletes); - } finally { - docWriter.balanceRAM(); - } - } finally { - docWriter.clearFlushPending(); - } - } - - // TODO: this method should not have to be entirely - // synchronized, ie, merges should be allowed to commit - // even while a flush is happening - private synchronized final boolean doFlushInternal(boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException { + private synchronized final boolean doFlush(boolean closeDocStores, boolean applyAllDeletes) throws CorruptIndexException, IOException { if (hitOOM) { throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot flush"); } - ensureOpen(false); + doBeforeFlush(); assert testPoint("startDoFlush"); - doBeforeFlush(); - - flushCount++; - - // If we are flushing because too many deletes - // accumulated, then we should apply the deletes to free - // RAM: - flushDeletes |= docWriter.doApplyDeletes(); - - // Make sure no threads are actively adding a document. 
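The old flush path being removed here paused and resumed every indexing thread around each flush; the rewritten doFlush that continues below instead records a pending flush through the new FlushControl and clears that flag, waking any waiting indexing threads, once the flush completes. A minimal, self-contained sketch of that kind of monitor-based gate follows; the class and method names are simplified stand-ins, not the patch's exact API.

    // Simplified flush gate: one thread claims the pending flush, other
    // threads wait until the flag is cleared before buffering more state.
    class SimpleFlushGate {
      private boolean flushPending;

      // Returns true only for the caller that actually claimed the flush.
      public synchronized boolean setFlushPending() {
        if (flushPending) {
          return false;
        }
        flushPending = true;
        return true;
      }

      public synchronized void waitUntilClear() throws InterruptedException {
        while (flushPending) {
          wait();
        }
      }

      public synchronized void clearFlushPending() {
        flushPending = false;
        notifyAll(); // wake indexing threads blocked in waitUntilClear()
      }
    }

The real FlushControl, added near the end of this IndexWriter diff, layers doc-count, delete-count and RAM checks on top of the same pending/clear handshake.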
- // Returns true if docWriter is currently aborting, in - // which case we skip flushing this segment - if (infoStream != null) { - message("flush: now pause all indexing threads"); - } - if (docWriter.pauseAllThreads()) { - docWriter.resumeAllThreads(); - return false; - } + // We may be flushing because it was triggered by doc + // count, del count, ram usage (in which case flush + // pending is already set), or we may be flushing + // due to external event eg getReader or commit is + // called (in which case we now set it, and this will + // pause all threads): + flushControl.setFlushPendingNoWait("explicit flush"); + + boolean success = false; try { - SegmentInfo newSegment = null; - - final int numDocs = docWriter.getNumDocsInRAM(); - - // Always flush docs if there are any - boolean flushDocs = numDocs > 0; - - String docStoreSegment = docWriter.getDocStoreSegment(); - - assert docStoreSegment != null || numDocs == 0: "dss=" + docStoreSegment + " numDocs=" + numDocs; - - if (docStoreSegment == null) - flushDocStores = false; - - int docStoreOffset = docWriter.getDocStoreOffset(); - - boolean docStoreIsCompoundFile = false; - if (infoStream != null) { - message(" flush: segment=" + docWriter.getSegment() + - " docStoreSegment=" + docWriter.getDocStoreSegment() + - " docStoreOffset=" + docStoreOffset + - " flushDocs=" + flushDocs + - " flushDeletes=" + flushDeletes + - " flushDocStores=" + flushDocStores + - " numDocs=" + numDocs + - " numBufDelTerms=" + docWriter.getNumBufferedDeleteTerms()); + message(" start flush: applyAllDeletes=" + applyAllDeletes + " closeDocStores=" + closeDocStores); message(" index before flush " + segString()); } - - // Check if the doc stores must be separately flushed - // because other segments, besides the one we are about - // to flush, reference it - if (flushDocStores && (!flushDocs || !docWriter.getSegment().equals(docWriter.getDocStoreSegment()))) { - // We must separately flush the doc store - if (infoStream != null) - message(" flush shared docStore segment " + docStoreSegment); - - docStoreIsCompoundFile = flushDocStores(); - flushDocStores = false; - } - - String segment = docWriter.getSegment(); - - // If we are flushing docs, segment must not be null: - assert segment != null || !flushDocs; - - if (flushDocs) { - - boolean success = false; - final int flushedDocCount; - - try { - flushedDocCount = docWriter.flush(flushDocStores); - if (infoStream != null) { - message("flushedFiles=" + docWriter.getFlushedFiles()); - } - success = true; - } finally { - if (!success) { - if (infoStream != null) - message("hit exception flushing segment " + segment); - deleter.refresh(segment); - } - } - - if (0 == docStoreOffset && flushDocStores) { - // This means we are flushing private doc stores - // with this segment, so it will not be shared - // with other segments - assert docStoreSegment != null; - assert docStoreSegment.equals(segment); - docStoreOffset = -1; - docStoreIsCompoundFile = false; - docStoreSegment = null; - } - - // Create new SegmentInfo, but do not add to our - // segmentInfos until deletes are flushed - // successfully. 
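The legacy code around this point assembled the flushed SegmentInfo by hand inside IndexWriter (doc store segment, offsets, compound-file flags, codec info). In the new path, shown by the '+' lines in the following hunk, DocumentsWriter.flush returns the finished SegmentInfo and the writer only registers it when one was actually produced. A compressed sketch of that control flow, using placeholder types rather than the real Lucene classes:

    // Placeholder types: the real code gets a SegmentInfo back from
    // DocumentsWriter.flush(...) and then calls checkpoint().
    class FlushShapeSketch {
      static class SegInfo { final String name; SegInfo(String name) { this.name = name; } }
      interface DocWriter { SegInfo flush(); } // returns null when nothing was buffered

      private final java.util.List<SegInfo> segmentInfos = new java.util.ArrayList<SegInfo>();

      boolean doFlush(DocWriter docWriter) {
        final SegInfo newSegment = docWriter.flush();
        if (newSegment != null) {
          segmentInfos.add(newSegment); // register, then checkpoint in the real writer
        }
        return newSegment != null;
      }
    }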
- newSegment = new SegmentInfo(segment, - flushedDocCount, - directory, false, docStoreOffset, - docStoreSegment, docStoreIsCompoundFile, - docWriter.hasProx(), - docWriter.getSegmentCodecs()); - - if (infoStream != null) { - message("flush codec=" + docWriter.getSegmentCodecs()); - } + + final SegmentInfo newSegment = docWriter.flush(this, closeDocStores, deleter, mergePolicy, segmentInfos); + if (newSegment != null) { setDiagnostics(newSegment, "flush"); - } - - docWriter.pushDeletes(); - - if (flushDocs) { segmentInfos.add(newSegment); checkpoint(); } - if (flushDocs && mergePolicy.useCompoundFile(segmentInfos, newSegment)) { - // Now build compound file - boolean success = false; - try { - docWriter.createCompoundFile(segment); - success = true; - } finally { - if (!success) { - if (infoStream != null) - message("hit exception creating compound file for newly flushed segment " + segment); - deleter.deleteFile(IndexFileNames.segmentFileName(segment, "", IndexFileNames.COMPOUND_FILE_EXTENSION)); + if (!applyAllDeletes) { + // If deletes alone are consuming > 1/2 our RAM + // buffer, force them all to apply now. This is to + // prevent too-frequent flushing of a long tail of + // tiny segments: + if (flushControl.getFlushDeletes() || + (config.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH && + bufferedDeletes.bytesUsed() > (1024*1024*config.getRAMBufferSizeMB()/2))) { + applyAllDeletes = true; + if (infoStream != null) { + message("force apply deletes bytesUsed=" + bufferedDeletes.bytesUsed() + " vs ramBuffer=" + (1024*1024*config.getRAMBufferSizeMB())); } } - - newSegment.setUseCompoundFile(true); - checkpoint(); } - if (flushDeletes) { - applyDeletes(); + if (applyAllDeletes) { + if (infoStream != null) { + message("apply all deletes during flush"); + } + flushDeletesCount.incrementAndGet(); + if (bufferedDeletes.applyDeletes(readerPool, segmentInfos, segmentInfos)) { + checkpoint(); + } + flushControl.clearDeletes(); + } else if (infoStream != null) { + message("don't apply deletes now delTermCount=" + bufferedDeletes.numTerms() + " bytesUsed=" + bufferedDeletes.bytesUsed()); } - - if (flushDocs) - checkpoint(); doAfterFlush(); - - return flushDocs; + flushCount.incrementAndGet(); + + success = true; + + return newSegment != null; } catch (OutOfMemoryError oom) { handleOOM(oom, "doFlush"); // never hit return false; } finally { - docWriter.clearFlushPending(); - docWriter.resumeAllThreads(); + flushControl.clearFlushPending(); + if (!success && infoStream != null) { + message("hit exception during flush"); + } } } @@ -2733,7 +2552,7 @@ */ public final long ramSizeInBytes() { ensureOpen(); - return docWriter.getRAMUsed(); + return docWriter.bytesUsed() + bufferedDeletes.bytesUsed(); } /** Expert: Return the number of documents currently @@ -2776,7 +2595,7 @@ * saves the resulting deletes file (incrementing the * delete generation for merge.info). If no deletes were * flushed, no new deletes file is saved. 
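The doFlush hunk above promotes applyAllDeletes to true once the buffered deletes by themselves hold more than half of the configured RAM buffer, so a long tail of tiny segments cannot keep flushing without ever reclaiming delete RAM. Pulled out into a standalone method, the arithmetic looks roughly like this; the sentinel constant and names are local stand-ins, not the real IndexWriterConfig fields:

    // Rough arithmetic behind the "deletes exceed half the RAM buffer" trigger.
    class ForceApplyDeletesSketch {
      static final double DISABLE_AUTO_FLUSH = -1; // stand-in sentinel

      static boolean forceApplyDeletes(double ramBufferSizeMB, long deletesBytesUsed) {
        if (ramBufferSizeMB == DISABLE_AUTO_FLUSH) {
          return false; // no RAM-based flushing configured
        }
        final long halfBuffer = (long) (1024 * 1024 * ramBufferSizeMB / 2);
        return deletesBytesUsed > halfBuffer;
      }

      public static void main(String[] args) {
        // With a 16 MB buffer, 9 MB of buffered deletes forces an apply; 5 MB does not.
        System.out.println(forceApplyDeletes(16.0, 9L << 20)); // true
        System.out.println(forceApplyDeletes(16.0, 5L << 20)); // false
      }
    }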
*/ - synchronized private void commitMergedDeletes(MergePolicy.OneMerge merge, SegmentReader mergeReader) throws IOException { + synchronized private void commitMergedDeletes(MergePolicy.OneMerge merge, SegmentReader mergedReader) throws IOException { assert testPoint("startCommitMergeDeletes"); @@ -2815,7 +2634,7 @@ assert currentDelDocs.get(j); else { if (currentDelDocs.get(j)) { - mergeReader.doDelete(docUpto); + mergedReader.doDelete(docUpto); delCount++; } docUpto++; @@ -2829,7 +2648,7 @@ // does: for(int j=0; j 0; + assert mergedReader.numDeletedDocs() == delCount; + + mergedReader.hasChanges = delCount > 0; } /* FIXME if we want to support non-contiguous segment merges */ - synchronized private boolean commitMerge(MergePolicy.OneMerge merge, SegmentMerger merger, int mergedDocCount, SegmentReader mergedReader) throws IOException { + synchronized private boolean commitMerge(MergePolicy.OneMerge merge, SegmentMerger merger, SegmentReader mergedReader) throws IOException { assert testPoint("startCommitMerge"); @@ -2873,7 +2692,6 @@ final int start = ensureContiguousMerge(merge); commitMergedDeletes(merge, mergedReader); - docWriter.remapDeletes(segmentInfos, merger.getDocMaps(), merger.getDelCounts(), merge, mergedDocCount); // If the doc store we are using has been closed and // is in now compound format (but wasn't when we @@ -2886,7 +2704,7 @@ segmentInfos.subList(start, start + merge.segments.size()).clear(); assert !segmentInfos.contains(merge.info); segmentInfos.add(start, merge.info); - + closeMergeReaders(merge, false); // Must note the change to segmentInfos so any commits @@ -2897,6 +2715,12 @@ // them so that they don't bother writing them to // disk, updating SegmentInfo, etc.: readerPool.clear(merge.segments); + + // remove pending deletes of the segments + // that were merged, moving them onto the segment just + // before the merged segment + // Lock order: IW -> BD + bufferedDeletes.commitMerge(merge); if (merge.optimize) { // cascade the optimize: @@ -3056,10 +2880,17 @@ final synchronized void mergeInit(MergePolicy.OneMerge merge) throws IOException { boolean success = false; try { + // Lock order: IW -> BD + if (bufferedDeletes.applyDeletes(readerPool, segmentInfos, merge.segments)) { + checkpoint(); + } _mergeInit(merge); success = true; } finally { if (!success) { + if (infoStream != null) { + message("hit exception in mergeInit"); + } mergeFinish(merge); } } @@ -3082,9 +2913,7 @@ if (merge.isAborted()) return; - - applyDeletes(); - + final SegmentInfos sourceSegments = merge.segments; final int end = sourceSegments.size(); @@ -3274,10 +3103,11 @@ if (suppressExceptions) { // Suppress any new exceptions so we throw the // original cause + boolean anyChanges = false; for (int i=0;ipcp will be invoked for every segment that @@ -3894,4 +3708,123 @@ return payloadProcessorProvider; } + // decides when flushes happen + final class FlushControl { + + private boolean flushPending; + private boolean flushDeletes; + private int delCount; + private int docCount; + private boolean flushing; + + private synchronized boolean setFlushPending(String reason, boolean doWait) { + if (flushPending || flushing) { + if (doWait) { + while(flushPending || flushing) { + try { + wait(); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + } + } + return false; + } else { + if (infoStream != null) { + message("now trigger flush reason=" + reason); + } + flushPending = true; + return flushPending; + } + } + + public synchronized void 
setFlushPendingNoWait(String reason) { + setFlushPending(reason, false); + } + + public synchronized boolean getFlushPending() { + return flushPending; + } + + public synchronized boolean getFlushDeletes() { + return flushDeletes; + } + + public synchronized void clearFlushPending() { + if (infoStream != null) { + message("clearFlushPending"); + } + flushPending = false; + flushDeletes = false; + docCount = 0; + notifyAll(); + } + + public synchronized void clearDeletes() { + delCount = 0; + } + + public synchronized boolean waitUpdate(int docInc, int delInc) { + return waitUpdate(docInc, delInc, false); + } + + public synchronized boolean waitUpdate(int docInc, int delInc, boolean skipWait) { + while(flushPending) { + try { + wait(); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + } + + // skipWait is only used when a thread is BOTH adding + // a doc and buffering a del term, and, the adding of + // the doc already triggered a flush + if (skipWait) { + docCount += docInc; + delCount += delInc; + return false; + } + + final int maxBufferedDocs = config.getMaxBufferedDocs(); + if (maxBufferedDocs != IndexWriterConfig.DISABLE_AUTO_FLUSH && + (docCount+docInc) >= maxBufferedDocs) { + return setFlushPending("maxBufferedDocs", true); + } + docCount += docInc; + + final int maxBufferedDeleteTerms = config.getMaxBufferedDeleteTerms(); + if (maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH && + (delCount+delInc) >= maxBufferedDeleteTerms) { + flushDeletes = true; + return setFlushPending("maxBufferedDeleteTerms", true); + } + delCount += delInc; + + return flushByRAMUsage("add delete/doc"); + } + + public synchronized boolean flushByRAMUsage(String reason) { + final double ramBufferSizeMB = config.getRAMBufferSizeMB(); + if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH) { + final long limit = (long) (ramBufferSizeMB*1024*1024); + long used = bufferedDeletes.bytesUsed() + docWriter.bytesUsed(); + if (used >= limit) { + + // DocumentsWriter may be able to free up some + // RAM: + // Lock order: FC -> DW + docWriter.balanceRAM(); + + used = bufferedDeletes.bytesUsed() + docWriter.bytesUsed(); + if (used >= limit) { + return setFlushPending("ram full: " + reason, false); + } + } + } + return false; + } + } + + final FlushControl flushControl = new FlushControl(); } Index: lucene/src/java/org/apache/lucene/index/ParallelPostingsArray.java --- lucene/src/java/org/apache/lucene/index/ParallelPostingsArray.java Thu Dec 09 05:37:58 2010 -0500 +++ lucene/src/java/org/apache/lucene/index/ParallelPostingsArray.java Thu Dec 09 05:56:34 2010 -0500 @@ -1,7 +1,5 @@ package org.apache.lucene.index; -import org.apache.lucene.util.ArrayUtil; - /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -19,9 +17,11 @@ * limitations under the License. 
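FlushControl.flushByRAMUsage above charges buffered documents and buffered deletes against a single RAM limit, and gives DocumentsWriter one chance to free memory (balanceRAM) before a flush is actually declared pending. A self-contained sketch of that check, with a small interface standing in for the two real components:

    // Combined RAM accounting: flush only if docs + deletes still exceed the
    // limit after the writer has tried to free some buffered state.
    class RamFlushSketch {
      interface RamUser {
        long bytesUsed();
        void balanceRAM(); // a deletes buffer can implement this as a no-op
      }

      static boolean shouldFlushByRAM(double ramBufferSizeMB, RamUser docWriter, RamUser bufferedDeletes) {
        final long limit = (long) (ramBufferSizeMB * 1024 * 1024);
        long used = docWriter.bytesUsed() + bufferedDeletes.bytesUsed();
        if (used < limit) {
          return false;
        }
        docWriter.balanceRAM(); // may shrink internal buffers
        used = docWriter.bytesUsed() + bufferedDeletes.bytesUsed();
        return used >= limit;
      }
    }

In the patch itself, waitUpdate falls through to this same RAM check after the maxBufferedDocs and maxBufferedDeleteTerms thresholds have been applied.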
*/ +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.RamUsageEstimator; class ParallelPostingsArray { - final static int BYTES_PER_POSTING = 3 * DocumentsWriter.INT_NUM_BYTE; + final static int BYTES_PER_POSTING = 3 * RamUsageEstimator.NUM_BYTES_INT; final int size; final int[] textStarts; Index: lucene/src/java/org/apache/lucene/index/SegmentInfo.java --- lucene/src/java/org/apache/lucene/index/SegmentInfo.java Thu Dec 09 05:37:58 2010 -0500 +++ lucene/src/java/org/apache/lucene/index/SegmentInfo.java Thu Dec 09 05:56:34 2010 -0500 @@ -361,6 +361,10 @@ return docStoreSegment; } + public void setDocStoreSegment(String segment) { + docStoreSegment = segment; + } + void setDocStoreOffset(int offset) { docStoreOffset = offset; clearFiles(); @@ -534,6 +538,12 @@ if (docStoreOffset != -1) { s.append("->").append(docStoreSegment); + if (docStoreIsCompoundFile) { + s.append('c'); + } else { + s.append('C'); + } + s.append('+').append(docStoreOffset); } return s.toString(); Index: lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java --- lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java Thu Dec 09 05:37:58 2010 -0500 +++ lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java Thu Dec 09 05:56:34 2010 -0500 @@ -24,6 +24,7 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.ByteBlockPool; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.RamUsageEstimator; final class TermVectorsTermsWriterPerField extends TermsHashConsumerPerField { @@ -298,7 +299,7 @@ @Override int bytesPerPosting() { - return super.bytesPerPosting() + 3 * DocumentsWriter.INT_NUM_BYTE; + return super.bytesPerPosting() + 3 * RamUsageEstimator.NUM_BYTES_INT; } } } Index: lucene/src/test/org/apache/lucene/TestDemo.java --- lucene/src/test/org/apache/lucene/TestDemo.java Thu Dec 09 05:37:58 2010 -0500 +++ lucene/src/test/org/apache/lucene/TestDemo.java Thu Dec 09 05:56:34 2010 -0500 @@ -50,6 +50,7 @@ // To store an index on disk, use this instead: //Directory directory = FSDirectory.open("/tmp/testindex"); RandomIndexWriter iwriter = new RandomIndexWriter(random, directory); + iwriter.w.setInfoStream(VERBOSE ? System.out : null); Document doc = new Document(); String longTerm = "longtermlongtermlongtermlongtermlongtermlongtermlongtermlongtermlongtermlongtermlongtermlongtermlongtermlongtermlongtermlongtermlongtermlongterm"; String text = "This is the text to be indexed. 
" + longTerm; Index: lucene/src/test/org/apache/lucene/TestSearchForDuplicates.java --- lucene/src/test/org/apache/lucene/TestSearchForDuplicates.java Thu Dec 09 05:37:58 2010 -0500 +++ lucene/src/test/org/apache/lucene/TestSearchForDuplicates.java Thu Dec 09 05:56:34 2010 -0500 @@ -84,6 +84,10 @@ lmp.setUseCompoundFile(useCompoundFiles); lmp.setUseCompoundDocStore(useCompoundFiles); IndexWriter writer = new IndexWriter(directory, conf); + if (VERBOSE) { + System.out.println("TEST: now build index"); + writer.setInfoStream(System.out); + } final int MAX_DOCS = 225; Index: lucene/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java --- lucene/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java Thu Dec 09 05:37:58 2010 -0500 +++ lucene/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java Thu Dec 09 05:56:34 2010 -0500 @@ -57,9 +57,9 @@ isClose = true; } } - if (isDoFlush && !isClose) { + if (isDoFlush && !isClose && random.nextBoolean()) { hitExc = true; - throw new IOException("now failing during flush"); + throw new IOException(Thread.currentThread().getName() + ": now failing during flush"); } } } @@ -73,12 +73,17 @@ directory.failOn(failure); IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer()).setMaxBufferedDocs(2)); + writer.setInfoStream(VERBOSE ? System.out : null); Document doc = new Document(); Field idField = newField("id", "", Field.Store.YES, Field.Index.NOT_ANALYZED); doc.add(idField); int extraCount = 0; for(int i=0;i<10;i++) { + if (VERBOSE) { + System.out.println("TEST: iter=" + i); + } + for(int j=0;j<20;j++) { idField.setValue(Integer.toString(i*20+j)); writer.addDocument(doc); @@ -97,10 +102,14 @@ } extraCount++; } catch (IOException ioe) { + if (VERBOSE) { + ioe.printStackTrace(System.out); + } failure.clearDoFail(); break; } } + assertEquals(20*(i+1)+extraCount, writer.numDocs()); } writer.close(); @@ -155,8 +164,12 @@ IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer()) .setMaxBufferedDocs(2)); + writer.setInfoStream(VERBOSE ? System.out : null); for(int iter=0;iter<7;iter++) { + if (VERBOSE) { + System.out.println("TEST: iter=" + iter); + } for(int j=0;j<21;j++) { Document doc = new Document(); Index: lucene/src/test/org/apache/lucene/index/TestIndexWriter.java --- lucene/src/test/org/apache/lucene/index/TestIndexWriter.java Thu Dec 09 05:37:58 2010 -0500 +++ lucene/src/test/org/apache/lucene/index/TestIndexWriter.java Thu Dec 09 05:56:34 2010 -0500 @@ -1097,6 +1097,9 @@ doc.add(idField); for(int pass=0;pass<2;pass++) { + if (VERBOSE) { + System.out.println("TEST: pass=" + pass); + } IndexWriter writer = new IndexWriter( directory, @@ -1108,10 +1111,12 @@ // backed directory: setMergePolicy(newLogMergePolicy(false, 10)) ); - - //System.out.println("TEST: pass=" + pass + " cms=" + (pass >= 2)); + writer.setInfoStream(VERBOSE ? System.out : null); + for(int iter=0;iter<10;iter++) { - //System.out.println("TEST: iter=" + iter); + if (VERBOSE) { + System.out.println("TEST: iter=" + iter); + } for(int j=0;j<199;j++) { idField.setValue(Integer.toString(iter*201+j)); writer.addDocument(doc); @@ -1156,8 +1161,9 @@ } }; - if (failure.size() > 0) + if (failure.size() > 0) { throw failure.get(0); + } t1.start(); @@ -1170,6 +1176,7 @@ // Reopen writer = new IndexWriter(directory, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer()).setOpenMode(OpenMode.APPEND)); + writer.setInfoStream(VERBOSE ? 
System.out : null); } writer.close(); } @@ -2591,7 +2598,7 @@ Directory dir = newDirectory(); FlushCountingIndexWriter w = new FlushCountingIndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(MockTokenizer.WHITESPACE, true, false)).setRAMBufferSizeMB(0.5).setMaxBufferedDocs(-1).setMaxBufferedDeleteTerms(-1)); - //w.setInfoStream(System.out); + w.setInfoStream(VERBOSE ? System.out : null); Document doc = new Document(); doc.add(newField("field", "go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20", Field.Store.NO, Field.Index.ANALYZED)); int num = 6 * RANDOM_MULTIPLIER; @@ -2599,6 +2606,9 @@ int count = 0; final boolean doIndexing = r.nextBoolean(); + if (VERBOSE) { + System.out.println("TEST: iter doIndexing=" + doIndexing); + } if (doIndexing) { // Add docs until a flush is triggered final int startFlushCount = w.flushCount; Index: lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java --- lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java Thu Dec 09 05:37:58 2010 -0500 +++ lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java Thu Dec 09 05:56:34 2010 -0500 @@ -114,6 +114,9 @@ Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(MockTokenizer.WHITESPACE, false)).setMaxBufferedDeleteTerms(1)); + + writer.setInfoStream(VERBOSE ? System.out : null); + writer.addDocument(new Document()); writer.deleteDocuments(new Term("foobar", "1")); writer.deleteDocuments(new Term("foobar", "1")); writer.deleteDocuments(new Term("foobar", "1")); @@ -125,11 +128,14 @@ // test when delete terms only apply to ram segments public void testRAMDeletes() throws IOException { for(int t=0;t<2;t++) { + if (VERBOSE) { + System.out.println("TEST: t=" + t); + } Directory dir = newDirectory(); IndexWriter modifier = new IndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(MockTokenizer.WHITESPACE, false)).setMaxBufferedDocs(4) .setMaxBufferedDeleteTerms(4)); - + modifier.setInfoStream(VERBOSE ? System.out : null); int id = 0; int value = 100; @@ -439,6 +445,9 @@ // Iterate w/ ever increasing free disk space: while (!done) { + if (VERBOSE) { + System.out.println("TEST: cycle"); + } MockDirectoryWrapper dir = new MockDirectoryWrapper(random, new RAMDirectory(startDir)); dir.setPreventDoubleWrite(false); IndexWriter modifier = new IndexWriter(dir, @@ -448,6 +457,7 @@ .setMaxBufferedDeleteTerms(1000) .setMergeScheduler(new ConcurrentMergeScheduler())); ((ConcurrentMergeScheduler) modifier.getConfig().getMergeScheduler()).setSuppressExceptions(); + modifier.setInfoStream(VERBOSE ? 
System.out : null); // For each disk size, first try to commit against // dir that will hit random IOExceptions & disk @@ -456,6 +466,9 @@ boolean success = false; for (int x = 0; x < 2; x++) { + if (VERBOSE) { + System.out.println("TEST: x=" + x); + } double rate = 0.1; double diskRatio = ((double)diskFree) / diskUsage; Index: lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java --- lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java Thu Dec 09 05:37:58 2010 -0500 +++ lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java Thu Dec 09 05:56:34 2010 -0500 @@ -51,7 +51,7 @@ IndexWriter writer; final Random r = new java.util.Random(47); - Throwable failure; + volatile Throwable failure; public IndexerThread(int i, IndexWriter writer) { setName("Indexer " + i); @@ -79,6 +79,9 @@ final long stopTime = System.currentTimeMillis() + 500; do { + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + ": TEST: IndexerThread: cycle"); + } doFail.set(this); final String id = ""+r.nextInt(50); idField.setValue(id); @@ -136,7 +139,7 @@ if (doFail.get() != null && !name.equals("startDoFlush") && r.nextInt(20) == 17) { if (VERBOSE) { System.out.println(Thread.currentThread().getName() + ": NOW FAIL: " + name); - //new Throwable().printStackTrace(System.out); + new Throwable().printStackTrace(System.out); } throw new RuntimeException(Thread.currentThread().getName() + ": intentionally failing at " + name); } @@ -145,16 +148,23 @@ } public void testRandomExceptions() throws Throwable { + if (VERBOSE) { + System.out.println("\nTEST: start testRandomExceptions"); + } MockDirectoryWrapper dir = newDirectory(); MockIndexWriter writer = new MockIndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer()) .setRAMBufferSizeMB(0.1).setMergeScheduler(new ConcurrentMergeScheduler())); ((ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler()).setSuppressExceptions(); //writer.setMaxBufferedDocs(10); + if (VERBOSE) { + System.out.println("TEST: initial commit"); + } writer.commit(); - if (VERBOSE) + if (VERBOSE) { writer.setInfoStream(System.out); + } IndexerThread thread = new IndexerThread(0, writer); thread.run(); @@ -163,6 +173,9 @@ fail("thread " + thread.getName() + ": hit unexpected failure"); } + if (VERBOSE) { + System.out.println("TEST: commit after thread start"); + } writer.commit(); try { @@ -192,8 +205,9 @@ //writer.setMaxBufferedDocs(10); writer.commit(); - if (VERBOSE) + if (VERBOSE) { writer.setInfoStream(System.out); + } final int NUM_THREADS = 4; @@ -294,6 +308,7 @@ public void testExceptionJustBeforeFlush() throws IOException { Directory dir = newDirectory(); MockIndexWriter w = new MockIndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer()).setMaxBufferedDocs(2)); + w.setInfoStream(VERBOSE ? 
System.out : null); Document doc = new Document(); doc.add(newField("field", "a field", Field.Store.YES, Field.Index.ANALYZED)); Index: lucene/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java --- lucene/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java Thu Dec 09 05:37:58 2010 -0500 +++ lucene/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java Thu Dec 09 05:56:34 2010 -0500 @@ -47,29 +47,35 @@ public void testAddDocumentOnDiskFull() throws IOException { for(int pass=0;pass<2;pass++) { - if (VERBOSE) + if (VERBOSE) { System.out.println("TEST: pass=" + pass); + } boolean doAbort = pass == 1; long diskFree = 200; while(true) { - if (VERBOSE) + if (VERBOSE) { System.out.println("TEST: cycle: diskFree=" + diskFree); + } MockDirectoryWrapper dir = new MockDirectoryWrapper(random, new RAMDirectory()); dir.setMaxSizeInBytes(diskFree); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer())); writer.setInfoStream(VERBOSE ? System.out : null); MergeScheduler ms = writer.getConfig().getMergeScheduler(); - if (ms instanceof ConcurrentMergeScheduler) + if (ms instanceof ConcurrentMergeScheduler) { // This test intentionally produces exceptions // in the threads that CMS launches; we don't // want to pollute test output with these. ((ConcurrentMergeScheduler) ms).setSuppressExceptions(); + } boolean hitError = false; try { for(int i=0;i<200;i++) { addDoc(writer); } + if (VERBOSE) { + System.out.println("TEST: done adding docs; now commit"); + } writer.commit(); } catch (IOException e) { if (VERBOSE) { @@ -81,13 +87,19 @@ if (hitError) { if (doAbort) { + if (VERBOSE) { + System.out.println("TEST: now rollback"); + } writer.rollback(); } else { try { + if (VERBOSE) { + System.out.println("TEST: now close"); + } writer.close(); } catch (IOException e) { if (VERBOSE) { - System.out.println("TEST: exception on close"); + System.out.println("TEST: exception on close; retry w/ no disk space limit"); e.printStackTrace(System.out); } dir.setMaxSizeInBytes(0); Index: lucene/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java --- lucene/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java Thu Dec 09 05:37:58 2010 -0500 +++ lucene/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java Thu Dec 09 05:56:34 2010 -0500 @@ -106,6 +106,9 @@ int NUM_THREADS = 3; for(int iter=0;iter<10;iter++) { + if (VERBOSE) { + System.out.println("\nTEST: iter=" + iter); + } MockDirectoryWrapper dir = newDirectory(); IndexWriter writer = new IndexWriter( dir, @@ -116,6 +119,7 @@ ); ((ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler()).setSuppressExceptions(); dir.setMaxSizeInBytes(4*1024+20*iter); + writer.setInfoStream(VERBOSE ? 
System.out : null); IndexerThread[] threads = new IndexerThread[NUM_THREADS]; Index: lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java --- lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java Thu Dec 09 05:37:58 2010 -0500 +++ lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java Thu Dec 09 05:56:34 2010 -0500 @@ -96,6 +96,9 @@ int num = 3 * RANDOM_MULTIPLIER; for (int i = 0; i < num; i++) { // increase iterations for better testing + if (VERBOSE) { + System.out.println("\n\nTEST: top iter=" + i); + } sameFieldOrder=random.nextBoolean(); mergeFactor=random.nextInt(3)+2; maxBufferedDocs=random.nextInt(3)+2; @@ -108,10 +111,17 @@ int range=random.nextInt(20)+1; Directory dir1 = newDirectory(); Directory dir2 = newDirectory(); + if (VERBOSE) { + System.out.println(" nThreads=" + nThreads + " iter=" + iter + " range=" + range + " doPooling=" + doReaderPooling + " maxThreadStates=" + maxThreadStates + " sameFieldOrder=" + sameFieldOrder + " mergeFactor=" + mergeFactor); + } Map docs = indexRandom(nThreads, iter, range, dir1, maxThreadStates, doReaderPooling); - //System.out.println("TEST: index serial"); + if (VERBOSE) { + System.out.println("TEST: index serial"); + } indexSerial(random, docs, dir2); - //System.out.println("TEST: verify"); + if (VERBOSE) { + System.out.println("TEST: verify"); + } verifyEquals(dir1, dir2, "id"); dir1.close(); dir2.close(); @@ -141,6 +151,7 @@ IndexWriter w = new MockIndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer()).setOpenMode(OpenMode.CREATE).setRAMBufferSizeMB( 0.1).setMaxBufferedDocs(maxBufferedDocs)); + w.setInfoStream(VERBOSE ? System.out : null); w.commit(); LogMergePolicy lmp = (LogMergePolicy) w.getConfig().getMergePolicy(); lmp.setUseCompoundFile(false); @@ -191,10 +202,14 @@ boolean doReaderPooling) throws IOException, InterruptedException { Map docs = new HashMap(); for(int iter=0;iter<3;iter++) { + if (VERBOSE) { + System.out.println("TEST: iter=" + iter); + } IndexWriter w = new MockIndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer()).setOpenMode(OpenMode.CREATE) .setRAMBufferSizeMB(0.1).setMaxBufferedDocs(maxBufferedDocs).setMaxThreadStates(maxThreadStates) .setReaderPooling(doReaderPooling)); + w.setInfoStream(VERBOSE ? System.out : null); LogMergePolicy lmp = (LogMergePolicy) w.getConfig().getMergePolicy(); lmp.setUseCompoundFile(false); lmp.setUseCompoundDocStore(false); @@ -273,9 +288,33 @@ r2.close(); } + private static void printDocs(IndexReader r) throws Throwable { + IndexReader[] subs = r.getSequentialSubReaders(); + for(IndexReader sub : subs) { + Bits delDocs = sub.getDeletedDocs(); + System.out.println(" " + ((SegmentReader) sub).getSegmentInfo()); + for(int docID=0;docID clazz) throws InitializationError {