Index: lucene/src/java/org/apache/lucene/index/FlushPolicy.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/FlushPolicy.java (revision 1092622)
+++ lucene/src/java/org/apache/lucene/index/FlushPolicy.java (working copy)
@@ -32,16 +32,16 @@
* {@link IndexWriterConfig#setRAMBufferSizeMB(double)}
*
+ * nocommit: what does this note mean...?
* Note: This method is synchronized by the given
* {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
* thread holds the lock on the given {@link ThreadState}
@@ -66,9 +67,10 @@
ThreadState state);
/**
- * Called for each document update on the given {@link ThreadState}s
+ * Called for each document update on the given {@link ThreadState}'s
* {@link DocumentsWriterPerThread}.
*
+ * nocommit: what does this note mean...?
* Note: This method is synchronized by the given
* {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
* thread holds the lock on the given {@link ThreadState}
@@ -103,6 +105,7 @@
* Marks the most ram consuming active {@link DocumentsWriterPerThread} flush
* pending
*/
+ // nocommit -- move to default policy?
protected void markLargestWriterPending(DocumentsWriterFlushControl control,
ThreadState perThreadState, final long currentBytesPerThread) {
control
@@ -117,7 +120,8 @@
*/
protected ThreadState findLargestNonPendingWriter(
DocumentsWriterFlushControl control, ThreadState perThreadState) {
- long maxRamSoFar = perThreadState.perThreadBytes;
+ assert perThreadState.perThread.getNumDocsInRAM() > 0;
+ long maxRamSoFar = perThreadState.bytesUsed;
// the dwpt which needs to be flushed eventually
ThreadState maxRamUsingThreadState = perThreadState;
assert !perThreadState.flushPending : "DWPT should have flushed";
@@ -125,7 +129,7 @@
while (activePerThreadsIterator.hasNext()) {
ThreadState next = activePerThreadsIterator.next();
if (!next.flushPending) {
- final long nextRam = next.perThreadBytes;
+ final long nextRam = next.bytesUsed;
if (nextRam > maxRamSoFar && next.perThread.getNumDocsInRAM() > 0) {
maxRamSoFar = nextRam;
maxRamUsingThreadState = next;
@@ -138,6 +142,8 @@
return maxRamUsingThreadState;
}
+ // nocommit -- I thought we pause based on "too many flush
+ // states pending"?
/**
* Returns the max net memory which marks the upper watermark for the
* DocumentsWriter to be healthy. If all flushing and active
@@ -155,6 +161,7 @@
*/
public long getMaxNetBytes() {
if (!flushOnRAM()) {
+ // nocommit explain that returning -1 is allowed?
return -1;
}
final double ramBufferSizeMB = indexWriterConfig.getRAMBufferSizeMB();
@@ -166,6 +173,8 @@
* {@link IndexWriterConfig#getMaxBufferedDocs()}, otherwise
* false.
*/
+ // nocommit who needs this? policy shouldn't have to impl
+ // this? our default policy should?
protected boolean flushOnDocCount() {
return indexWriterConfig.getMaxBufferedDocs() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
}
@@ -175,6 +184,8 @@
* {@link IndexWriterConfig#getMaxBufferedDeleteTerms()}, otherwise
* false.
*/
+ // nocommit who needs this? policy shouldn't have to impl
+ // this? our default policy should?
protected boolean flushOnDeleteTerms() {
return indexWriterConfig.getMaxBufferedDeleteTerms() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
}
@@ -184,6 +195,8 @@
* {@link IndexWriterConfig#getRAMBufferSizeMB()}, otherwise
* false.
*/
+ // nocommit who needs this? policy shouldn't have to impl
+ // this? our default policy should?
protected boolean flushOnRAM() {
return indexWriterConfig.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
}
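
Review note: to see how these hooks compose, here is a minimal sketch of a hypothetical policy (not part of this patch) that flushes purely by doc count. It assumes onInsert/onDelete are the abstract hooks (any further abstract hooks, e.g. onUpdate, would need trivial overrides too) and that indexWriterConfig is the protected config field used by getMaxNetBytes():

// Hypothetical sketch, same package as FlushPolicy; not part of this patch.
// Marks a ThreadState flush-pending purely by doc count; deletes and RAM
// are ignored.
class FlushByDocCountOnlyPolicy extends FlushPolicy {
  @Override
  public void onInsert(DocumentsWriterFlushControl control, ThreadState state) {
    // The caller guarantees we hold the lock on state (see the Note above).
    if (flushOnDocCount()
        && state.perThread.getNumDocsInRAM() >= indexWriterConfig.getMaxBufferedDocs()) {
      control.setFlushPending(state);
    }
  }

  @Override
  public void onDelete(DocumentsWriterFlushControl control, ThreadState state) {
    // Deletes never trigger a flush in this sketch.
  }
}
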
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (revision 1092622)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (working copy)
@@ -40,7 +40,7 @@
*/
public final class DocumentsWriterFlushControl {
- private final long maxBytesPerDWPT;
+ private final long hardMaxBytesPerDWPT;
private long activeBytes = 0;
private long flushBytes = 0;
private volatile int numPending = 0;
@@ -63,11 +63,11 @@
private final DocumentsWriter documentsWriter;
DocumentsWriterFlushControl(DocumentsWriter documentsWriter,
- Healthiness healthiness, long maxBytesPerDWPT) {
+ Healthiness healthiness, long hardMaxBytesPerDWPT) {
this.healthiness = healthiness;
this.perThreadPool = documentsWriter.perThreadPool;
this.flushPolicy = documentsWriter.flushPolicy;
- this.maxBytesPerDWPT = maxBytesPerDWPT;
+ this.hardMaxBytesPerDWPT = hardMaxBytesPerDWPT;
this.documentsWriter = documentsWriter;
}
@@ -85,8 +85,8 @@
private void commitPerThreadBytes(ThreadState perThread) {
final long delta = perThread.perThread.bytesUsed()
- - perThread.perThreadBytes;
- perThread.perThreadBytes += delta;
+ - perThread.bytesUsed;
+ perThread.bytesUsed += delta;
/*
* We need to differentiate here if we are pending since setFlushPending
* moves the perThread memory to the flushBytes and we could be set to
@@ -100,6 +100,7 @@
assert updatePeaks(delta);
}
+ // only for asserts
private boolean updatePeaks(long delta) {
peakActiveBytes = Math.max(peakActiveBytes, activeBytes);
peakFlushBytes = Math.max(peakFlushBytes, flushBytes);
@@ -116,10 +117,9 @@
} else {
flushPolicy.onInsert(this, perThread);
}
- if (!perThread.flushPending && perThread.perThreadBytes > maxBytesPerDWPT) {
- // safety check to prevent a single DWPT exceeding its RAM limit. This
- // is super
- // important since we can not address more than 2048 MB per DWPT
+ if (!perThread.flushPending && perThread.bytesUsed > hardMaxBytesPerDWPT) {
+ // Safety check to prevent a single DWPT exceeding its RAM limit. This
+ // is super important since we cannot address more than 2048 MB per DWPT
setFlushPending(perThread);
if (fullFlush) {
DocumentsWriterPerThread toBlock = internalTryCheckOutForFlush(perThread, false);
@@ -146,8 +146,8 @@
}
}
- public synchronized boolean allFlushesDue() {
- return numFlushing == 0;
+ public synchronized boolean anyFlushing() {
+ return numFlushing != 0;
}
public synchronized void waitForFlush() {
@@ -167,9 +167,10 @@
*/
public synchronized void setFlushPending(ThreadState perThread) {
assert !perThread.flushPending;
+ // nocommit -- what if we are flushing only due to deletes?
assert perThread.perThread.getNumDocsInRAM() > 0;
perThread.flushPending = true; // write access synced
- final long bytes = perThread.perThreadBytes;
+ final long bytes = perThread.bytesUsed;
flushBytes += bytes;
activeBytes -= bytes;
numPending++; // write access synced
@@ -177,19 +178,20 @@
synchronized void doOnAbort(ThreadState state) {
if (state.flushPending) {
- flushBytes -= state.perThreadBytes;
+ flushBytes -= state.bytesUsed;
} else {
- activeBytes -= state.perThreadBytes;
+ activeBytes -= state.bytesUsed;
}
- // take it out of the loop this DWPT is stale
+ // Take it out of the loop; this DWPT is stale
perThreadPool.replaceForFlush(state, closed);
healthiness.updateStalled(this);
}
synchronized DocumentsWriterPerThread tryCheckoutForFlush(
ThreadState perThread, boolean setPending) {
- if (fullFlush)
+ if (fullFlush) {
return null;
+ }
return internalTryCheckOutForFlush(perThread, setPending);
}
@@ -199,17 +201,17 @@
setFlushPending(perThread);
}
if (perThread.flushPending) {
- // we are pending so all memory is already moved to flushBytes
+ // We are pending so all memory is already moved to flushBytes
if (perThread.tryLock()) {
try {
if (perThread.isActive()) {
assert perThread.isHeldByCurrentThread();
final DocumentsWriterPerThread dwpt;
- final long bytes = perThread.perThreadBytes; // do that before
+ final long bytes = perThread.bytesUsed; // do that before
// replace!
dwpt = perThreadPool.replaceForFlush(perThread, closed);
assert !flushingWriters.containsKey(dwpt) : "DWPT is already flushing";
- // record the flushing DWPT to reduce flushBytes in doAfterFlush
+ // Record the flushing DWPT to reduce flushBytes in doAfterFlush
flushingWriters.put(dwpt, Long.valueOf(bytes));
numPending--; // write access synced
numFlushing++;
@@ -296,9 +298,13 @@
return numFlushing;
}
- public void setFlushDeletes() {
- flushDeletes.set(true);
+ public boolean doApplyAllDeletes() {
+ return flushDeletes.getAndSet(false);
}
+
+ public void setApplyAllDeletes() {
+ flushDeletes.set(true);
+ }
int numActiveDWPT() {
return this.perThreadPool.getMaxThreadStates();
@@ -310,7 +316,7 @@
assert !fullFlush;
fullFlush = true;
flushingQueue = documentsWriter.deleteQueue;
- // set a new delete queue - all subsequent DWPT will use this queue until
+ // Set a new delete queue - all subsequent DWPT will use this queue until
// we do another full flush
documentsWriter.deleteQueue = new DocumentsWriterDeleteQueue(new BufferedDeletes(false));
}
@@ -372,9 +378,9 @@
}
} finally {
+ fullFlush = false;
flushQueue.clear();
blockedFlushes.clear();
- fullFlush = false;
}
}
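
Review note: the invariant behind these counters may be worth a comment in the class javadoc. A ThreadState's bytes are charged to exactly one of activeBytes/flushBytes: commitPerThreadBytes adds the delta to one bucket, setFlushPending moves the whole bytesUsed across, and (per the "Record the flushing DWPT to reduce flushBytes in doAfterFlush" comment) doAfterFlush later subtracts the recorded amount. A toy model of just that accounting, not Lucene code:

// Toy model (not Lucene code) of the two-counter accounting above.
class ToyFlushAccounting {
  long activeBytes, flushBytes;

  void onInsert(long delta) {              // commitPerThreadBytes, not yet pending
    activeBytes += delta;
  }

  void onSetFlushPending(long bytesUsed) { // setFlushPending: move buckets
    flushBytes += bytesUsed;
    activeBytes -= bytesUsed;
  }

  void onAfterFlush(long recordedBytes) {  // doAfterFlush, via flushingWriters
    flushBytes -= recordedBytes;
  }

  long netBytes() {                        // what policies compare to limits
    return activeBytes + flushBytes;
  }
}
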
Index: lucene/src/java/org/apache/lucene/index/Healthiness.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/Healthiness.java (revision 1092622)
+++ lucene/src/java/org/apache/lucene/index/Healthiness.java (working copy)
@@ -83,10 +83,10 @@
}
}
- private final Healthiness.Sync sync = new Sync();
+ private final Sync sync = new Sync();
volatile boolean wasStalled = false; // only with asserts
- boolean isStalled() {
+ boolean anyStalledThreads() {
return !sync.isHealthy();
}
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (revision 1092622)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (working copy)
@@ -44,7 +44,7 @@
// write access guarded by DocumentsWriterFlushControl
volatile boolean flushPending = false;
// write access guarded by DocumentsWriterFlushControl
- long perThreadBytes = 0;
+ long bytesUsed = 0;
// guarded by Reentrant lock
private boolean isActive = true;
@@ -65,7 +65,7 @@
isActive = false;
}
this.perThread = perThread;
- this.perThreadBytes = 0;
+ this.bytesUsed = 0;
this.flushPending = false;
}
@@ -86,7 +86,7 @@
public long getBytesUsedPerThread() {
assert this.isHeldByCurrentThread();
// public for FlushPolicy
- return perThreadBytes;
+ return bytesUsed;
}
/**
@@ -162,9 +162,9 @@
public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc);
- public abstract void clearThreadBindings(ThreadState perThread);
+ //public abstract void clearThreadBindings(ThreadState perThread);
- public abstract void clearAllThreadBindings();
+ // public abstract void clearAllThreadBindings();
/**
* Returns an iterator providing access to all {@link ThreadState}
Index: lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java (revision 1092622)
+++ lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java (working copy)
@@ -50,12 +50,10 @@
int hashMask = 1;
int totalFieldCount;
-
float docBoost;
int fieldGen;
final DocumentsWriterPerThread.DocState docState;
-
public DocFieldProcessor(DocumentsWriterPerThread docWriter, DocFieldConsumer consumer) {
this.docState = docWriter.docState;
this.consumer = consumer;
@@ -254,7 +252,6 @@
}
}
-
void quickSort(DocFieldProcessorPerField[] array, int lo, int hi) {
if (lo >= hi)
return;
Index: lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java (revision 1092622)
+++ lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java (working copy)
@@ -52,7 +52,7 @@
final int numAllFields = allFields.size();
- // sort by field name
+ // Sort by field name
CollectionUtil.quickSort(allFields);
final FieldsConsumer consumer = state.segmentCodecs.codec().fieldsConsumer(state);
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java (revision 1092622)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java (working copy)
@@ -23,15 +23,15 @@
/**
* {@link DocumentsWriterDeleteQueue} is a non-blocking linked pending deletes
- * queue. In contrast to other queue implementation we only maintain only the
+ * queue. In contrast to other queue implementations we only maintain the
* tail of the queue. A delete queue is always used in a context of a set of
- * DWPT and a global delete pool. Each of the DWPT and the global pool need to
- * maintain their 'own' head of the queue. The difference between the DWPT and
- * the global pool is that the DWPT starts maintaining a head once it has added
- * its first document since for its segments private deletes only the deletes
- * after that document are relevant. The global pool instead starts maintaining
- * the head once this instance is created by taking the sentinel instance as its
- * initial head.
+ * DWPTs and a global delete pool. Each DWPT and the global pool needs to
+ * maintain its 'own' head of the queue (as a DeleteSlice instance per DWPT).
+ * The difference between the DWPT and the global pool is that the DWPT starts
+ * maintaining a head once it has added its first document since, for its segment's
+ * private deletes, only the deletes after that document are relevant. The global
+ * pool instead starts maintaining the head once this instance is created by
+ * taking the sentinel instance as its initial head.
*
* Since each {@link DeleteSlice} maintains its own head and the list is only
* single linked the garbage collector takes care of pruning the list for us.
@@ -41,12 +41,12 @@
*
* Each DWPT as well as the global delete pool maintain their private
* DeleteSlice instance. In the DWPT case updating a slice is equivalent to
- * atomically finishing the document. The slice update guarantees a happens
- * before relationship to all other updates in the same indexing session. When a
- * DWPT updates a document it
+ * atomically finishing the document. The slice update guarantees a "happens
+ * before" relationship to all other updates in the same indexing session. When a
+ * DWPT updates a document it:
*
*
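
Review note: to make the head/tail picture in this javadoc concrete, an illustrative toy (hypothetical and heavily simplified; the real queue is lock-free, advancing the tail by CAS rather than under a lock) of a singly linked queue where each consumer owns its head as a slice:

// Toy sketch only, not the real implementation. The queue tracks just the
// tail; each reader keeps its own head inside a slice. Nodes behind every
// head become unreachable and the GC prunes them for us.
class ToyDeleteQueue {
  static class Node {
    final Object item;   // a delete Term or Query in the real queue
    volatile Node next;
    Node(Object item) { this.item = item; }
  }

  volatile Node tail = new Node(null); // sentinel, the global pool's initial head

  synchronized Node add(Object item) { // real queue uses CAS instead of a lock
    Node node = new Node(item);
    tail.next = node;
    tail = node;
    return node;
  }

  static class ToySlice {              // per-DWPT 'own' head of the queue
    Node head, tail;
    ToySlice(Node start) { head = tail = start; } // DWPT: created at first doc

    void advanceTo(Node newTail) { tail = newTail; }

    void apply(java.util.List<Object> buffer) {   // drain the range (head, tail]
      while (head != tail) {
        head = head.next;
        buffer.add(head.item);
      }
    }
  }
}
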
Index: lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java (revision 1092622)
+++ lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java (working copy)
- * If {@link IndexWriterConfig#setRAMBufferSizeMB(double)} is enabled always the
+ * If {@link IndexWriterConfig#setRAMBufferSizeMB(double)} is enabled, the
* largest ram consuming {@link DocumentsWriterPerThread} will be marked as
- * pending iff the global active RAM consumption is equals or higher the
+ * pending iff the global active RAM consumption is >= the
* configured max RAM buffer.
*/
public class FlushByRamOrCountsPolicy extends FlushPolicy {
@@ -38,10 +38,11 @@
@Override
public void onDelete(DocumentsWriterFlushControl control, ThreadState state) {
if (flushOnDeleteTerms()) {
+ // Flush this state by num del terms
final int maxBufferedDeleteTerms = indexWriterConfig
.getMaxBufferedDeleteTerms();
if (control.getNumGlobalTermDeletes() >= maxBufferedDeleteTerms) {
- control.setFlushDeletes();
+ control.setApplyAllDeletes();
}
}
}
@@ -51,12 +52,12 @@
if (flushOnDocCount()
&& state.perThread.getNumDocsInRAM() >= indexWriterConfig
.getMaxBufferedDocs()) {
- control.setFlushPending(state); // flush by num docs
+ // Flush this state by num docs
+ control.setFlushPending(state);
} else {// flush by RAM
if (flushOnRAM()) {
- final double ramBufferSizeMB = indexWriterConfig.getRAMBufferSizeMB();
+ final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
final long totalRam = control.activeBytes();
- final long limit = (long) (ramBufferSizeMB * 1024.d * 1024.d);
if (totalRam >= limit) {
markLargestWriterPending(control, state, totalRam);
}
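
Review note: a quick sanity check on the arithmetic in this hunk (illustrative values only; 16 MB is the IndexWriterConfig default RAM buffer):

// Worked example of the RAM trigger above; values are hypothetical.
static boolean wouldTriggerFlush() {
  double ramBufferSizeMB = 16.0;                           // getRAMBufferSizeMB()
  long limit = (long) (ramBufferSizeMB * 1024.d * 1024.d); // 16777216 bytes
  long totalRam = 17000000;                                // control.activeBytes()
  // true -> markLargestWriterPending(...) fires; note it marks the largest
  // non-pending DWPT, not necessarily the one performing this insert.
  return totalRam >= limit;
}
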
Index: lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java (revision 1092622)
+++ lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java (working copy)
@@ -208,11 +208,11 @@
}
if (!packet.isSegmentPrivate) {
/*
- * only update the coalescededDeletes if we are NOT on a segment private del packet.
- * the segment private del packet must only applied to segments with the same delGen.
- * Yet, if a segment is already deleted from the SI since it had no more documents remaining
- * after some del packets younger than it segPrivate packet (hihger delGen) have been applied
- * the segPrivate packet has not been removed.
+ * Only coalesce if we are NOT on a segment private del packet: the segment private del packet
+ * must only be applied to segments with the same delGen. Yet, if a segment is already deleted
+ * from the SI since it had no more documents remaining after some del packets younger than
+ * its segPrivate packet (higher delGen) have been applied, the segPrivate packet has not been
+ * removed.
*/
coalescedDeletes.update(packet);
}
@@ -259,7 +259,7 @@
}
/*
- * since we are on a segment private del packet we must not
+ * Since we are on a segment private del packet we must not
* update the coalescedDeletes here! We can simply advance to the
* next packet and seginfo.
*/
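
Review note: the rule these two comments describe can be condensed into a small hypothetical sketch (names invented; the real delGen bookkeeping in BufferedDeletesStream has more cases than this):

// Hedged sketch with invented types, not the real BufferedDeletesStream code.
class PacketRule {
  static class Packet { boolean isSegmentPrivate; long delGen; }
  static class Segment { long delGen; }

  // Only non-private packets feed the coalesced (stream-wide) deletes.
  static boolean shouldCoalesce(Packet packet) {
    return !packet.isSegmentPrivate;
  }

  // A segment-private packet binds to exactly one segment, the one whose
  // delGen matches; if that segment was already dropped (no docs remaining),
  // the packet simply never finds a target. Non-private packets apply,
  // roughly, to segments no younger than the packet.
  static boolean appliesTo(Packet packet, Segment segment) {
    if (packet.isSegmentPrivate) {
      return segment.delGen == packet.delGen;
    }
    return segment.delGen <= packet.delGen;
  }
}
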
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (revision 1092622)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (working copy)
@@ -39,10 +39,7 @@
/**
* This class accepts multiple added documents and directly
- * writes a single segment file. It does this more
- * efficiently than creating a single segment per document
- * (with DocumentWriter) and doing standard merges on those
- * segments.
+ * writes segment files.
*
* Each added document is passed to the {@link DocConsumer},
* which in turn processes the document and interacts with
@@ -152,8 +149,11 @@
}
synchronized boolean deleteQueries(final Query... queries) throws IOException {
- final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
deleteQueue.addDelete(queries);
+ // nocommit -- shouldn't we check for doApplyAllDeletes
+ // here too?
+ // nocommit shouldn't this consult flush policy? or
+ // should this return void now?
return false;
}
@@ -165,9 +165,11 @@
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
deleteQueue.addDelete(terms);
flushControl.doOnDelete();
- if (flushControl.flushDeletes.getAndSet(false)) {
- flushDeletes(deleteQueue);
+ if (flushControl.doApplyAllDeletes()) {
+ applyAllDeletes(deleteQueue);
}
+ // nocommit shouldn't this consult flush policy? or
+ // should this return void now?
return false;
}
@@ -182,13 +184,13 @@
return deleteQueue;
}
- private void flushDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
+ private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
if (deleteQueue != null) {
synchronized (ticketQueue) {
- // freeze and insert the delete flush ticket in the queue
+ // Freeze and insert the delete flush ticket in the queue
ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
applyFlushTickets(null, null);
- }
+ }
}
indexWriter.applyAllDeletes();
indexWriter.flushCount.incrementAndGet();
@@ -196,14 +198,9 @@
synchronized void setInfoStream(PrintStream infoStream) {
this.infoStream = infoStream;
- pushConfigChange();
- }
-
- private final void pushConfigChange() {
final Iterator