Index: lucene/src/java/org/apache/lucene/index/FlushPolicy.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/FlushPolicy.java	(revision 1092622)
+++ lucene/src/java/org/apache/lucene/index/FlushPolicy.java	(working copy)
@@ -32,16 +32,16 @@
  * {@link IndexWriterConfig#setRAMBufferSizeMB(double)}</li>
  * <li>Number of RAM resident documents - configured via
  * {@link IndexWriterConfig#setMaxBufferedDocs(int)}</li>
- * <li>Number of buffered delete terms - configured via
+ * <li>Number of buffered delete terms/queries - configured via
  * {@link IndexWriterConfig#setMaxBufferedDeleteTerms(int)}</li>
 * </ul>
 *
- * The {@link IndexWriter} uses a provided {@link FlushPolicy} to control the
- * flushing process during indexing. The policy is informed for each added or
+ * The {@link IndexWriter} consults a provided {@link FlushPolicy} to control the
+ * flushing process. The policy is informed for each added or
  * updated document as well as for each delete term. Based on the
- * {@link FlushPolicy} the information provided via {@link ThreadState} and
- * {@link DocumentsWriterFlushControl} the {@link FlushPolicy} can decide if a
- * {@link DocumentsWriterPerThread} needs flushing and can mark it as
+ * information provided via {@link ThreadState} and
+ * {@link DocumentsWriterFlushControl}, the {@link FlushPolicy} decides if a
+ * {@link DocumentsWriterPerThread} needs flushing and marks it as
  * flush-pending via
  * {@link DocumentsWriterFlushControl#setFlushPending(ThreadState)}.
  *
@@ -58,6 +58,7 @@
   * Called for each delete term. If this is a delete triggered due to an update
   * the given {@link ThreadState} is non-null.
   *

+   * nocommit: what does this note mean...?
    * Note: This method is synchronized by the given
    * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
    * thread holds the lock on the given {@link ThreadState}
@@ -66,9 +67,10 @@
       ThreadState state);
 
   /**
-   * Called for each document update on the given {@link ThreadState}s
+   * Called for each document update on the given {@link ThreadState}'s
    * {@link DocumentsWriterPerThread}.
    *

    + * nocommit: what does this note mean...? * Note: This method is synchronized by the given * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling * thread holds the lock on the given {@link ThreadState} @@ -103,6 +105,7 @@ * Marks the most ram consuming active {@link DocumentsWriterPerThread} flush * pending */ + // nocommit -- move to default policy? protected void markLargestWriterPending(DocumentsWriterFlushControl control, ThreadState perThreadState, final long currentBytesPerThread) { control @@ -117,7 +120,8 @@ */ protected ThreadState findLargestNonPendingWriter( DocumentsWriterFlushControl control, ThreadState perThreadState) { - long maxRamSoFar = perThreadState.perThreadBytes; + assert perThreadState.perThread.getNumDocsInRAM() > 0; + long maxRamSoFar = perThreadState.bytesUsed; // the dwpt which needs to be flushed eventually ThreadState maxRamUsingThreadState = perThreadState; assert !perThreadState.flushPending : "DWPT should have flushed"; @@ -125,7 +129,7 @@ while (activePerThreadsIterator.hasNext()) { ThreadState next = activePerThreadsIterator.next(); if (!next.flushPending) { - final long nextRam = next.perThreadBytes; + final long nextRam = next.bytesUsed; if (nextRam > maxRamSoFar && next.perThread.getNumDocsInRAM() > 0) { maxRamSoFar = nextRam; maxRamUsingThreadState = next; @@ -138,6 +142,8 @@ return maxRamUsingThreadState; } + // nocommit -- I thought we pause based on "too many flush + // states pending"? /** * Returns the max net memory which marks the upper watermark for the * DocumentsWriter to be healthy. If all flushing and active @@ -155,6 +161,7 @@ */ public long getMaxNetBytes() { if (!flushOnRAM()) { + // nocommit explain that returning -1 is allowed? return -1; } final double ramBufferSizeMB = indexWriterConfig.getRAMBufferSizeMB(); @@ -166,6 +173,8 @@ * {@link IndexWriterConfig#getMaxBufferedDocs()}, otherwise * false. */ + // nocommit who needs this? policy shouldn't have to impl + // this? our default policy should? protected boolean flushOnDocCount() { return indexWriterConfig.getMaxBufferedDocs() != IndexWriterConfig.DISABLE_AUTO_FLUSH; } @@ -175,6 +184,8 @@ * {@link IndexWriterConfig#getMaxBufferedDeleteTerms()}, otherwise * false. */ + // nocommit who needs this? policy shouldn't have to impl + // this? our default policy should? protected boolean flushOnDeleteTerms() { return indexWriterConfig.getMaxBufferedDeleteTerms() != IndexWriterConfig.DISABLE_AUTO_FLUSH; } @@ -184,6 +195,8 @@ * {@link IndexWriterConfig#getRAMBufferSizeMB()}, otherwise * false. */ + // nocommit who needs this? policy shouldn't have to impl + // this? our default policy should? 
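These flushOn* helpers are what a concrete policy composes. To make the contract concrete, here is a minimal sketch of a RAM-only policy written against the package-private API visible in this patch (ThreadState, DocumentsWriterFlushControl, the protected indexWriterConfig field, and the flushOnRAM()/markLargestWriterPending() helpers). It is illustrative only: the update callback and any other abstract members of FlushPolicy are omitted, so it is not a drop-in class.

package org.apache.lucene.index;

// Sketch of a custom FlushPolicy that only reacts to RAM consumption.
// All names used here appear in the hunks above; anything beyond that
// (e.g. the full set of abstract methods) is assumed and omitted.
class RamOnlyFlushPolicy extends FlushPolicy {

  @Override
  public void onDelete(DocumentsWriterFlushControl control, ThreadState state) {
    // Deliberately ignore deletes; this policy never triggers on them.
  }

  @Override
  public void onInsert(DocumentsWriterFlushControl control, ThreadState state) {
    if (flushOnRAM()) {
      // Same arithmetic as FlushByRamOrCountsPolicy: MB -> bytes.
      final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
      if (control.activeBytes() >= limit) {
        // Mark only the single largest non-pending DWPT as flush-pending;
        // the flush control will check it out for flushing afterwards.
        markLargestWriterPending(control, state, control.activeBytes());
      }
    }
  }
}

For the default 16.0 MB buffer the limit is 16 * 1024 * 1024 = 16,777,216 bytes; once the net active bytes across all non-pending DWPTs reach it, only the largest writer is marked, which is what lets the remaining DWPTs keep indexing while one flushes.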
protected boolean flushOnRAM() { return indexWriterConfig.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH; } Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java =================================================================== --- lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (revision 1092622) +++ lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (working copy) @@ -40,7 +40,7 @@ */ public final class DocumentsWriterFlushControl { - private final long maxBytesPerDWPT; + private final long hardMaxBytesPerDWPT; private long activeBytes = 0; private long flushBytes = 0; private volatile int numPending = 0; @@ -63,11 +63,11 @@ private final DocumentsWriter documentsWriter; DocumentsWriterFlushControl(DocumentsWriter documentsWriter, - Healthiness healthiness, long maxBytesPerDWPT) { + Healthiness healthiness, long hardMaxBytesPerDWPT) { this.healthiness = healthiness; this.perThreadPool = documentsWriter.perThreadPool; this.flushPolicy = documentsWriter.flushPolicy; - this.maxBytesPerDWPT = maxBytesPerDWPT; + this.hardMaxBytesPerDWPT = hardMaxBytesPerDWPT; this.documentsWriter = documentsWriter; } @@ -85,8 +85,8 @@ private void commitPerThreadBytes(ThreadState perThread) { final long delta = perThread.perThread.bytesUsed() - - perThread.perThreadBytes; - perThread.perThreadBytes += delta; + - perThread.bytesUsed; + perThread.bytesUsed += delta; /* * We need to differentiate here if we are pending since setFlushPending * moves the perThread memory to the flushBytes and we could be set to @@ -100,6 +100,7 @@ assert updatePeaks(delta); } + // only for asserts private boolean updatePeaks(long delta) { peakActiveBytes = Math.max(peakActiveBytes, activeBytes); peakFlushBytes = Math.max(peakFlushBytes, flushBytes); @@ -116,10 +117,9 @@ } else { flushPolicy.onInsert(this, perThread); } - if (!perThread.flushPending && perThread.perThreadBytes > maxBytesPerDWPT) { - // safety check to prevent a single DWPT exceeding its RAM limit. This - // is super - // important since we can not address more than 2048 MB per DWPT + if (!perThread.flushPending && perThread.bytesUsed > hardMaxBytesPerDWPT) { + // Safety check to prevent a single DWPT exceeding its RAM limit. This + // is super important since we can not address more than 2048 MB per DWPT setFlushPending(perThread); if (fullFlush) { DocumentsWriterPerThread toBlock = internalTryCheckOutForFlush(perThread, false); @@ -146,8 +146,8 @@ } } - public synchronized boolean allFlushesDue() { - return numFlushing == 0; + public synchronized boolean anyFlushing() { + return numFlushing != 0; } public synchronized void waitForFlush() { @@ -167,9 +167,10 @@ */ public synchronized void setFlushPending(ThreadState perThread) { assert !perThread.flushPending; + // nocommit -- what if we are flushing only due to deletes? 
assert perThread.perThread.getNumDocsInRAM() > 0; perThread.flushPending = true; // write access synced - final long bytes = perThread.perThreadBytes; + final long bytes = perThread.bytesUsed; flushBytes += bytes; activeBytes -= bytes; numPending++; // write access synced @@ -177,19 +178,20 @@ synchronized void doOnAbort(ThreadState state) { if (state.flushPending) { - flushBytes -= state.perThreadBytes; + flushBytes -= state.bytesUsed; } else { - activeBytes -= state.perThreadBytes; + activeBytes -= state.bytesUsed; } - // take it out of the loop this DWPT is stale + // Take it out of the loop this DWPT is stale perThreadPool.replaceForFlush(state, closed); healthiness.updateStalled(this); } synchronized DocumentsWriterPerThread tryCheckoutForFlush( ThreadState perThread, boolean setPending) { - if (fullFlush) + if (fullFlush) { return null; + } return internalTryCheckOutForFlush(perThread, setPending); } @@ -199,17 +201,17 @@ setFlushPending(perThread); } if (perThread.flushPending) { - // we are pending so all memory is already moved to flushBytes + // We are pending so all memory is already moved to flushBytes if (perThread.tryLock()) { try { if (perThread.isActive()) { assert perThread.isHeldByCurrentThread(); final DocumentsWriterPerThread dwpt; - final long bytes = perThread.perThreadBytes; // do that before + final long bytes = perThread.bytesUsed; // do that before // replace! dwpt = perThreadPool.replaceForFlush(perThread, closed); assert !flushingWriters.containsKey(dwpt) : "DWPT is already flushing"; - // record the flushing DWPT to reduce flushBytes in doAfterFlush + // Record the flushing DWPT to reduce flushBytes in doAfterFlush flushingWriters.put(dwpt, Long.valueOf(bytes)); numPending--; // write access synced numFlushing++; @@ -296,9 +298,13 @@ return numFlushing; } - public void setFlushDeletes() { - flushDeletes.set(true); + public boolean doApplyAllDeletes() { + return flushDeletes.getAndSet(false); } + + public void setApplyAllDeletes() { + flushDeletes.set(true); + } int numActiveDWPT() { return this.perThreadPool.getMaxThreadStates(); @@ -310,7 +316,7 @@ assert !fullFlush; fullFlush = true; flushingQueue = documentsWriter.deleteQueue; - // set a new delete queue - all subsequent DWPT will use this queue until + // Set a new delete queue - all subsequent DWPT will use this queue until // we do another full flush documentsWriter.deleteQueue = new DocumentsWriterDeleteQueue(new BufferedDeletes(false)); } @@ -372,9 +378,9 @@ } } finally { + fullFlush = false; flushQueue.clear(); blockedFlushes.clear(); - fullFlush = false; } } Index: lucene/src/java/org/apache/lucene/index/Healthiness.java =================================================================== --- lucene/src/java/org/apache/lucene/index/Healthiness.java (revision 1092622) +++ lucene/src/java/org/apache/lucene/index/Healthiness.java (working copy) @@ -83,10 +83,10 @@ } } - private final Healthiness.Sync sync = new Sync(); + private final Sync sync = new Sync(); volatile boolean wasStalled = false; // only with asserts - boolean isStalled() { + boolean anyStalledThreads() { return !sync.isHealthy(); } Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java =================================================================== --- lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (revision 1092622) +++ lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (working copy) @@ -44,7 +44,7 @@ // write access guarded by DocumentsWriterFlushControl 
volatile boolean flushPending = false; // write access guarded by DocumentsWriterFlushControl - long perThreadBytes = 0; + long bytesUsed = 0; // guarded by Reentrant lock private boolean isActive = true; @@ -65,7 +65,7 @@ isActive = false; } this.perThread = perThread; - this.perThreadBytes = 0; + this.bytesUsed = 0; this.flushPending = false; } @@ -86,7 +86,7 @@ public long getBytesUsedPerThread() { assert this.isHeldByCurrentThread(); // public for FlushPolicy - return perThreadBytes; + return bytesUsed; } /** @@ -162,9 +162,9 @@ public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc); - public abstract void clearThreadBindings(ThreadState perThread); + //public abstract void clearThreadBindings(ThreadState perThread); - public abstract void clearAllThreadBindings(); + // public abstract void clearAllThreadBindings(); /** * Returns an iterator providing access to all {@link ThreadState} Index: lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java =================================================================== --- lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java (revision 1092622) +++ lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java (working copy) @@ -50,12 +50,10 @@ int hashMask = 1; int totalFieldCount; - float docBoost; int fieldGen; final DocumentsWriterPerThread.DocState docState; - public DocFieldProcessor(DocumentsWriterPerThread docWriter, DocFieldConsumer consumer) { this.docState = docWriter.docState; this.consumer = consumer; @@ -254,7 +252,6 @@ } } - void quickSort(DocFieldProcessorPerField[] array, int lo, int hi) { if (lo >= hi) return; Index: lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java =================================================================== --- lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java (revision 1092622) +++ lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java (working copy) @@ -52,7 +52,7 @@ final int numAllFields = allFields.size(); - // sort by field name + // Sort by field name CollectionUtil.quickSort(allFields); final FieldsConsumer consumer = state.segmentCodecs.codec().fieldsConsumer(state); Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java =================================================================== --- lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java (revision 1092622) +++ lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java (working copy) @@ -23,15 +23,15 @@ /** * {@link DocumentsWriterDeleteQueue} is a non-blocking linked pending deletes - * queue. In contrast to other queue implementation we only maintain only the + * queue. In contrast to other queue implementation we only maintain the * tail of the queue. A delete queue is always used in a context of a set of - * DWPT and a global delete pool. Each of the DWPT and the global pool need to - * maintain their 'own' head of the queue. The difference between the DWPT and - * the global pool is that the DWPT starts maintaining a head once it has added - * its first document since for its segments private deletes only the deletes - * after that document are relevant. The global pool instead starts maintaining - * the head once this instance is created by taking the sentinel instance as its - * initial head. + * DWPTs and a global delete pool. Each of the DWPT and the global pool need to + * maintain their 'own' head of the queue (as a DeleteSlice instance per DWPT). 
+ * The difference between the DWPT and the global pool is that the DWPT starts
+ * maintaining a head once it has added its first document, since for its
+ * segment's private deletes only the deletes after that document are relevant.
+ * The global pool instead starts maintaining the head once this instance is
+ * created by taking the sentinel instance as its initial head.
 *

 * Since each {@link DeleteSlice} maintains its own head and the list is only
 * single linked the garbage collector takes care of pruning the list for us.
@@ -41,12 +41,12 @@
 *

 * Each DWPT as well as the global delete pool maintain their private
 * DeleteSlice instance. In the DWPT case updating a slice is equivalent to
- * atomically finishing the document. The slice update guarantees a happens
- * before relationship to all other updates in the same indexing session. When a
- * DWPT updates a document it
+ * atomically finishing the document. The slice update guarantees a "happens
+ * before" relationship to all other updates in the same indexing session. When a
+ * DWPT updates a document it:
 *

 * <ol>
- * <li>consumes a document finishes its processing</li>
+ * <li>consumes a document and finishes its processing</li>
 * <li>updates its private {@link DeleteSlice} either by calling
 * {@link #updateSlice(DeleteSlice)} or {@link #add(Term, DeleteSlice)} (if the
 * document has a delTerm)</li>
@@ -56,7 +56,7 @@
 *
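The slice mechanics described in the list above boil down to a single-linked list whose queue only tracks the global tail, while every consumer tracks its own [sliceHead, sliceTail] window. The toy model below (invented names, and a plain synchronized add instead of the CAS loop the real DocumentsWriterDeleteQueue uses) shows why the garbage collector can prune the list: once every head has moved past a node, nothing references it anymore.

import java.util.List;

// Toy model of the DeleteSlice idea; not the patch's lock-free implementation.
final class SliceQueue<T> {
  static final class Node<T> {
    final T item;
    volatile Node<T> next;
    Node(T item) { this.item = item; }
  }

  // The queue itself only knows the tail; there is no global head to prune.
  private volatile Node<T> tail = new Node<T>(null); // sentinel

  // The real queue appends with a CAS loop so indexing threads never block;
  // synchronized keeps this sketch simple and obviously correct.
  synchronized void add(T item) {
    final Node<T> node = new Node<T>(item);
    tail.next = node;
    tail = node;
  }

  final class Slice {
    Node<T> sliceHead = tail; // exclusive: the head node itself is never applied
    Node<T> sliceTail = sliceHead;

    // Pull the slice forward to the current global tail; returns true if
    // there is anything new to apply (mirrors updateSlice()).
    boolean update() {
      if (sliceTail != tail) {
        sliceTail = tail;
        return true;
      }
      return false;
    }

    // Apply everything after sliceHead up to and including sliceTail,
    // then reset to a zero-length slice (mirrors DeleteSlice apply/reset).
    void apply(List<T> out) {
      for (Node<T> n = sliceHead; n != sliceTail; ) {
        n = n.next;
        out.add(n.item);
      }
      sliceHead = sliceTail;
    }
  }
}

In these terms, a DWPT creates its Slice when it buffers its first document, so it only ever sees deletes queued after that point; the global pool creates its slice up front from the sentinel and therefore sees every delete.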
    * * The DWPT also doesn't apply its current documents delete term until it has - * updated its delete slice which ensures the consistency of the update. if the + * updated its delete slice which ensures the consistency of the update. If the * update fails before the DeleteSlice could have been updated the deleteTerm * will also not be added to its private deletes neither to the global deletes. * @@ -167,7 +167,7 @@ void tryApplyGlobalSlice() { if (globalBufferLock.tryLock()) { /* - * the global buffer must be locked but we don't need to upate them if + * The global buffer must be locked but we don't need to upate them if * there is an update going on right now. It is sufficient to apply the * deletes that have been added after the current in-flight global slices * tail the next time we can get the lock! @@ -175,7 +175,6 @@ try { if (updateSlice(globalSlice)) { globalSlice.apply(globalBufferedDeletes, BufferedDeletes.MAX_INT); - } } finally { globalBufferLock.unlock(); @@ -186,15 +185,15 @@ FrozenBufferedDeletes freezeGlobalBuffer(DeleteSlice callerSlice) { globalBufferLock.lock(); /* - * here we are freezing the global buffer so we need to lock it, apply all + * Here we freeze the global buffer so we need to lock it, apply all * deletes in the queue and reset the global slice to let the GC prune the * queue. */ final Node currentTail = tail; // take the current tail make this local any - // changes after this call are applied later + // Changes after this call are applied later // and not relevant here if (callerSlice != null) { - // update the callers slices so we are on the same page + // Update the callers slices so we are on the same page callerSlice.sliceTail = currentTail; } try { @@ -217,7 +216,7 @@ } boolean updateSlice(DeleteSlice slice) { - if (slice.sliceTail != tail) { // if we are the same just + if (slice.sliceTail != tail) { // If we are the same just slice.sliceTail = tail; return true; } @@ -225,7 +224,7 @@ } static class DeleteSlice { - // no need to be volatile, slices are only access by one thread! + // No need to be volatile, slices are thread captive (only accessed by one thread)! Node sliceHead; // we don't apply this one Node sliceTail; @@ -245,7 +244,7 @@ return; } /* - * when we apply a slice we take the head and get its next as our first + * When we apply a slice we take the head and get its next as our first * item to apply and continue until we applied the tail. If the head and * tail in this slice are not equal then there will be at least one more * non-null node in the slice! @@ -260,7 +259,7 @@ } void reset() { - // resetting to a 0 length slice + // Reset to a 0 length slice sliceHead = sliceTail; } @@ -322,7 +321,6 @@ void apply(BufferedDeletes bufferedDeletes, int docIDUpto) { bufferedDeletes.addTerm((Term) item, docIDUpto); } - } private static final class QueryArrayNode extends Node { @@ -376,6 +374,5 @@ } finally { globalBufferLock.unlock(); } - } } Index: lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java =================================================================== --- lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java (revision 1092622) +++ lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java (working copy) @@ -21,16 +21,16 @@ /** * Default {@link FlushPolicy} implementation that flushes based on RAM - * Consumption, document count and number of buffered deletes depending on the - * IndexWriters {@link IndexWriterConfig}. 
This {@link FlushPolicy} will only
+ * used, document count and number of buffered deletes depending on the
+ * IndexWriter's {@link IndexWriterConfig}. This {@link FlushPolicy} will only
 * respect settings which are not disabled during initialization (
- * {@link #init(DocumentsWriter)}). All enabled {@link IndexWriterConfig}
+ * {@link #init(DocumentsWriter)}) (nocommit what does that mean?). All enabled {@link IndexWriterConfig}
 * settings are used to mark {@link DocumentsWriterPerThread} as flush pending
- * during indexing with respect to thier live updates.
+ * during indexing with respect to their live updates.
 *

    - * If {@link IndexWriterConfig#setRAMBufferSizeMB(double)} is enabled always the + * If {@link IndexWriterConfig#setRAMBufferSizeMB(double)} is enabled, the * largest ram consuming {@link DocumentsWriterPerThread} will be marked as - * pending iff the global active RAM consumption is equals or higher the + * pending iff the global active RAM consumption is >= the * configured max RAM buffer. */ public class FlushByRamOrCountsPolicy extends FlushPolicy { @@ -38,10 +38,11 @@ @Override public void onDelete(DocumentsWriterFlushControl control, ThreadState state) { if (flushOnDeleteTerms()) { + // Flush this state by num del terms final int maxBufferedDeleteTerms = indexWriterConfig .getMaxBufferedDeleteTerms(); if (control.getNumGlobalTermDeletes() >= maxBufferedDeleteTerms) { - control.setFlushDeletes(); + control.setApplyAllDeletes(); } } } @@ -51,12 +52,12 @@ if (flushOnDocCount() && state.perThread.getNumDocsInRAM() >= indexWriterConfig .getMaxBufferedDocs()) { - control.setFlushPending(state); // flush by num docs + // Flush this state by num docs + control.setFlushPending(state); } else {// flush by RAM if (flushOnRAM()) { - final double ramBufferSizeMB = indexWriterConfig.getRAMBufferSizeMB(); + final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d); final long totalRam = control.activeBytes(); - final long limit = (long) (ramBufferSizeMB * 1024.d * 1024.d); if (totalRam >= limit) { markLargestWriterPending(control, state, totalRam); } Index: lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java =================================================================== --- lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java (revision 1092622) +++ lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java (working copy) @@ -208,11 +208,11 @@ } if (!packet.isSegmentPrivate) { /* - * only update the coalescededDeletes if we are NOT on a segment private del packet. - * the segment private del packet must only applied to segments with the same delGen. - * Yet, if a segment is already deleted from the SI since it had no more documents remaining - * after some del packets younger than it segPrivate packet (hihger delGen) have been applied - * the segPrivate packet has not been removed. + * Only coalesce if we are NOT on a segment private del packet: the segment private del packet + * must only applied to segments with the same delGen. Yet, if a segment is already deleted + * from the SI since it had no more documents remaining after some del packets younger than + * its segPrivate packet (higher delGen) have been applied, the segPrivate packet has not been + * removed. */ coalescedDeletes.update(packet); } @@ -259,7 +259,7 @@ } /* - * since we are on a segment private del packet we must not + * Since we are on a segment private del packet we must not * update the coalescedDeletes here! We can simply advance to the * next packet and seginfo. */ Index: lucene/src/java/org/apache/lucene/index/DocumentsWriter.java =================================================================== --- lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (revision 1092622) +++ lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (working copy) @@ -39,10 +39,7 @@ /** * This class accepts multiple added documents and directly - * writes a single segment file. It does this more - * efficiently than creating a single segment per document - * (with DocumentWriter) and doing standard merges on those - * segments. + * writes segment files. 
* * Each added document is passed to the {@link DocConsumer}, * which in turn processes the document and interacts with @@ -152,8 +149,11 @@ } synchronized boolean deleteQueries(final Query... queries) throws IOException { - final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; deleteQueue.addDelete(queries); + // nocommit -- shouldn't we check for doApplyAllDeletes + // here too? + // nocommit shouldn't this consult flush policy? or + // should this return void now? return false; } @@ -165,9 +165,11 @@ final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; deleteQueue.addDelete(terms); flushControl.doOnDelete(); - if (flushControl.flushDeletes.getAndSet(false)) { - flushDeletes(deleteQueue); + if (flushControl.doApplyAllDeletes()) { + applyAllDeletes(deleteQueue); } + // nocommit shouldn't this consult flush policy? or + // should this return void now? return false; } @@ -182,13 +184,13 @@ return deleteQueue; } - private void flushDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException { + private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException { if (deleteQueue != null) { synchronized (ticketQueue) { - // freeze and insert the delete flush ticket in the queue + // Freeze and insert the delete flush ticket in the queue ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false)); applyFlushTickets(null, null); - } + } } indexWriter.applyAllDeletes(); indexWriter.flushCount.incrementAndGet(); @@ -196,14 +198,9 @@ synchronized void setInfoStream(PrintStream infoStream) { this.infoStream = infoStream; - pushConfigChange(); - } - - private final void pushConfigChange() { final Iterator it = perThreadPool.getAllPerThreadsIterator(); while (it.hasNext()) { - DocumentsWriterPerThread perThread = it.next().perThread; - perThread.docState.infoStream = this.infoStream; + it.next().perThread.docState.infoStream = infoStream; } } @@ -218,8 +215,9 @@ // returns boolean for asserts boolean message(String message) { - if (infoStream != null) + if (infoStream != null) { indexWriter.message("DW: " + message); + } return true; } @@ -297,45 +295,52 @@ ensureOpen(); boolean maybeMerge = false; final boolean isUpdate = delTerm != null; - if (healthiness.isStalled()) { - /* - * if we are allowed to hijack threads for flushing we try to flush out - * as many pending DWPT to release memory and get back healthy status. - */ + if (healthiness.anyStalledThreads()) { + + // Help out flushing any pending DWPTs so we can un-stall: if (infoStream != null) { - message("WARNING DocumentsWriter is stalled try to hijack thread to flush pending segment"); + message("WARNING DocumentsWriter has stalled threads; will hijack this thread to flush pending segment(s)"); } - // try pick up pending threads here if possile + + // Try pick up pending threads here if possible DocumentsWriterPerThread flushingDWPT; - while ( (flushingDWPT = flushControl.nextPendingFlush()) != null){ - // don't push the delete here since the update could fail! + while ((flushingDWPT = flushControl.nextPendingFlush()) != null) { + // Don't push the delete here since the update could fail! 
maybeMerge = doFlush(flushingDWPT); - if (!healthiness.isStalled()) { + if (!healthiness.anyStalledThreads()) { break; } } - if (infoStream != null && healthiness.isStalled()) { - message("WARNING DocumentsWriter is stalled might block thread until DocumentsWriter is not stalled anymore"); + + if (infoStream != null && healthiness.anyStalledThreads()) { + message("WARNING DocumentsWriter still has stalled threads; waiting"); } + healthiness.waitIfStalled(); // block if stalled + + if (infoStream != null && healthiness.anyStalledThreads()) { + message("WARNING DocumentsWriter done waiting"); + } } + final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this, doc); final DocumentsWriterPerThread flushingDWPT; - final DocumentsWriterPerThread dwpt; + try { + if (!perThread.isActive()) { ensureOpen(); assert false: "perThread is not active but we are still open"; } - dwpt = perThread.perThread; + final DocumentsWriterPerThread dwpt = perThread.perThread; try { dwpt.updateDocument(doc, analyzer, delTerm); numDocsInRAM.incrementAndGet(); } finally { - if(dwpt.checkAndResetHasAborted()) { - flushControl.doOnAbort(perThread); + if (dwpt.checkAndResetHasAborted()) { + flushControl.doOnAbort(perThread); } } flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); @@ -376,46 +381,53 @@ * might miss to deletes documents in 'A'. */ synchronized (ticketQueue) { - // each flush is assigned a ticket in the order they accquire the ticketQueue lock - ticket = new FlushTicket(flushingDWPT.prepareFlush(), true); - ticketQueue.add(ticket); + // Each flush is assigned a ticket in the order they accquire the ticketQueue lock + ticket = new FlushTicket(flushingDWPT.prepareFlush(), true); + ticketQueue.add(ticket); } + // flush concurrently without locking final FlushedSegment newSegment = flushingDWPT.flush(); + + // nocommit -- should this success = true be moved + // under the applyFlushTickets? success = true; + /* - * now we are done and try to flush the ticket queue if the head of the + * Now we are done and try to flush the ticket queue if the head of the * queue has already finished the flush. */ applyFlushTickets(ticket, newSegment); } finally { - flushControl.doAfterFlush(flushingDWPT); - flushingDWPT.checkAndResetHasAborted(); - indexWriter.flushCount.incrementAndGet(); - if (!success && ticket != null) { - synchronized (ticketQueue) { - // in the case of a failure make sure we are making progress and - // apply all the deletes since the segment flush failed - ticket.isSegmentFlush = false; - - } + flushControl.doAfterFlush(flushingDWPT); + flushingDWPT.checkAndResetHasAborted(); + indexWriter.flushCount.incrementAndGet(); + if (!success && ticket != null) { + synchronized (ticketQueue) { + // nocommit -- shouldn't we drop the ticket in + // this case? + // In the case of a failure make sure we are making progress and + // apply all the deletes since the segment flush failed + ticket.isSegmentFlush = false; } + } } - flushingDWPT = flushControl.nextPendingFlush() ; + flushingDWPT = flushControl.nextPendingFlush(); } return maybeMerge; } - private void applyFlushTickets(FlushTicket current, FlushedSegment segment) throws IOException { synchronized (ticketQueue) { if (current != null) { - // this is a segment FlushTicket so assign the flushed segment so we can make progress. + // nocommit -- can't caller set current.segment = segment? + // nocommit -- confused by this comment: + // This is a segment FlushTicket so assign the flushed segment so we can make progress. 
assert segment != null; current.segment = segment; } while (true) { - // while we can publish flushes keep on making the queue empty. + // Keep publishing eligible flushed segments: final FlushTicket head = ticketQueue.peek(); if (head != null && head.canPublish()) { ticketQueue.poll(); @@ -426,11 +438,10 @@ } } } - private void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes) throws IOException { - // this is eventually finishing the flushed segment and publishing it to the IndexWriter + // Finish the flushed segment and publish it to IndexWriter if (newSegment == null) { assert bufferedDeletes != null; if (bufferedDeletes != null && bufferedDeletes.any()) { @@ -442,9 +453,6 @@ } else { publishFlushedSegment(newSegment, bufferedDeletes); } - - - } final void subtractFlushedNumDocs(int numFlushed) { @@ -455,10 +463,10 @@ } /** - * publishes the flushed segment, segment private deletes if any and its - * associated global delete if present to the index writer. the actual - * publishing operation is synced on IW -> BDS so that the {@link SegmentInfo} - * 's delete generation is always GlobalPacket_deleteGeneration + 1 + * Publishes the flushed segment, segment private deletes (if any) and its + * associated global delete (if present) to IndexWriter. The actual + * publishing operation is synced on IW -> BDS so that the {@link SegmentInfo}'s + * delete generation is always GlobalPacket_deleteGeneration + 1 */ private void publishFlushedSegment(FlushedSegment newSegment, FrozenBufferedDeletes globalPacket) throws IOException { @@ -467,12 +475,13 @@ final BufferedDeletes deletes = newSegment.segmentDeletes; FrozenBufferedDeletes packet = null; if (deletes != null && deletes.any()) { - // segment private delete + // Segment private delete packet = new FrozenBufferedDeletes(deletes, true); if (infoStream != null) { message("flush: push buffered seg private deletes: " + packet); } } + // now publish! indexWriter.publishFlushedSegment(segInfo, packet, globalPacket); } @@ -486,10 +495,9 @@ } /* - * flushAllThreads is synced by IW fullFlushLock. Flushing all threads is a - * two stage operations, the caller must ensure that #finishFlush is called - * after this method to release the flush lock in DWFlushControl - use try / - * finally! + * FlushAllThreads is synced by IW fullFlushLock. Flushing all threads is a + * two stage operation; the caller must ensure (in try/finally) that finishFlush + * is called after this method, to release the flush lock in DWFlushControl */ final boolean flushAllThreads(final boolean flushDeletes) throws IOException { @@ -497,9 +505,11 @@ synchronized (this) { flushingDeleteQueue = deleteQueue; - /* sets a new delete queue - this must be synced on the flush control + /* Cutover to a new delete queue. 
This must be synced on the flush control * otherwise a new DWPT could sneak into the loop with an already flushing * delete queue */ + // nocommit -- shouldn't we do this?: + // assert Thread.holdsLock(flushControl); flushControl.markForFullFlush(); assert setFlushingDeleteQueue(flushingDeleteQueue); } @@ -509,18 +519,18 @@ boolean anythingFlushed = false; try { DocumentsWriterPerThread flushingDWPT; - // now try help out with flushing + // Help out with flushing: while ((flushingDWPT = flushControl.nextPendingFlush()) != null) { anythingFlushed |= doFlush(flushingDWPT); } - // if a concurrent flush is still in flight wait for it - while (!flushControl.allFlushesDue()) { + // If a concurrent flush is still in flight wait for it + while (flushControl.anyFlushing()) { flushControl.waitForFlush(); } if (!anythingFlushed && flushDeletes) { synchronized (ticketQueue) { ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false)); - } + } applyFlushTickets(null, null); } } finally { @@ -532,13 +542,16 @@ final void finishFullFlush(boolean success) { assert setFlushingDeleteQueue(null); if (success) { - // release the flush lock + // Release the flush lock flushControl.finishFullFlush(); } else { flushControl.abortFullFlushes(); } } - + + // nocommit -- can we add comment justifying that these + // fields are safely changed across threads because they + // are always accessed in sync(ticketQueue)? static final class FlushTicket { final FrozenBufferedDeletes frozenDeletes; FlushedSegment segment; Index: lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java =================================================================== --- lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java (revision 1092622) +++ lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java (working copy) @@ -20,6 +20,8 @@ import org.apache.lucene.document.Document; +// nocommit jdoc +// nocommit -- can/should apps set this via IWC public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerThreadPool { private Map threadBindings = new ConcurrentHashMap(); @@ -37,8 +39,12 @@ } } ThreadState minThreadState = null; - // find the state that has minimum amount of threads waiting + + // Find the state that has minimum number of threads waiting + // noocommit -- can't another thread lock the + // minThreadState we just got? minThreadState = minContendedThreadState(); + if (minThreadState == null || minThreadState.hasQueuedThreads()) { ThreadState newState = newThreadState(); if (newState != null) { @@ -59,6 +65,7 @@ return minThreadState; } + /* @Override public void clearThreadBindings(ThreadState perThread) { threadBindings.clear(); @@ -68,5 +75,5 @@ public void clearAllThreadBindings() { threadBindings.clear(); } - + */ } Index: lucene/src/java/org/apache/lucene/index/IndexWriter.java =================================================================== --- lucene/src/java/org/apache/lucene/index/IndexWriter.java (revision 1092622) +++ lucene/src/java/org/apache/lucene/index/IndexWriter.java (working copy) @@ -383,7 +383,7 @@ if (!success && infoStream != null) { message("hit exception during while NRT reader"); } - // now we are done - finish the full flush! + // Done: finish the full flush! 
docWriter.finishFullFlush(success); doAfterFlush(); } @@ -2073,7 +2073,7 @@ if (useCompoundFile(newSegment)) { String compoundFileName = IndexFileNames.segmentFileName(newSegment.name, "", IndexFileNames.COMPOUND_FILE_EXTENSION); message("creating compound file " + compoundFileName); - // Now build compound file + // Now build compound file CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName); for(String fileName : newSegment.files()) { cfsWriter.addFile(fileName); @@ -2146,18 +2146,18 @@ */ synchronized void publishFlushedSegment(SegmentInfo newSegment, FrozenBufferedDeletes packet, FrozenBufferedDeletes globalPacket) throws IOException { - // lock order IW -> BDS + // Lock order IW -> BDS synchronized (bufferedDeletesStream) { if (globalPacket != null && globalPacket.any()) { bufferedDeletesStream.push(globalPacket); } - // publishing the segment must be synched on IW -> BDS to make the sure + // Publishing the segment must be synched on IW -> BDS to make the sure // that no merge prunes away the seg. private delete packet final long nextGen; if (packet != null && packet.any()) { nextGen = bufferedDeletesStream.push(packet); } else { - // since we don't have a delete packet to apply we can get a new + // Since we don't have a delete packet to apply we can get a new // generation right away nextGen = bufferedDeletesStream.getNextGen(); } @@ -2572,7 +2572,11 @@ message("commit: done"); } } + + // Ensures only one flush() is actually flushing segments + // at a time: private final Object fullFlushLock = new Object(); + /** * Flush all in-memory buffered updates (adds and deletes) * to the Directory. @@ -2595,9 +2599,7 @@ maybeMerge(); } } - // TODO: this method should not have to be entirely - // synchronized, ie, merges should be allowed to commit - // even while a flush is happening + private boolean doFlush(boolean applyAllDeletes) throws CorruptIndexException, IOException { if (hitOOM) { throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot flush"); @@ -2645,6 +2647,8 @@ final synchronized void maybeApplyDeletes(boolean applyAllDeletes) throws IOException { if (!applyAllDeletes) { + // nocommit -- shouldn't this move into the default + // flush policy? // If deletes alone are consuming > 1/2 our RAM // buffer, force them all to apply now. 
This is to // prevent too-frequent flushing of a long tail of @@ -2670,31 +2674,31 @@ } final synchronized void applyAllDeletes() throws IOException { - flushDeletesCount.incrementAndGet(); - final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream - .applyDeletes(readerPool, segmentInfos); - if (result.anyDeletes) { - checkpoint(); + flushDeletesCount.incrementAndGet(); + final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream + .applyDeletes(readerPool, segmentInfos); + if (result.anyDeletes) { + checkpoint(); + } + if (!keepFullyDeletedSegments && result.allDeleted != null) { + if (infoStream != null) { + message("drop 100% deleted segments: " + result.allDeleted); } - if (!keepFullyDeletedSegments && result.allDeleted != null) { - if (infoStream != null) { - message("drop 100% deleted segments: " + result.allDeleted); - } - for (SegmentInfo info : result.allDeleted) { - // If a merge has already registered for this - // segment, we leave it in the readerPool; the - // merge will skip merging it and will then drop - // it once it's done: - if (!mergingSegments.contains(info)) { - segmentInfos.remove(info); - if (readerPool != null) { - readerPool.drop(info); - } + for (SegmentInfo info : result.allDeleted) { + // If a merge has already registered for this + // segment, we leave it in the readerPool; the + // merge will skip merging it and will then drop + // it once it's done: + if (!mergingSegments.contains(info)) { + segmentInfos.remove(info); + if (readerPool != null) { + readerPool.drop(info); } } - checkpoint(); } - bufferedDeletesStream.prune(segmentInfos); + checkpoint(); + } + bufferedDeletesStream.prune(segmentInfos); } /** Expert: Return the total size of all index files currently cached in memory.