diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java index b61d152..9d444bb 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -19,7 +19,6 @@ package org.apache.lucene.index; import java.io.IOException; import java.util.Collection; -import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -213,18 +212,18 @@ final class DocumentsWriter { infoStream.message("DW", "abort"); } - final Iterator threadsIterator = perThreadPool.getActivePerThreadsIterator(); - while (threadsIterator.hasNext()) { - final ThreadState perThread = threadsIterator.next(); + final int limit = perThreadPool.getActiveThreadState(); + for (int i = 0; i < limit; i++) { + final ThreadState perThread = perThreadPool.getThreadState(i); perThread.lock(); try { if (perThread.isActive()) { // we might be closed try { - perThread.perThread.abort(); + perThread.dwpt.abort(); } catch (IOException ex) { // continue } finally { - perThread.perThread.checkAndResetHasAborted(); + perThread.dwpt.checkAndResetHasAborted(); flushControl.doOnAbort(perThread); } } else { @@ -338,7 +337,7 @@ final class DocumentsWriter { assert false: "perThread is not active but we are still open"; } - final DocumentsWriterPerThread dwpt = perThread.perThread; + final DocumentsWriterPerThread dwpt = perThread.dwpt; try { final int docCount = dwpt.updateDocuments(docs, analyzer, delTerm); numDocsInRAM.addAndGet(docCount); @@ -372,7 +371,7 @@ final class DocumentsWriter { assert false: "perThread is not active but we are still open"; } - final DocumentsWriterPerThread dwpt = perThread.perThread; + final DocumentsWriterPerThread dwpt = perThread.dwpt; try { dwpt.updateDocument(doc, analyzer, delTerm); numDocsInRAM.incrementAndGet(); @@ -587,22 +586,4 @@ final class DocumentsWriter { } } - - - - - // use by IW during close to assert all DWPT are inactive after final flush - boolean assertNoActiveDWPT() { - Iterator activePerThreadsIterator = perThreadPool.getAllPerThreadsIterator(); - while(activePerThreadsIterator.hasNext()) { - ThreadState next = activePerThreadsIterator.next(); - next.lock(); - try { - assert !next.isActive(); - } finally { - next.unlock(); - } - } - return true; - } } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java index d4c8be3..18b4769 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java @@ -18,8 +18,8 @@ package org.apache.lucene.index; */ import java.io.IOException; import java.util.ArrayList; +import java.util.IdentityHashMap; import java.util.List; -import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.Queue; @@ -51,6 +51,8 @@ public final class DocumentsWriterFlushControl { private final Queue flushQueue = new LinkedList(); // only for safety reasons if a DWPT is close to the RAM limit private final Queue blockedFlushes = new LinkedList(); + private final IdentityHashMap flushingWriters = new IdentityHashMap(); + double maxConfiguredRamBuffer = 0; long peakActiveBytes = 0;// only with assert @@ -61,7 +63,6 @@ public final class DocumentsWriterFlushControl { private final DocumentsWriterPerThreadPool perThreadPool; private final FlushPolicy flushPolicy; private boolean closed = false; - private final HashMap flushingWriters = new HashMap(); private final DocumentsWriter documentsWriter; private final IndexWriterConfig config; @@ -122,7 +123,7 @@ public final class DocumentsWriterFlushControl { } private void commitPerThreadBytes(ThreadState perThread) { - final long delta = perThread.perThread.bytesUsed() + final long delta = perThread.dwpt.bytesUsed() - perThread.bytesUsed; perThread.bytesUsed += delta; /* @@ -212,7 +213,7 @@ public final class DocumentsWriterFlushControl { */ public synchronized void setFlushPending(ThreadState perThread) { assert !perThread.flushPending; - if (perThread.perThread.getNumDocsInRAM() > 0) { + if (perThread.dwpt.getNumDocsInRAM() > 0) { perThread.flushPending = true; // write access synced final long bytes = perThread.bytesUsed; flushBytes += bytes; @@ -295,18 +296,21 @@ public final class DocumentsWriterFlushControl { } DocumentsWriterPerThread nextPendingFlush() { + int numPending; + boolean fullFlush; synchronized (this) { final DocumentsWriterPerThread poll; if ((poll = flushQueue.poll()) != null) { stallControl.updateStalled(this); return poll; } + fullFlush = this.fullFlush; + numPending = this.numPending; } if (numPending > 0 && !fullFlush) { // don't check if we are doing a full flush - final Iterator allActiveThreads = perThreadPool - .getActivePerThreadsIterator(); - while (allActiveThreads.hasNext() && numPending > 0) { - ThreadState next = allActiveThreads.next(); + final int limit = perThreadPool.getActiveThreadState(); + for (int i = 0; i < limit && numPending > 0; i++) { + final ThreadState next = perThreadPool.getThreadState(i); if (next.flushPending) { final DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next); if (dwpt != null) { @@ -327,9 +331,29 @@ public final class DocumentsWriterFlushControl { /** * Returns an iterator that provides access to all currently active {@link ThreadState}s */ - public Iterator allActiveThreads() { - return perThreadPool.getActivePerThreadsIterator(); + public Iterator allActiveThreadStates() { + return getPerThreadsIterator(perThreadPool.getActiveThreadState()); } + + private Iterator getPerThreadsIterator(final int upto) { + return new Iterator() { + int i = 0; + + public boolean hasNext() { + return i < upto; + } + + public ThreadState next() { + return perThreadPool.getThreadState(i++); + } + + public void remove() { + throw new UnsupportedOperationException("remove() not supported."); + } + }; + } + + synchronized void doOnDelete() { // pass null this is a global delete no update @@ -369,7 +393,7 @@ public final class DocumentsWriterFlushControl { boolean success = false; try { if (perThread.isActive() - && perThread.perThread.deleteQueue != documentsWriter.deleteQueue) { + && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) { // There is a flush-all in process and this DWPT is // now stale -- enroll it for flush and try for // another DWPT: @@ -397,23 +421,23 @@ public final class DocumentsWriterFlushControl { DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1); documentsWriter.deleteQueue = newQueue; } - final Iterator allActiveThreads = perThreadPool.getActivePerThreadsIterator(); - while (allActiveThreads.hasNext()) { - final ThreadState next = allActiveThreads.next(); + final int limit = perThreadPool.getActiveThreadState(); + for (int i = 0; i < limit; i++) { + final ThreadState next = perThreadPool.getThreadState(i); next.lock(); try { if (!next.isActive()) { continue; } - assert next.perThread.deleteQueue == flushingQueue - || next.perThread.deleteQueue == documentsWriter.deleteQueue : " flushingQueue: " + assert next.dwpt.deleteQueue == flushingQueue + || next.dwpt.deleteQueue == documentsWriter.deleteQueue : " flushingQueue: " + flushingQueue + " currentqueue: " + documentsWriter.deleteQueue + " perThread queue: " - + next.perThread.deleteQueue - + " numDocsInRam: " + next.perThread.getNumDocsInRAM(); - if (next.perThread.deleteQueue != flushingQueue) { + + next.dwpt.deleteQueue + + " numDocsInRam: " + next.dwpt.getNumDocsInRAM(); + if (next.dwpt.deleteQueue != flushingQueue) { // this one is already a new DWPT continue; } @@ -437,12 +461,12 @@ public final class DocumentsWriterFlushControl { } private boolean assertActiveDeleteQueue(DocumentsWriterDeleteQueue queue) { - final Iterator allActiveThreads = perThreadPool.getActivePerThreadsIterator(); - while (allActiveThreads.hasNext()) { - final ThreadState next = allActiveThreads.next(); + final int limit = perThreadPool.getActiveThreadState(); + for (int i = 0; i < limit; i++) { + final ThreadState next = perThreadPool.getThreadState(i); next.lock(); try { - assert !next.isActive() || next.perThread.deleteQueue == queue; + assert !next.isActive() || next.dwpt.deleteQueue == queue; } finally { next.unlock(); } @@ -454,9 +478,9 @@ public final class DocumentsWriterFlushControl { void addFlushableState(ThreadState perThread) { if (documentsWriter.infoStream.isEnabled("DWFC")) { - documentsWriter.infoStream.message("DWFC", Thread.currentThread().getName() + ": addFlushableState " + perThread.perThread); + documentsWriter.infoStream.message("DWFC", Thread.currentThread().getName() + ": addFlushableState " + perThread.dwpt); } - final DocumentsWriterPerThread dwpt = perThread.perThread; + final DocumentsWriterPerThread dwpt = perThread.dwpt; assert perThread.isHeldByCurrentThread(); assert perThread.isActive(); assert fullFlush; @@ -473,9 +497,9 @@ public final class DocumentsWriterFlushControl { } } else { if (closed) { - perThread.resetWriter(null); // make this state inactive + perThreadPool.deactivateThreadState(perThread); // make this state inactive } else { - dwpt.initialize(); + perThreadPool.reinitThreadState(perThread); } } } @@ -597,4 +621,6 @@ public final class DocumentsWriterFlushControl { boolean anyStalledThreads() { return stallControl.anyStalledThreads(); } + + } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java index 8b959ca..9d5bf30 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java @@ -16,7 +16,6 @@ package org.apache.lucene.index; * limitations under the License. */ -import java.util.Iterator; import java.util.concurrent.locks.ReentrantLock; import org.apache.lucene.index.FieldInfos.FieldNumberBiMap; @@ -38,11 +37,6 @@ import org.apache.lucene.util.SetOnce; *

*/ public abstract class DocumentsWriterPerThreadPool { - /** The maximum number of simultaneous threads that may be - * indexing documents at once in IndexWriter; if more - * than this many threads arrive they will wait for - * others to finish. */ - public final static int DEFAULT_MAX_THREAD_STATES = 8; /** * {@link ThreadState} references and guards a @@ -57,17 +51,18 @@ public abstract class DocumentsWriterPerThreadPool { */ @SuppressWarnings("serial") public final static class ThreadState extends ReentrantLock { - // package private for FlushPolicy - DocumentsWriterPerThread perThread; + DocumentsWriterPerThread dwpt; + // TODO this should really be part of DocumentsWriterFlushControl // write access guarded by DocumentsWriterFlushControl volatile boolean flushPending = false; + // TODO this should really be part of DocumentsWriterFlushControl // write access guarded by DocumentsWriterFlushControl long bytesUsed = 0; // guarded by Reentrant lock private boolean isActive = true; - ThreadState(DocumentsWriterPerThread perThread) { - this.perThread = perThread; + ThreadState(DocumentsWriterPerThread dpwt) { + this.dwpt = dpwt; } /** @@ -76,12 +71,12 @@ public abstract class DocumentsWriterPerThreadPool { * for indexing anymore. * @see #isActive() */ - void resetWriter(DocumentsWriterPerThread perThread) { + private void resetWriter(DocumentsWriterPerThread dwpt) { assert this.isHeldByCurrentThread(); - if (perThread == null) { + if (dwpt == null) { isActive = false; } - this.perThread = perThread; + this.dwpt = dwpt; this.bytesUsed = 0; this.flushPending = false; } @@ -112,7 +107,7 @@ public abstract class DocumentsWriterPerThreadPool { public DocumentsWriterPerThread getDocumentsWriterPerThread() { assert this.isHeldByCurrentThread(); // public for FlushPolicy - return perThread; + return dwpt; } /** @@ -124,40 +119,37 @@ public abstract class DocumentsWriterPerThreadPool { } } - private final ThreadState[] perThreads; + private final ThreadState[] threadStates; private volatile int numThreadStatesActive; - private FieldNumberBiMap globalFieldMap; + private final SetOnce globalFieldMap = new SetOnce(); private final SetOnce documentsWriter = new SetOnce(); /** - * Creates a new {@link DocumentsWriterPerThreadPool} with max. - * {@link #DEFAULT_MAX_THREAD_STATES} thread states. + * Creates a new {@link DocumentsWriterPerThreadPool} with a given maximum of {@link ThreadState}s. */ - public DocumentsWriterPerThreadPool() { - this(DEFAULT_MAX_THREAD_STATES); - } - - public DocumentsWriterPerThreadPool(int maxNumPerThreads) { - maxNumPerThreads = (maxNumPerThreads < 1) ? DEFAULT_MAX_THREAD_STATES : maxNumPerThreads; - perThreads = new ThreadState[maxNumPerThreads]; + public DocumentsWriterPerThreadPool(int maxNumThreadStates) { + if (maxNumThreadStates < 1) { + throw new IllegalArgumentException("maxNumThreadStates must be >= 1 but was: " + maxNumThreadStates); + } + threadStates = new ThreadState[maxNumThreadStates]; numThreadStatesActive = 0; } public void initialize(DocumentsWriter documentsWriter, FieldNumberBiMap globalFieldMap, IndexWriterConfig config) { this.documentsWriter.set(documentsWriter); // thread pool is bound to DW - this.globalFieldMap = globalFieldMap; - for (int i = 0; i < perThreads.length; i++) { + this.globalFieldMap.set(globalFieldMap); + for (int i = 0; i < threadStates.length; i++) { final FieldInfos infos = new FieldInfos(globalFieldMap); - perThreads[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain)); + threadStates[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain)); } } - + /** * Returns the max number of {@link ThreadState} instances available in this * {@link DocumentsWriterPerThreadPool} */ public int getMaxThreadStates() { - return perThreads.length; + return threadStates.length; } /** @@ -178,16 +170,16 @@ public abstract class DocumentsWriterPerThreadPool { * null */ public synchronized ThreadState newThreadState() { - if (numThreadStatesActive < perThreads.length) { - final ThreadState threadState = perThreads[numThreadStatesActive]; + if (numThreadStatesActive < threadStates.length) { + final ThreadState threadState = threadStates[numThreadStatesActive]; threadState.lock(); // lock so nobody else will get this ThreadState boolean unlock = true; try { if (threadState.isActive()) { // unreleased thread states are deactivated during DW#close() numThreadStatesActive++; // increment will publish the ThreadState - assert threadState.perThread != null; - threadState.perThread.initialize(); + assert threadState.dwpt != null; + threadState.dwpt.initialize(); unlock = false; return threadState; } @@ -205,12 +197,12 @@ public abstract class DocumentsWriterPerThreadPool { } private synchronized boolean assertUnreleasedThreadStatesInactive() { - for (int i = numThreadStatesActive; i < perThreads.length; i++) { - assert perThreads[i].tryLock() : "unreleased threadstate should not be locked"; + for (int i = numThreadStatesActive; i < threadStates.length; i++) { + assert threadStates[i].tryLock() : "unreleased threadstate should not be locked"; try { - assert !perThreads[i].isActive() : "expected unreleased thread state to be inactive"; + assert !threadStates[i].isActive() : "expected unreleased thread state to be inactive"; } finally { - perThreads[i].unlock(); + threadStates[i].unlock(); } } return true; @@ -220,8 +212,8 @@ public abstract class DocumentsWriterPerThreadPool { * Deactivate all unreleased threadstates */ protected synchronized void deactivateUnreleasedStates() { - for (int i = numThreadStatesActive; i < perThreads.length; i++) { - final ThreadState threadState = perThreads[i]; + for (int i = numThreadStatesActive; i < threadStates.length; i++) { + final ThreadState threadState = threadStates[i]; threadState.lock(); try { threadState.resetWriter(null); @@ -233,9 +225,10 @@ public abstract class DocumentsWriterPerThreadPool { protected DocumentsWriterPerThread replaceForFlush(ThreadState threadState, boolean closed) { assert threadState.isHeldByCurrentThread(); - final DocumentsWriterPerThread dwpt = threadState.perThread; + assert globalFieldMap.get() != null; + final DocumentsWriterPerThread dwpt = threadState.dwpt; if (!closed) { - final FieldInfos infos = new FieldInfos(globalFieldMap); + final FieldInfos infos = new FieldInfos(globalFieldMap.get()); final DocumentsWriterPerThread newDwpt = new DocumentsWriterPerThread(dwpt, infos); newDwpt.initialize(); threadState.resetWriter(newDwpt); @@ -251,45 +244,19 @@ public abstract class DocumentsWriterPerThreadPool { public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter); + /** - * Returns an iterator providing access to all {@link ThreadState} - * instances. - */ - // TODO: new Iterator per indexed doc is overkill...? - public Iterator getAllPerThreadsIterator() { - return getPerThreadsIterator(this.perThreads.length); - } - - /** - * Returns an iterator providing access to all active {@link ThreadState} - * instances. - *

- * Note: The returned iterator will only iterator - * {@link ThreadState}s that are active at the point in time when this method - * has been called. + * Returns the ith active {@link ThreadState} where i is the + * given ord. * + * @param ord + * the ordinal of the {@link ThreadState} + * @return the ith active {@link ThreadState} where i is the + * given ord. */ - // TODO: new Iterator per indexed doc is overkill...? - public Iterator getActivePerThreadsIterator() { - return getPerThreadsIterator(numThreadStatesActive); - } - - private Iterator getPerThreadsIterator(final int upto) { - return new Iterator() { - int i = 0; - - public boolean hasNext() { - return i < upto; - } - - public ThreadState next() { - return perThreads[i++]; - } - - public void remove() { - throw new UnsupportedOperationException("remove() not supported."); - } - }; + ThreadState getThreadState(int ord) { + assert ord < numThreadStatesActive; + return threadStates[ord]; } /** @@ -299,14 +266,59 @@ public abstract class DocumentsWriterPerThreadPool { */ protected ThreadState minContendedThreadState() { ThreadState minThreadState = null; - // TODO: new Iterator per indexed doc is overkill...? - final Iterator it = getActivePerThreadsIterator(); - while (it.hasNext()) { - final ThreadState state = it.next(); + final int limit = numThreadStatesActive; + for (int i = 0; i < limit; i++) { + final ThreadState state = threadStates[i]; if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) { minThreadState = state; } } return minThreadState; } + + /** + * Returns the number of currently deactivated {@link ThreadState} instances. + * A deactivated {@link ThreadState} should not be used for indexing anymore. + * + * @return the number of currently deactivated {@link ThreadState} instances. + */ + int numDeactivatedThreadStates() { + int count = 0; + for (int i = 0; i < threadStates.length; i++) { + final ThreadState threadState = threadStates[i]; + threadState.lock(); + try { + if (!threadState.isActive) { + count++; + } + } finally { + threadState.unlock(); + } + } + return count; + } + + /** + * Deactivates an active {@link ThreadState}. Inactive {@link ThreadState} can + * not be used for indexing anymore once they are deactivated. This method should only be used + * if the parent {@link DocumentsWriter} is closed or aborted. + * + * @param threadState the state to deactivate + */ + void deactivateThreadState(ThreadState threadState) { + assert threadState.isActive(); + threadState.resetWriter(null); + } + + /** + * Reinitialized an active {@link ThreadState}. A {@link ThreadState} should + * only be reinitialized if it is active without any pending documents. + * + * @param threadState the state to reinitialize + */ + void reinitThreadState(ThreadState threadState) { + assert threadState.isActive; + assert threadState.dwpt.getNumDocsInRAM() == 0; + threadState.dwpt.initialize(); + } } diff --git a/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java b/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java index 4c7ca1f..f7bf325 100644 --- a/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java @@ -72,7 +72,7 @@ public class FlushByRamOrCountsPolicy extends FlushPolicy { @Override public void onInsert(DocumentsWriterFlushControl control, ThreadState state) { if (flushOnDocCount() - && state.perThread.getNumDocsInRAM() >= indexWriterConfig + && state.dwpt.getNumDocsInRAM() >= indexWriterConfig .getMaxBufferedDocs()) { // Flush this state by num docs control.setFlushPending(state); diff --git a/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java b/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java index 4f85eae..93ac3f0 100644 --- a/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java @@ -75,9 +75,7 @@ public abstract class FlushPolicy { */ public void onUpdate(DocumentsWriterFlushControl control, ThreadState state) { onInsert(control, state); - if (!state.flushPending) { - onDelete(control, state); - } + onDelete(control, state); } /** @@ -107,17 +105,17 @@ public abstract class FlushPolicy { */ protected ThreadState findLargestNonPendingWriter( DocumentsWriterFlushControl control, ThreadState perThreadState) { - assert perThreadState.perThread.getNumDocsInRAM() > 0; + assert perThreadState.dwpt.getNumDocsInRAM() > 0; long maxRamSoFar = perThreadState.bytesUsed; // the dwpt which needs to be flushed eventually ThreadState maxRamUsingThreadState = perThreadState; assert !perThreadState.flushPending : "DWPT should have flushed"; - Iterator activePerThreadsIterator = control.allActiveThreads(); + Iterator activePerThreadsIterator = control.allActiveThreadStates(); while (activePerThreadsIterator.hasNext()) { ThreadState next = activePerThreadsIterator.next(); if (!next.flushPending) { final long nextRam = next.bytesUsed; - if (nextRam > maxRamSoFar && next.perThread.getNumDocsInRAM() > 0) { + if (nextRam > maxRamSoFar && next.dwpt.getNumDocsInRAM() > 0) { maxRamSoFar = nextRam; maxRamUsingThreadState = next; } diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index 6b3160c..b916b70 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -1144,7 +1144,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { synchronized(this) { closed = true; } - assert oldWriter.assertNoActiveDWPT(); + assert oldWriter.perThreadPool.numDeactivatedThreadStates() == oldWriter.perThreadPool.getMaxThreadStates(); } catch (OutOfMemoryError oom) { handleOOM(oom, "closeInternal"); } finally { diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java index a4e33f2..6adaebe 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java @@ -94,6 +94,13 @@ public final class IndexWriterConfig implements Cloneable { /** Default value is 1945. Change using {@link #setRAMPerThreadHardLimitMB(int)} */ public static final int DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB = 1945; + + /** The maximum number of simultaneous threads that may be + * indexing documents at once in IndexWriter; if more + * than this many threads arrive they will wait for + * others to finish. Default value is 8. */ + public final static int DEFAULT_MAX_THREAD_STATES = 8; + /** * Sets the default (for any instance) maximum time to wait for a write lock * (in milliseconds). @@ -172,7 +179,7 @@ public final class IndexWriterConfig implements Cloneable { } flushPolicy = new FlushByRamOrCountsPolicy(); readerPooling = DEFAULT_READER_POOLING; - indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool(); + indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool(DEFAULT_MAX_THREAD_STATES); readerTermsIndexDivisor = DEFAULT_READER_TERMS_INDEX_DIVISOR; perThreadHardLimitMB = DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB; } @@ -554,8 +561,8 @@ public final class IndexWriterConfig implements Cloneable { * IndexWriter to assign thread-states to incoming indexing threads. If no * {@link DocumentsWriterPerThreadPool} is set {@link IndexWriter} will use * {@link ThreadAffinityDocumentsWriterThreadPool} with max number of - * thread-states set to {@link DocumentsWriterPerThreadPool#DEFAULT_MAX_THREAD_STATES} (see - * {@link DocumentsWriterPerThreadPool#DEFAULT_MAX_THREAD_STATES}). + * thread-states set to {@link #DEFAULT_MAX_THREAD_STATES} (see + * {@link #DEFAULT_MAX_THREAD_STATES}). *

*

* NOTE: The given {@link DocumentsWriterPerThreadPool} instance must not be used with diff --git a/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java index b5f0b6c..c07ece5 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java +++ b/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java @@ -34,13 +34,8 @@ public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerT private Map threadBindings = new ConcurrentHashMap(); /** - * Creates a new {@link DocumentsWriterPerThreadPool} with max. - * {@link #DEFAULT_MAX_THREAD_STATES} thread states. + * Creates a new {@link ThreadAffinityDocumentsWriterThreadPool} with a given maximum of {@link ThreadState}s. */ - public ThreadAffinityDocumentsWriterThreadPool() { - this(DEFAULT_MAX_THREAD_STATES); - } - public ThreadAffinityDocumentsWriterThreadPool(int maxNumPerThreads) { super(maxNumPerThreads); assert getMaxThreadStates() >= 1; diff --git a/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java index 5217de3..a85589d 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java @@ -281,10 +281,10 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase { } protected void assertActiveBytesAfter(DocumentsWriterFlushControl flushControl) { - Iterator allActiveThreads = flushControl.allActiveThreads(); + Iterator allActiveThreads = flushControl.allActiveThreadStates(); long bytesUsed = 0; while (allActiveThreads.hasNext()) { - bytesUsed += allActiveThreads.next().perThread.bytesUsed(); + bytesUsed += allActiveThreads.next().dwpt.bytesUsed(); } assertEquals(bytesUsed, flushControl.activeBytes()); } @@ -343,7 +343,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase { if (state.flushPending) { toFlush = state; } else if (flushOnDeleteTerms() - && state.perThread.pendingDeletes.numTermDeletes.get() >= indexWriterConfig + && state.dwpt.pendingDeletes.numTermDeletes.get() >= indexWriterConfig .getMaxBufferedDeleteTerms()) { toFlush = state; } else { @@ -376,7 +376,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase { if (state.flushPending) { toFlush = state; } else if (flushOnDocCount() - && state.perThread.getNumDocsInRAM() >= indexWriterConfig + && state.dwpt.getNumDocsInRAM() >= indexWriterConfig .getMaxBufferedDocs()) { toFlush = state; } else if (flushOnRAM() @@ -397,7 +397,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase { hasMarkedPending = true; } else { peakBytesWithoutFlush = Math.max(activeBytes, peakBytesWithoutFlush); - peakDocCountWithoutFlush = Math.max(state.perThread.getNumDocsInRAM(), + peakDocCountWithoutFlush = Math.max(state.dwpt.getNumDocsInRAM(), peakDocCountWithoutFlush); } @@ -409,7 +409,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase { static void findPending(DocumentsWriterFlushControl flushControl, ArrayList pending, ArrayList notPending) { - Iterator allActiveThreads = flushControl.allActiveThreads(); + Iterator allActiveThreads = flushControl.allActiveThreadStates(); while (allActiveThreads.hasNext()) { ThreadState next = allActiveThreads.next(); if (next.flushPending) {