diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index c51be89..b0f3a30 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -20,6 +20,8 @@ package org.apache.lucene.index;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.analysis.Analyzer;
@@ -109,7 +111,7 @@ final class DocumentsWriter {
 
   List<String> newFiles;
 
-  final IndexWriter indexWriter;
+  private final LiveIndexWriterConfig config;
 
   final LiveIndexWriterConfig indexWriterConfig;
 
   private AtomicInteger numDocsInRAM = new AtomicInteger(0);
@@ -134,54 +136,69 @@ final class DocumentsWriter {
   final DocumentsWriterFlushControl flushControl;
   final Codec codec;
 
-  DocumentsWriter(Codec codec, LiveIndexWriterConfig config, Directory directory, IndexWriter writer, FieldNumbers globalFieldNumbers,
-      BufferedDeletesStream bufferedDeletesStream) {
+  DocumentsWriter(Codec codec, IndexWriter writer, LiveIndexWriterConfig config, Directory directory) {
     this.codec = codec;
     this.directory = directory;
-    this.indexWriter = writer;
+    this.config = config;
     this.infoStream = config.getInfoStream();
     this.similarity = config.getSimilarity();
     this.indexWriterConfig = writer.getConfig();
     this.perThreadPool = config.getIndexerThreadPool();
     this.chain = config.getIndexingChain();
-    this.perThreadPool.initialize(this, globalFieldNumbers, config);
     flushPolicy = config.getFlushPolicy();
     assert flushPolicy != null;
     flushPolicy.init(this);
-    flushControl = new DocumentsWriterFlushControl(this, config);
+    flushControl = new DocumentsWriterFlushControl(this, writer);
   }
+
+  private final AtomicBoolean flagForcePurge = new AtomicBoolean();
+
-  synchronized void deleteQueries(final Query... queries) throws IOException {
+  synchronized boolean deleteQueries(final Query... queries) throws IOException {
+    final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
     deleteQueue.addDelete(queries);
     flushControl.doOnDelete();
-    if (flushControl.doApplyAllDeletes()) {
-      applyAllDeletes(deleteQueue);
-    }
+    return applyAllDeletes(deleteQueue);
   }
 
   // TODO: we could check w/ FreqProxTermsWriter: if the
   // term doesn't exist, don't bother buffering into the
   // per-DWPT map (but still must go into the global map)
-  synchronized void deleteTerms(final Term... terms) throws IOException {
+  synchronized boolean deleteTerms(final Term... terms) throws IOException {
     final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
     deleteQueue.addDelete(terms);
     flushControl.doOnDelete();
-    if (flushControl.doApplyAllDeletes()) {
-      applyAllDeletes(deleteQueue);
-    }
+    return applyAllDeletes(deleteQueue);
   }
 
   DocumentsWriterDeleteQueue currentDeleteSession() {
     return deleteQueue;
   }
 
-  private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
-    if (deleteQueue != null && !flushControl.isFullFlush()) {
-      ticketQueue.addDeletesAndPurge(this, deleteQueue);
+  private boolean applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
+    return applyAllDeletes(deleteQueue, false);
+  }
+
+  private boolean applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue, boolean forced) throws IOException {
+    if (forced || flushControl.doApplyAllDeletes()) {
+      if (deleteQueue != null && !flushControl.isFullFlush()) {
+        ticketQueue.addDeletes(deleteQueue);
+        this.flagForcePurge.compareAndSet(false, true);
+      }
+      putEvent(new ApplyDeletesEvent());
+      return true;
+    }
+    return false;
+  }
+
+  final void purgeBuffer(IndexWriter writer) throws IOException {
+    if (flagForcePurge.getAndSet(false)) {
+      ticketQueue.forcePurge(writer);
+    } else {
+      ticketQueue.tryPurge(writer);
     }
-    indexWriter.applyAllDeletes();
-    indexWriter.flushCount.incrementAndGet();
   }
+
 
   /** Returns how many docs are currently buffered in RAM. */
   int getNumDocs() {
@@ -202,7 +219,8 @@ final class DocumentsWriter {
    * updating the index files) and must discard all
    * currently buffered docs. This resets our state,
    * discarding any docs added since last flush. */
-  synchronized void abort() {
+  synchronized void abort(IndexWriter indexWriter) {
+    assert !Thread.holdsLock(indexWriter) : "IW lock should never be held when aborting";
     boolean success = false;
     try {
@@ -216,16 +234,7 @@ final class DocumentsWriter {
         final ThreadState perThread = perThreadPool.getThreadState(i);
         perThread.lock();
         try {
-          if (perThread.isActive()) { // we might be closed
-            try {
-              perThread.dwpt.abort();
-            } finally {
-              perThread.dwpt.checkAndResetHasAborted();
-              flushControl.doOnAbort(perThread);
-            }
-          } else {
-            assert closed;
-          }
+          abortThreadState(perThread);
         } finally {
           perThread.unlock();
         }
@@ -240,7 +249,7 @@ final class DocumentsWriter {
     }
   }
 
-  synchronized void lockAndAbortAll() {
+  synchronized void lockAndAbortAll(IndexWriter indexWriter) {
     assert indexWriter.holdsFullFlushLock();
     if (infoStream.isEnabled("DW")) {
       infoStream.message("DW", "lockAndAbortAll");
@@ -252,14 +261,7 @@ final class DocumentsWriter {
       for (int i = 0; i < limit; i++) {
         final ThreadState perThread = perThreadPool.getThreadState(i);
         perThread.lock();
-        if (perThread.isActive()) { // we might be closed or
-          try {
-            perThread.dwpt.abort();
-          } finally {
-            perThread.dwpt.checkAndResetHasAborted();
-            flushControl.doOnAbort(perThread);
-          }
-        }
+        abortThreadState(perThread);
       }
       deleteQueue.clear();
       flushControl.abortPendingFlushes();
@@ -271,12 +273,31 @@ final class DocumentsWriter {
       }
       if (!success) {
         // if something happens here we unlock all states again
-        unlockAllAfterAbortAll();
+        unlockAllAfterAbortAll(indexWriter);
+      }
+    }
+  }
+
+  private final void abortThreadState(final ThreadState perThread) {
+    assert perThread.isHeldByCurrentThread();
+    if (perThread.isActive()) { // we might be closed
+      if (perThread.isInitialized()) {
+        try {
+          subtractFlushedNumDocs(perThread.dwpt.getNumDocsInRAM());
+          perThread.dwpt.abort();
+        } finally {
+          perThread.dwpt.checkAndResetHasAborted();
+          flushControl.doOnAbort(perThread);
+        }
+      } else {
+        flushControl.doOnAbort(perThread);
       }
+    } else {
+      assert closed;
     }
   }
 
-  final synchronized void unlockAllAfterAbortAll() {
+  final synchronized void unlockAllAfterAbortAll(IndexWriter indexWriter) {
    assert indexWriter.holdsFullFlushLock();
    if (infoStream.isEnabled("DW")) {
      infoStream.message("DW", "unlockAll");
@@ -364,20 +385,18 @@ final class DocumentsWriter {
     return maybeMerge;
   }
 
-  private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean maybeMerge) throws IOException {
-    if (flushControl.doApplyAllDeletes()) {
-      applyAllDeletes(deleteQueue);
-    }
+  private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean iwCheck) throws IOException {
+    iwCheck |= applyAllDeletes(deleteQueue);
     if (flushingDWPT != null) {
-      maybeMerge |= doFlush(flushingDWPT);
+      iwCheck |= doFlush(flushingDWPT);
     } else {
       final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush();
       if (nextPendingFlush != null) {
-        maybeMerge |= doFlush(nextPendingFlush);
+        iwCheck |= doFlush(nextPendingFlush);
       }
     }
-    return maybeMerge;
+    return iwCheck;
   }
 
   boolean updateDocuments(final Iterable<? extends IndexDocument> docs, final Analyzer analyzer,
@@ -392,13 +411,16 @@ final class DocumentsWriter {
       ensureOpen();
       assert false: "perThread is not active but we are still open";
     }
-
+
+    assert perThread.isInitialized();
     final DocumentsWriterPerThread dwpt = perThread.dwpt;
+    final int dwptNumDocs = dwpt.getNumDocsInRAM();
     try {
       final int docCount = dwpt.updateDocuments(docs, analyzer, delTerm);
       numDocsInRAM.addAndGet(docCount);
     } finally {
       if (dwpt.checkAndResetHasAborted()) {
+        subtractFlushedNumDocs(dwptNumDocs);
         flushControl.doOnAbort(perThread);
       }
     }
@@ -419,20 +441,20 @@ final class DocumentsWriter {
 
     final ThreadState perThread = flushControl.obtainAndLock();
     final DocumentsWriterPerThread flushingDWPT;
-
     try {
-
       if (!perThread.isActive()) {
         ensureOpen();
         throw new IllegalStateException("perThread is not active but we are still open");
       }
-
+      assert perThread.isInitialized();
       final DocumentsWriterPerThread dwpt = perThread.dwpt;
+      final int dwptNumDocs = dwpt.getNumDocsInRAM();
       try {
         dwpt.updateDocument(doc, analyzer, delTerm);
         numDocsInRAM.incrementAndGet();
       } finally {
         if (dwpt.checkAndResetHasAborted()) {
+          subtractFlushedNumDocs(dwptNumDocs);
           flushControl.doOnAbort(perThread);
         }
       }
@@ -474,9 +496,24 @@ final class DocumentsWriter {
           // Each flush is assigned a ticket in the order they acquire the ticketQueue lock
           ticket = ticketQueue.addFlushTicket(flushingDWPT);
 
-          // flush concurrently without locking
-          final FlushedSegment newSegment = flushingDWPT.flush();
-          ticketQueue.addSegment(ticket, newSegment);
+          final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM();
+          boolean dwptSuccess = false;
+          try {
+            // flush concurrently without locking
+            final FlushedSegment newSegment = flushingDWPT.flush();
+            ticketQueue.addSegment(ticket, newSegment);
+            dwptSuccess = true;
+          } finally {
+            subtractFlushedNumDocs(flushingDocsInRam);
+            if (flushingDWPT.filesToDelete != null) {
+              putEvent(new DeleteNewFilesEvent(flushingDWPT.filesToDelete));
+              maybeMerge = true;
+            }
+            if (!dwptSuccess) {
+              putEvent(new FlushFailedEvent(flushingDWPT.getSegmentInfo()));
+              maybeMerge = true;
+            }
+          }
           // flush was successful once we reached this point - new seg. has been assigned to the ticket!
           success = true;
         } finally {
@@ -496,54 +533,38 @@ final class DocumentsWriter {
             // thread in innerPurge can't keep up with all
             // other threads flushing segments. In this case
             // we forcefully stall the producers.
-            ticketQueue.forcePurge(this);
-          } else {
-            ticketQueue.tryPurge(this);
+            flagForcePurge.compareAndSet(false, true);
           }
-
         } finally {
           flushControl.doAfterFlush(flushingDWPT);
           flushingDWPT.checkAndResetHasAborted();
-          indexWriter.flushCount.incrementAndGet();
-          indexWriter.doAfterFlush();
         }
 
         flushingDWPT = flushControl.nextPendingFlush();
       }
+      if (maybeMerge) {
+        putEvent(new MergePendingEvent());
+      }
 
       // If deletes alone are consuming > 1/2 our RAM
       // buffer, force them all to apply now. This is to
       // prevent too-frequent flushing of a long tail of
       // tiny segments:
-      final double ramBufferSizeMB = indexWriterConfig.getRAMBufferSizeMB();
+      final double ramBufferSizeMB = config.getRAMBufferSizeMB();
       if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
          flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) {
        if (infoStream.isEnabled("DW")) {
          infoStream.message("DW", "force apply deletes bytesUsed=" + flushControl.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*ramBufferSizeMB));
        }
-        applyAllDeletes(deleteQueue);
+        if (this.applyAllDeletes(deleteQueue)) {
+          this.flagForcePurge.compareAndSet(false, true);
+          maybeMerge |= true;
+        }
      }
 
    return maybeMerge;
  }
-
-  void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
-      throws IOException {
-    // Finish the flushed segment and publish it to IndexWriter
-    if (newSegment == null) {
-      assert bufferedDeletes != null;
-      if (bufferedDeletes != null && bufferedDeletes.any()) {
-        indexWriter.publishFrozenDeletes(bufferedDeletes);
-        if (infoStream.isEnabled("DW")) {
-          infoStream.message("DW", "flush: push buffered deletes: " + bufferedDeletes);
-        }
-      }
-    } else {
-      publishFlushedSegment(newSegment, bufferedDeletes);
-    }
-  }
-
  final void subtractFlushedNumDocs(int numFlushed) {
    int oldValue = numDocsInRAM.get();
    while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numFlushed)) {
@@ -551,29 +572,6 @@ final class DocumentsWriter {
    }
  }
 
-  /**
-   * Publishes the flushed segment, segment private deletes (if any) and its
-   * associated global delete (if present) to IndexWriter. The actual
-   * publishing operation is synced on IW -> BDS so that the {@link SegmentInfo}'s
-   * delete generation is always GlobalPacket_deleteGeneration + 1
-   */
-  private void publishFlushedSegment(FlushedSegment newSegment, FrozenBufferedDeletes globalPacket)
-      throws IOException {
-    assert newSegment != null;
-    assert newSegment.segmentInfo != null;
-    final FrozenBufferedDeletes segmentDeletes = newSegment.segmentDeletes;
-    //System.out.println("FLUSH: " + newSegment.segmentInfo.info.name);
-    if (infoStream.isEnabled("DW")) {
-      infoStream.message("DW", "publishFlushedSegment seg-private deletes=" + segmentDeletes);
-    }
-
-    if (segmentDeletes != null && infoStream.isEnabled("DW")) {
-      infoStream.message("DW", "flush: push buffered seg private deletes: " + segmentDeletes);
-    }
-    // now publish!
-    indexWriter.publishFlushedSegment(newSegment.segmentInfo, segmentDeletes, globalPacket);
-  }
-
   // for asserts
   private volatile DocumentsWriterDeleteQueue currentFullFlushDelQueue = null;
@@ -588,7 +586,7 @@ final class DocumentsWriter {
   * two stage operation; the caller must ensure (in try/finally) that finishFlush
   * is called after this method, to release the flush lock in DWFlushControl
   */
-  final boolean flushAllThreads()
+  final boolean flushAllThreads(final IndexWriter indexWriter)
    throws IOException {
    final DocumentsWriterDeleteQueue flushingDeleteQueue;
    if (infoStream.isEnabled("DW")) {
@@ -620,10 +618,9 @@ final class DocumentsWriter {
        if (infoStream.isEnabled("DW")) {
          infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
        }
-        ticketQueue.addDeletesAndPurge(this, flushingDeleteQueue);
-      } else {
-        ticketQueue.forcePurge(this);
-      }
+        ticketQueue.addDeletes(flushingDeleteQueue);
+      }
+      ticketQueue.forcePurge(indexWriter);
      assert !flushingDeleteQueue.anyChanges() && !ticketQueue.hasTickets();
    } finally {
      assert flushingDeleteQueue == currentFullFlushDelQueue;
@@ -648,4 +645,64 @@ final class DocumentsWriter {
    }
  }
+
+  public LiveIndexWriterConfig getIndexWriterConfig() {
+    return config;
+  }
+
+  private final ConcurrentLinkedQueue<Event> events = new ConcurrentLinkedQueue<Event>();
+
+  public Event pollNextEvent() {
+    return events.poll();
+  }
+
+  public void putEvent(Event event) {
+    events.add(event);
+  }
+
+  public static interface Event {
+    public void process(IndexWriter writer, boolean triggerMerge) throws IOException;
+  }
+
+  private static final class ApplyDeletesEvent implements Event {
+
+    @Override
+    public void process(IndexWriter writer, boolean triggerMerge) throws IOException {
+      writer.applyDeletesAndPurge();
+    }
+  }
+
+  private static final class MergePendingEvent implements Event {
+
+    @Override
+    public void process(IndexWriter writer, boolean triggerMerge) throws IOException {
+      writer.doAfterSegmentFlushed(triggerMerge);
+    }
+  }
+
+  private static class FlushFailedEvent implements DocumentsWriter.Event {
+    private final SegmentInfo info;
+
+    public FlushFailedEvent(SegmentInfo info) {
+      this.info = info;
+    }
+
+    @Override
+    public void process(IndexWriter writer, boolean triggerMerge) throws IOException {
+      writer.flushFailed(info);
+    }
+  }
+
+  private static class DeleteNewFilesEvent implements DocumentsWriter.Event {
+    private final Collection<String> files;
+
+    public DeleteNewFilesEvent(Collection<String> files) {
+      this.files = files;
+    }
+
+    @Override
+    public void process(IndexWriter writer, boolean triggerMerge) throws IOException {
+      writer.deleteNewFiles(files);
+    }
+  }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index cef4410..d7dd629 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -66,14 +66,18 @@ final class DocumentsWriterFlushControl {
   private boolean closed = false;
   private final DocumentsWriter documentsWriter;
   private final LiveIndexWriterConfig config;
+  private final BufferedDeletesStream bufferedDeletesStream;
+  private final IndexWriter writer;
 
-  DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config) {
+  DocumentsWriterFlushControl(DocumentsWriter documentsWriter, IndexWriter writer) {
     this.stallControl = new DocumentsWriterStallControl();
     this.perThreadPool = documentsWriter.perThreadPool;
     this.flushPolicy = documentsWriter.flushPolicy;
+    this.config = writer.getConfig();
     this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
-    this.config = config;
     this.documentsWriter = documentsWriter;
+    this.bufferedDeletesStream = writer.bufferedDeletesStream;
+    this.writer = writer;
   }
 
   public synchronized long activeBytes() {
@@ -240,7 +244,6 @@ final class DocumentsWriterFlushControl {
   }
 
   public synchronized void waitForFlush() {
-    assert !Thread.holdsLock(this.documentsWriter.indexWriter) : "IW lock should never be hold when waiting on flush";
     while (flushingWriters.size() != 0) {
       try {
         this.wait();
@@ -311,7 +314,7 @@ final class DocumentsWriterFlushControl {
       // We are pending so all memory is already moved to flushBytes
       if (perThread.tryLock()) {
         try {
-          if (perThread.isActive()) {
+          if (perThread.isInitialized()) {
             assert perThread.isHeldByCurrentThread();
             final DocumentsWriterPerThread dwpt;
             final long bytes = perThread.bytesUsed; // do that before
@@ -413,11 +416,11 @@ final class DocumentsWriterFlushControl {
   * Returns the number of delete terms in the global pool
   */
  public int getNumGlobalTermDeletes() {
-    return documentsWriter.deleteQueue.numGlobalTermDeletes() + documentsWriter.indexWriter.bufferedDeletesStream.numTerms();
+    return documentsWriter.deleteQueue.numGlobalTermDeletes() + bufferedDeletesStream.numTerms();
  }
 
  public long getDeleteBytesUsed() {
-    return documentsWriter.deleteQueue.bytesUsed() + documentsWriter.indexWriter.bufferedDeletesStream.bytesUsed();
+    return documentsWriter.deleteQueue.bytesUsed() + bufferedDeletesStream.bytesUsed();
  }
 
  synchronized int numFlushingDWPT() {
@@ -441,13 +444,16 @@ final class DocumentsWriterFlushControl {
        .currentThread(), documentsWriter);
    boolean success = false;
    try {
-      if (perThread.isActive()
+      if (perThread.isInitialized()
          && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
        // There is a flush-all in process and this DWPT is
        // now stale -- enroll it for flush and try for
        // another DWPT:
        addFlushableState(perThread);
      }
+      if (!closed && perThread.isActive() && !perThread.isInitialized()) {
+        initializeThreadState(perThread);
+      }
      success = true;
      // simply return the ThreadState even in a flush all case sine we already hold the lock
      return perThread;
@@ -458,6 +464,22 @@ final class DocumentsWriterFlushControl {
    }
  }
 
+  ThreadState initializeThreadState(ThreadState state) {
+    if (state.isActive() && state.dwpt == null) {
+      state.dwpt = newDWPT();
+    }
+    return state;
+  }
+
+  private DocumentsWriterPerThread newDWPT() {
+    final FieldInfos.Builder infos = new FieldInfos.Builder(writer.globalFieldNumberMap);
+    return new DocumentsWriterPerThread(
+        writer.newSegmentName(), documentsWriter.directory,
+        documentsWriter.indexWriterConfig, documentsWriter.infoStream,
+        documentsWriter.codec, documentsWriter.deleteQueue, infos,
+        documentsWriter.chain);
+  }
+
  void markForFullFlush() {
    final DocumentsWriterDeleteQueue flushingQueue;
    synchronized (this) {
@@ -475,7 +497,10 @@ final class DocumentsWriterFlushControl {
      final ThreadState next = perThreadPool.getThreadState(i);
      next.lock();
      try {
-        if (!next.isActive()) {
+        if (!next.isInitialized()) {
+          if (closed && next.isActive()) {
+            perThreadPool.deactivateThreadState(next);
+          }
          continue;
        }
        assert next.dwpt.deleteQueue == flushingQueue
@@ -515,7 +540,7 @@ final class DocumentsWriterFlushControl {
      final ThreadState next = perThreadPool.getThreadState(i);
      next.lock();
      try {
-        assert !next.isActive() || next.dwpt.deleteQueue == queue;
+        assert !next.isInitialized() || next.dwpt.deleteQueue == queue;
      } finally {
        next.unlock();
      }
@@ -531,7 +556,7 @@ final class DocumentsWriterFlushControl {
    }
    final DocumentsWriterPerThread dwpt = perThread.dwpt;
    assert perThread.isHeldByCurrentThread();
-    assert perThread.isActive();
+    assert perThread.isInitialized();
    assert fullFlush;
    assert dwpt.deleteQueue != documentsWriter.deleteQueue;
    if (dwpt.getNumDocsInRAM() > 0) {
@@ -547,8 +572,6 @@ final class DocumentsWriterFlushControl {
    } else {
      if (closed) {
        perThreadPool.deactivateThreadState(perThread); // make this state inactive
-      } else {
-        perThreadPool.reinitThreadState(perThread);
      }
    }
  }
@@ -606,6 +629,7 @@ final class DocumentsWriterFlushControl {
    try {
      for (DocumentsWriterPerThread dwpt : flushQueue) {
        try {
+          documentsWriter.subtractFlushedNumDocs(dwpt.getNumDocsInRAM());
          dwpt.abort();
        } catch (Throwable ex) {
          // ignore - keep on aborting the flush queue
@@ -617,6 +641,7 @@ final class DocumentsWriterFlushControl {
        try {
          flushingWriters
              .put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
+          documentsWriter.subtractFlushedNumDocs(blockedFlush.dwpt.getNumDocsInRAM());
          blockedFlush.dwpt.abort();
        } catch (Throwable ex) {
          // ignore - keep on aborting the blocked queue
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
index 24c263c..3fee6ca 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
@@ -34,8 +34,7 @@ class DocumentsWriterFlushQueue {
   private final AtomicInteger ticketCount = new AtomicInteger();
   private final ReentrantLock purgeLock = new ReentrantLock();
 
-  void addDeletesAndPurge(DocumentsWriter writer,
-      DocumentsWriterDeleteQueue deleteQueue) throws IOException {
+  void addDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
     synchronized (this) {
       incTickets();// first inc the ticket count - freeze opens
                    // a window for #anyChanges to fail
@@ -49,9 +48,6 @@ class DocumentsWriterFlushQueue {
        }
      }
    }
-    // don't hold the lock on the FlushQueue when forcing the purge - this blocks and deadlocks
-    // if we hold the lock.
-    forcePurge(writer);
  }
 
  private void incTickets() {
@@ -98,7 +94,7 @@ class DocumentsWriterFlushQueue {
    return ticketCount.get() != 0;
  }
 
-  private void innerPurge(DocumentsWriter writer) throws IOException {
+  private void innerPurge(IndexWriter writer) throws IOException {
    assert purgeLock.isHeldByCurrentThread();
    while (true) {
      final FlushTicket head;
@@ -130,8 +126,9 @@ class DocumentsWriterFlushQueue {
    }
  }
 
-  void forcePurge(DocumentsWriter writer) throws IOException {
+  void forcePurge(IndexWriter writer) throws IOException {
    assert !Thread.holdsLock(this);
+    assert !Thread.holdsLock(writer);
    purgeLock.lock();
    try {
      innerPurge(writer);
@@ -140,8 +137,9 @@ class DocumentsWriterFlushQueue {
    }
  }
 
-  void tryPurge(DocumentsWriter writer) throws IOException {
+  void tryPurge(IndexWriter writer) throws IOException {
    assert !Thread.holdsLock(this);
+    assert !Thread.holdsLock(writer);
    if (purgeLock.tryLock()) {
      try {
        innerPurge(writer);
@@ -169,8 +167,47 @@ class DocumentsWriterFlushQueue {
      this.frozenDeletes = frozenDeletes;
    }
 
-    protected abstract void publish(DocumentsWriter writer) throws IOException;
+    protected abstract void publish(IndexWriter writer) throws IOException;
    protected abstract boolean canPublish();
+
+    /**
+     * Publishes the flushed segment, segment private deletes (if any) and its
+     * associated global delete (if present) to IndexWriter. The actual
+     * publishing operation is synced on IW -> BDS so that the {@link SegmentInfo}'s
+     * delete generation is always GlobalPacket_deleteGeneration + 1
+     */
+    protected final void publishFlushedSegment(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedDeletes globalPacket)
+        throws IOException {
+      assert newSegment != null;
+      assert newSegment.segmentInfo != null;
+      final FrozenBufferedDeletes segmentDeletes = newSegment.segmentDeletes;
+      //System.out.println("FLUSH: " + newSegment.segmentInfo.info.name);
+      if (indexWriter.infoStream.isEnabled("DW")) {
+        indexWriter.infoStream.message("DW", "publishFlushedSegment seg-private deletes=" + segmentDeletes);
+      }
+
+      if (segmentDeletes != null && indexWriter.infoStream.isEnabled("DW")) {
+        indexWriter.infoStream.message("DW", "flush: push buffered seg private deletes: " + segmentDeletes);
+      }
+      // now publish!
+      indexWriter.publishFlushedSegment(newSegment.segmentInfo, segmentDeletes, globalPacket);
+    }
+
+    protected final void finishFlush(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
+        throws IOException {
+      // Finish the flushed segment and publish it to IndexWriter
+      if (newSegment == null) {
+        assert bufferedDeletes != null;
+        if (bufferedDeletes != null && bufferedDeletes.any()) {
+          indexWriter.publishFrozenDeletes(bufferedDeletes);
+          if (indexWriter.infoStream.isEnabled("DW")) {
+            indexWriter.infoStream.message("DW", "flush: push buffered deletes: " + bufferedDeletes);
+          }
+        }
+      } else {
+        publishFlushedSegment(indexWriter, newSegment, bufferedDeletes);
+      }
+    }
  }
 
  static final class GlobalDeletesTicket extends FlushTicket {
@@ -179,11 +216,11 @@ class DocumentsWriterFlushQueue {
      super(frozenDeletes);
    }
    @Override
-    protected void publish(DocumentsWriter writer) throws IOException {
+    protected void publish(IndexWriter writer) throws IOException {
      assert !published : "ticket was already publised - can not publish twice";
      published = true;
      // its a global ticket - no segment to publish
-      writer.finishFlush(null, frozenDeletes);
+      finishFlush(writer, null, frozenDeletes);
    }
 
    @Override
@@ -201,10 +238,10 @@ class DocumentsWriterFlushQueue {
    }
 
    @Override
-    protected void publish(DocumentsWriter writer) throws IOException {
+    protected void publish(IndexWriter writer) throws IOException {
      assert !published : "ticket was already publised - can not publish twice";
      published = true;
-      writer.finishFlush(segment, frozenDeletes);
+      finishFlush(writer, segment, frozenDeletes);
    }
 
    protected void setSegment(FlushedSegment segment) {
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
index cdc5088..250c722 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
@@ -159,7 +159,7 @@ class DocumentsWriterPerThread {
      pendingDeletes.clear();
      deleteSlice = deleteQueue.newSlice();
      // Reset all postings data
-      doAfterFlush();
+      doAfterFlushOrAbort();
    } finally {
      aborting = false;
@@ -169,9 +169,7 @@ class DocumentsWriterPerThread {
    }
  }
  private final static boolean INFO_VERBOSE = false;
-  final DocumentsWriter parent;
  final Codec codec;
-  final IndexWriter writer;
  final TrackingDirectoryWrapper directory;
  final Directory directoryOrig;
  final DocState docState;
@@ -180,8 +178,8 @@ class DocumentsWriterPerThread {
  SegmentWriteState flushState;
  //Deletes for our still-in-RAM (to be flushed next) segment
-  BufferedDeletes pendingDeletes;
-  SegmentInfo segmentInfo;     // Current segment we are working on
+  final BufferedDeletes pendingDeletes;
+  private final SegmentInfo segmentInfo;     // Current segment we are working on
  boolean aborting = false;   // True if an abort is pending
  boolean hasAborted = false; // True if the last exception throws by #updateDocument was aborting
 
@@ -189,7 +187,7 @@ class DocumentsWriterPerThread {
  private final InfoStream infoStream;
  private int numDocsInRAM;
  private int flushedDocCount;
-  DocumentsWriterDeleteQueue deleteQueue;
+  final DocumentsWriterDeleteQueue deleteQueue;
  DeleteSlice deleteSlice;
  private final NumberFormat nf = NumberFormat.getInstance(Locale.ROOT);
  final Allocator byteBlockAllocator;
@@ -197,55 +195,53 @@ class DocumentsWriterPerThread {
 
  private final LiveIndexWriterConfig indexWriterConfig;
 
-  public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent,
+  public DocumentsWriterPerThread(String segmentName, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, Codec codec, DocumentsWriterDeleteQueue deleteQueue,
      FieldInfos.Builder fieldInfos, IndexingChain indexingChain) {
    this.directoryOrig = directory;
    this.directory = new TrackingDirectoryWrapper(directory);
-    this.parent = parent;
    this.fieldInfos = fieldInfos;
-    this.writer = parent.indexWriter;
-    this.indexWriterConfig = parent.indexWriterConfig;
-    this.infoStream = parent.infoStream;
-    this.codec = parent.codec;
+    this.indexWriterConfig = indexWriterConfig;
+    this.infoStream = infoStream;
+    this.codec = codec;
    this.docState = new DocState(this, infoStream);
-    this.docState.similarity = parent.indexWriter.getConfig().getSimilarity();
+    this.docState.similarity = indexWriterConfig.getSimilarity();
    bytesUsed = Counter.newCounter();
    byteBlockAllocator = new DirectTrackingAllocator(bytesUsed);
    pendingDeletes = new BufferedDeletes();
    intBlockAllocator = new IntBlockAllocator(bytesUsed);
-    initialize();
+    this.deleteQueue = deleteQueue;
+    assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
+    pendingDeletes.clear();
+    deleteSlice = null;
+
+    segmentInfo = new SegmentInfo(directoryOrig, Constants.LUCENE_MAIN_VERSION, segmentName, -1,
+        false, codec, null, null);
+    assert numDocsInRAM == 0;
+    if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
+      infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segmentName + " delQueue=" + deleteQueue);
+    }
    // this should be the last call in the ctor
    // it really sucks that we need to pull this within the ctor and pass this ref to the chain!
    consumer = indexingChain.getChain(this);
+  }
-
-  }
-
-  public DocumentsWriterPerThread(DocumentsWriterPerThread other, FieldInfos.Builder fieldInfos) {
-    this(other.directoryOrig, other.parent, fieldInfos, other.parent.chain);
-  }
-
-  void initialize() {
-    deleteQueue = parent.deleteQueue;
-    assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
-    pendingDeletes.clear();
-    deleteSlice = null;
-  }
-
  void setAborting() {
    aborting = true;
  }
+
+  boolean checkAndResetHasAborted() {
+    final boolean retval = hasAborted;
+    hasAborted = false;
+    return retval;
+  }
 
  final boolean testPoint(String message) {
    if (infoStream.isEnabled("TP")) {
-      infoStream.message("TP", message);
+      infoStream.message("TP", message);
    }
    return true;
  }
-
-  boolean checkAndResetHasAborted() {
-    final boolean retval = hasAborted;
-    hasAborted = false;
-    return retval;
-  }
 
  public void updateDocument(IndexDocument doc, Analyzer analyzer, Term delTerm) throws IOException {
    assert testPoint("DocumentsWriterPerThread addDocument start");
@@ -253,9 +249,6 @@ class DocumentsWriterPerThread {
    docState.doc = doc;
    docState.analyzer = analyzer;
    docState.docID = numDocsInRAM;
-    if (segmentInfo == null) {
-      initSegmentInfo();
-    }
    if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
      infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
    }
@@ -290,23 +283,10 @@ class DocumentsWriterPerThread {
    finishDocument(delTerm);
  }
 
-  private void initSegmentInfo() {
-    String segment = writer.newSegmentName();
-    segmentInfo = new SegmentInfo(directoryOrig, Constants.LUCENE_MAIN_VERSION, segment, -1,
-        false, codec, null, null);
-    assert numDocsInRAM == 0;
-    if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
-      infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segment + " delQueue=" + deleteQueue);
-    }
-  }
-
  public int updateDocuments(Iterable<? extends IndexDocument> docs, Analyzer analyzer, Term delTerm) throws IOException {
    assert testPoint("DocumentsWriterPerThread addDocuments start");
    assert deleteQueue != null;
    docState.analyzer = analyzer;
-    if (segmentInfo == null) {
-      initSegmentInfo();
-    }
    if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
      infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
    }
@@ -435,11 +415,9 @@ class DocumentsWriterPerThread {
  }
 
  /** Reset after a flush */
-  private void doAfterFlush() {
-    segmentInfo = null;
+  private void doAfterFlushOrAbort() {
    directory.getCreatedFiles().clear();
    fieldInfos = new FieldInfos.Builder(fieldInfos.globalFieldNumbers);
-    parent.subtractFlushedNumDocs(numDocsInRAM);
    numDocsInRAM = 0;
  }
 
@@ -468,9 +446,9 @@ class DocumentsWriterPerThread {
    assert deleteSlice == null : "all deletes must be applied in prepareFlush";
    segmentInfo.setDocCount(numDocsInRAM);
    flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(),
-        writer.getConfig().getTermIndexInterval(),
+        indexWriterConfig.getTermIndexInterval(),
        pendingDeletes, new IOContext(new FlushInfo(numDocsInRAM, bytesUsed())));
-    final double startMBUsed = parent.flushControl.netBytes() / 1024. / 1024.;
+    final double startMBUsed = bytesUsed() / 1024. / 1024.;
 
    // Apply delete-by-docID now (delete-byDocID only
    // happens when an exception is hit processing that
@@ -520,11 +498,9 @@ class DocumentsWriterPerThread {
 
    final BufferedDeletes segmentDeletes;
    if (pendingDeletes.queries.isEmpty()) {
-      pendingDeletes.clear();
      segmentDeletes = null;
    } else {
      segmentDeletes = pendingDeletes;
-      pendingDeletes = new BufferedDeletes();
    }
 
    if (infoStream.isEnabled("DWPT")) {
@@ -540,20 +516,18 @@ class DocumentsWriterPerThread {
      FlushedSegment fs = new FlushedSegment(segmentInfoPerCommit, flushState.fieldInfos,
          segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush);
      sealFlushedSegment(fs);
-      doAfterFlush();
+      doAfterFlushOrAbort();
      success = true;
 
      return fs;
    } finally {
      if (!success) {
-        if (segmentInfo != null) {
-          writer.flushFailed(segmentInfo);
-        }
        abort();
      }
    }
  }
 
+  Collection<String> filesToDelete; // HACK! nocommit
  /**
   * Seals the {@link SegmentInfo} for the new flushed segment and persists
   * the deleted documents {@link MutableBits}.
   */
@@ -569,12 +543,10 @@ class DocumentsWriterPerThread {
 
    boolean success = false;
    try {
+      if (indexWriterConfig.getUseCompoundFile()) {
-
-        // Now build compound file
-        Collection<String> oldFiles = IndexWriter.createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, newSegment.info, context);
+        filesToDelete = IndexWriter.createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, newSegment.info, context);
        newSegment.info.setUseCompoundFile(true);
-        writer.deleteNewFiles(oldFiles);
      }
 
      // Have codec write SegmentInfo.  Must do this after
@@ -619,7 +591,6 @@ class DocumentsWriterPerThread {
          infoStream.message("DWPT", "hit exception " +
              "reating compound file for newly flushed segment " + newSegment.info.name);
        }
-        writer.flushFailed(newSegment.info);
      }
    }
  }
@@ -672,4 +643,5 @@ class DocumentsWriterPerThread {
      + ", segment=" + (segmentInfo != null ? segmentInfo.name : "null") + ", aborting=" + aborting + ", numDocsInRAM="
      + numDocsInRAM + ", deleteQueue=" + deleteQueue + "]";
  }
+
 }
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
index 649a81c..55d0bee 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
@@ -71,12 +71,16 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
     * for indexing anymore.
     * @see #isActive()
     */
-    private void resetWriter(DocumentsWriterPerThread dwpt) {
+
+    private void deactivate() {
      assert this.isHeldByCurrentThread();
-      if (dwpt == null) {
-        isActive = false;
-      }
-      this.dwpt = dwpt;
+      isActive = false;
+      reset();
+    }
+
+    private void reset() {
+      assert this.isHeldByCurrentThread();
+      this.dwpt = null;
      this.bytesUsed = 0;
      this.flushPending = false;
    }
@@ -91,6 +95,11 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
      return isActive;
    }
 
+    boolean isInitialized() {
+      assert this.isHeldByCurrentThread();
+      return isActive() && dwpt != null;
+    }
+
    /**
     * Returns the number of currently active bytes in this ThreadState's
     * {@link DocumentsWriterPerThread}
@@ -121,9 +130,7 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
 
  private ThreadState[] threadStates;
  private volatile int numThreadStatesActive;
-  private SetOnce<FieldNumbers> globalFieldMap = new SetOnce<FieldNumbers>();
-  private SetOnce<DocumentsWriter> documentsWriter = new SetOnce<DocumentsWriter>();
-
+
  /**
   * Creates a new {@link DocumentsWriterPerThreadPool} with a given maximum of {@link ThreadState}s.
   */
@@ -133,14 +140,8 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
    }
    threadStates = new ThreadState[maxNumThreadStates];
    numThreadStatesActive = 0;
-  }
-
-  void initialize(DocumentsWriter documentsWriter, FieldNumbers globalFieldMap, LiveIndexWriterConfig config) {
-    this.documentsWriter.set(documentsWriter); // thread pool is bound to DW
-    this.globalFieldMap.set(globalFieldMap);
    for (int i = 0; i < threadStates.length; i++) {
-      final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldMap);
-      threadStates[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain));
+      threadStates[i] = new ThreadState(null);
    }
  }
 
@@ -155,9 +156,10 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
      // should not happen
      throw new RuntimeException(e);
    }
-    clone.documentsWriter = new SetOnce<DocumentsWriter>();
-    clone.globalFieldMap = new SetOnce<FieldNumbers>();
    clone.threadStates = new ThreadState[threadStates.length];
+    for (int i = 0; i < threadStates.length; i++) {
+      clone.threadStates[i] = new ThreadState(null);
+    }
    return clone;
  }
 
@@ -175,6 +177,7 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
  int getActiveThreadState() {
    return numThreadStatesActive;
  }
+
 
  /**
   * Returns a new {@link ThreadState} iff any new state is available otherwise
@@ -195,8 +198,7 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
        if (threadState.isActive()) {
          // unreleased thread states are deactivated during DW#close()
          numThreadStatesActive++; // increment will publish the ThreadState
-          assert threadState.dwpt != null;
-          threadState.dwpt.initialize();
+          assert threadState.dwpt == null;
          unlock = false;
          return threadState;
        }
@@ -217,7 +219,7 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
    for (int i = numThreadStatesActive; i < threadStates.length; i++) {
      assert threadStates[i].tryLock() : "unreleased threadstate should not be locked";
      try {
-        assert !threadStates[i].isActive() : "expected unreleased thread state to be inactive";
+        assert !threadStates[i].isInitialized() : "expected unreleased thread state to be inactive";
      } finally {
        threadStates[i].unlock();
      }
@@ -233,7 +235,7 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
      final ThreadState threadState = threadStates[i];
      threadState.lock();
      try {
-        threadState.resetWriter(null);
+        threadState.deactivate();
      } finally {
        threadState.unlock();
      }
@@ -242,15 +244,11 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
 
  DocumentsWriterPerThread replaceForFlush(ThreadState threadState, boolean closed) {
    assert threadState.isHeldByCurrentThread();
-    assert globalFieldMap.get() != null;
    final DocumentsWriterPerThread dwpt = threadState.dwpt;
    if (!closed) {
-      final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldMap.get());
-      final DocumentsWriterPerThread newDwpt = new DocumentsWriterPerThread(dwpt, infos);
-      newDwpt.initialize();
-      threadState.resetWriter(newDwpt);
+      threadState.reset();
    } else {
-      threadState.resetWriter(null);
+      threadState.deactivate();
    }
    return dwpt;
  }
@@ -325,18 +323,6 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
   */
  void deactivateThreadState(ThreadState threadState) {
    assert threadState.isActive();
-    threadState.resetWriter(null);
-  }
-
-  /**
-   * Reinitialized an active {@link ThreadState}. A {@link ThreadState} should
-   * only be reinitialized if it is active without any pending documents.
-   *
-   * @param threadState the state to reinitialize
-   */
-  void reinitThreadState(ThreadState threadState) {
-    assert threadState.isActive;
-    assert threadState.dwpt.getNumDocsInRAM() == 0;
-    threadState.dwpt.initialize();
+    threadState.deactivate();
  }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java b/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
index 9645479..b07debe 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
@@ -94,7 +94,7 @@ abstract class FlushPolicy implements Cloneable {
   */
  protected synchronized void init(DocumentsWriter docsWriter) {
    writer.set(docsWriter);
-    indexWriterConfig = docsWriter.indexWriter.getConfig();
+    indexWriterConfig = docsWriter.getIndexWriterConfig();
  }
 
  /**
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 0a8ccd2..8030090 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.index.DocumentsWriter.Event;
 import org.apache.lucene.index.FieldInfos.FieldNumbers;
 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
 import org.apache.lucene.index.MergePolicy.MergeTrigger;
@@ -182,7 +183,7 @@ import org.apache.lucene.util.ThreadInterruptedException;
 * referenced by the "front" of the index). For this, IndexFileDeleter
 * keeps track of the last non commit checkpoint.
 */
-public class IndexWriter implements Closeable, TwoPhaseCommit {
+public class IndexWriter implements Closeable, TwoPhaseCommit{
 
  private static final int UNBOUNDED_MAX_MERGE_SEGMENTS = -1;
 
@@ -360,7 +361,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
      synchronized (fullFlushLock) {
        boolean success = false;
        try {
-          anySegmentFlushed = docWriter.flushAllThreads();
+          anySegmentFlushed = docWriter.flushAllThreads(this);
          if (!anySegmentFlushed) {
            // prevent double increment since docWriter#doFlush increments the flushcount
            // if we flushed anything.
@@ -726,7 +727,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
 
      // start with previous field numbers, but new FieldInfos
      globalFieldNumberMap = getFieldNumberMap();
-      docWriter = new DocumentsWriter(codec, config, directory, this, globalFieldNumberMap, bufferedDeletesStream);
+      docWriter = new DocumentsWriter(codec, this, config, directory);
 
      // Default deleter (for backwards compatibility) is
      // KeepOnlyLastCommitDeleter:
@@ -957,7 +958,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
        if (doFlush) {
          flush(waitForMerges, true);
        } else {
-          docWriter.abort(); // already closed -- never sync on IW
+          docWriter.abort(this); // already closed -- never sync on IW
        }
 
      } finally {
@@ -1029,7 +1030,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
        synchronized(this) {
          closed = true;
        }
-        assert oldWriter.perThreadPool.numDeactivatedThreadStates() == oldWriter.perThreadPool.getMaxThreadStates();
+        assert oldWriter.perThreadPool.numDeactivatedThreadStates() == oldWriter.perThreadPool.getMaxThreadStates() : "" + oldWriter.perThreadPool.numDeactivatedThreadStates() + " " + oldWriter.perThreadPool.getMaxThreadStates();
      } catch (OutOfMemoryError oom) {
        handleOOM(oom, "closeInternal");
      } finally {
@@ -1276,9 +1277,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
    ensureOpen();
    try {
      boolean success = false;
-      boolean anySegmentFlushed = false;
      try {
-        anySegmentFlushed = docWriter.updateDocuments(docs, analyzer, delTerm);
+        if (docWriter.updateDocuments(docs, analyzer, delTerm)) {
+          processEvents(true);
+        }
        success = true;
      } finally {
        if (!success) {
@@ -1287,9 +1289,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
          }
        }
      }
-      if (anySegmentFlushed) {
-        maybeMerge(MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
-      }
    } catch (OutOfMemoryError oom) {
      handleOOM(oom, "updateDocuments");
    }
@@ -1309,7 +1308,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
  public void deleteDocuments(Term term) throws IOException {
    ensureOpen();
    try {
-      docWriter.deleteTerms(term);
+      if (docWriter.deleteTerms(term)) {
+        processEvents(true);
+      }
    } catch (OutOfMemoryError oom) {
      handleOOM(oom, "deleteDocuments(Term)");
    }
@@ -1408,7 +1409,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
  public void deleteDocuments(Term... terms) throws IOException {
    ensureOpen();
    try {
-      docWriter.deleteTerms(terms);
+      if (docWriter.deleteTerms(terms)) {
+        processEvents(true);
+      }
    } catch (OutOfMemoryError oom) {
      handleOOM(oom, "deleteDocuments(Term..)");
    }
@@ -1428,7 +1431,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
  public void deleteDocuments(Query query) throws IOException {
    ensureOpen();
    try {
-      docWriter.deleteQueries(query);
+      if (docWriter.deleteQueries(query)) {
+        processEvents(true);
+      }
    } catch (OutOfMemoryError oom) {
      handleOOM(oom, "deleteDocuments(Query)");
    }
@@ -1450,7 +1455,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
  public void deleteDocuments(Query... queries) throws IOException {
    ensureOpen();
    try {
-      docWriter.deleteQueries(queries);
+      if (docWriter.deleteQueries(queries)) {
+        processEvents(true);
+      }
    } catch (OutOfMemoryError oom) {
      handleOOM(oom, "deleteDocuments(Query..)");
    }
@@ -1501,9 +1508,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
    ensureOpen();
    try {
      boolean success = false;
-      boolean anySegmentFlushed = false;
      try {
-        anySegmentFlushed = docWriter.updateDocument(doc, analyzer, term);
+        if (docWriter.updateDocument(doc, analyzer, term)) {
+          processEvents(true);
+        }
        success = true;
      } finally {
        if (!success) {
@@ -1512,10 +1520,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
          }
        }
      }
-
-      if (anySegmentFlushed) {
-        maybeMerge(MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
-      }
    } catch (OutOfMemoryError oom) {
      handleOOM(oom, "updateDocument");
    }
@@ -1726,7 +1730,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
    // complete
    ensureOpen();
  }
-
  // NOTE: in the ConcurrentMergeScheduler case, when
  // doWait is false, we can return immediately while
  // background threads accomplish the merging
@@ -2005,8 +2008,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
      mergeScheduler.close();
 
      bufferedDeletesStream.clear();
+      processEvents(false);
      docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes
-      docWriter.abort(); // don't sync on IW here
+      docWriter.abort(this); // don't sync on IW here
      synchronized(this) {
 
        if (pendingCommit != null) {
@@ -2098,7 +2102,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
     * sure it's just like a fresh index.
     */
    try {
-      docWriter.lockAndAbortAll();
+      docWriter.lockAndAbortAll(this);
+      processEvents(false);
      synchronized (this) {
        try {
          // Abort any running merges
@@ -2131,7 +2136,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
        }
      }
    } finally {
-      docWriter.unlockAllAfterAbortAll();
+      docWriter.unlockAllAfterAbortAll(this);
    }
  }
 
@@ -2239,33 +2244,40 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
   * Atomically adds the segment private delete packet and publishes the flushed
   * segments SegmentInfo to the index writer.
   */
-  synchronized void publishFlushedSegment(SegmentInfoPerCommit newSegment,
+  void publishFlushedSegment(SegmentInfoPerCommit newSegment,
      FrozenBufferedDeletes packet, FrozenBufferedDeletes globalPacket) throws IOException {
-    // Lock order IW -> BDS
-    synchronized (bufferedDeletesStream) {
-      if (infoStream.isEnabled("IW")) {
-        infoStream.message("IW", "publishFlushedSegment");
-      }
-
-      if (globalPacket != null && globalPacket.any()) {
-        bufferedDeletesStream.push(globalPacket);
-      }
-      // Publishing the segment must be synched on IW -> BDS to make the sure
-      // that no merge prunes away the seg. private delete packet
-      final long nextGen;
-      if (packet != null && packet.any()) {
-        nextGen = bufferedDeletesStream.push(packet);
-      } else {
-        // Since we don't have a delete packet to apply we can get a new
-        // generation right away
-        nextGen = bufferedDeletesStream.getNextGen();
-      }
-      if (infoStream.isEnabled("IW")) {
-        infoStream.message("IW", "publish sets newSegment delGen=" + nextGen + " seg=" + segString(newSegment));
+    try {
+      synchronized (this) {
+        // Lock order IW -> BDS
+        synchronized (bufferedDeletesStream) {
+          if (infoStream.isEnabled("IW")) {
+            infoStream.message("IW", "publishFlushedSegment");
+          }
+
+          if (globalPacket != null && globalPacket.any()) {
+            bufferedDeletesStream.push(globalPacket);
+          }
+          // Publishing the segment must be synched on IW -> BDS to make sure
+          // that no merge prunes away the seg. private delete packet
+          final long nextGen;
+          if (packet != null && packet.any()) {
+            nextGen = bufferedDeletesStream.push(packet);
+          } else {
+            // Since we don't have a delete packet to apply we can get a new
+            // generation right away
+            nextGen = bufferedDeletesStream.getNextGen();
+          }
+          if (infoStream.isEnabled("IW")) {
+            infoStream.message("IW", "publish sets newSegment delGen=" + nextGen + " seg=" + segString(newSegment));
+          }
+          newSegment.setBufferedDeletesGen(nextGen);
+          segmentInfos.add(newSegment);
+          checkpoint();
+        }
      }
-      newSegment.setBufferedDeletesGen(nextGen);
-      segmentInfos.add(newSegment);
-      checkpoint();
+    } finally {
+      flushCount.incrementAndGet();
+      doAfterFlush();
    }
  }
 
@@ -2695,12 +2707,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
        boolean flushSuccess = false;
        boolean success = false;
        try {
-          anySegmentsFlushed = docWriter.flushAllThreads();
+          anySegmentsFlushed = docWriter.flushAllThreads(this);
          if (!anySegmentsFlushed) {
            // prevent double increment since docWriter#doFlush increments the flushcount
            // if we flushed anything.
            flushCount.incrementAndGet();
          }
+          processEvents(false);
          flushSuccess = true;
 
          synchronized(this) {
@@ -2740,7 +2753,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
      } catch (OutOfMemoryError oom) {
        handleOOM(oom, "prepareCommit");
      }
-
+
      boolean success = false;
      try {
        if (anySegmentsFlushed) {
@@ -2755,7 +2768,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
          }
        }
      }
-
+
      startCommit(toCommit);
    }
  }
@@ -2914,6 +2927,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
    if (doFlush(applyAllDeletes) && triggerMerge) {
      maybeMerge(MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
    }
+    processEvents(triggerMerge);
  }
 
  private boolean doFlush(boolean applyAllDeletes) throws IOException {
@@ -2935,10 +2949,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
      synchronized (fullFlushLock) {
        boolean flushSuccess = false;
        try {
-          anySegmentFlushed = docWriter.flushAllThreads();
+          anySegmentFlushed = docWriter.flushAllThreads(this);
          flushSuccess = true;
        } finally {
          docWriter.finishFullFlush(flushSuccess);
+          processEvents(false);
        }
      }
      synchronized(this) {
@@ -4283,4 +4298,33 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
  synchronized final void flushFailed(SegmentInfo info) throws IOException {
    deleter.refresh(info.name);
  }
+
+  final void applyDeletesAndPurge() throws IOException {
+    try {
+      docWriter.purgeBuffer(this);
+    } finally {
+      applyAllDeletes();
+      flushCount.incrementAndGet();
+    }
+  }
+
+  final void doAfterSegmentFlushed(boolean triggerMerge) throws IOException {
+    try {
+      docWriter.purgeBuffer(this);
+    } finally {
+      if (triggerMerge) {
+        maybeMerge(MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
+      }
+    }
+
+  }
+
+  private boolean processEvents(boolean triggerMerge) throws IOException {
+    Event event;
+    boolean processed = false;
+    while((event = docWriter.pollNextEvent()) != null) {
+      processed = true;
+      event.process(this, triggerMerge);
+    }
+    return processed;
+  }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/util/BytesRefHash.java b/lucene/core/src/java/org/apache/lucene/util/BytesRefHash.java
index ed28c70..dbc2158 100644
--- a/lucene/core/src/java/org/apache/lucene/util/BytesRefHash.java
+++ b/lucene/core/src/java/org/apache/lucene/util/BytesRefHash.java
@@ -59,6 +59,7 @@ public final class BytesRefHash {
  private int count;
  private int lastCount = -1;
  private int[] ids;
+  private int[] hashes;
  private final BytesStartArray bytesStartArray;
  private Counter bytesUsed;
 
@@ -88,6 +89,7 @@ public final class BytesRefHash {
    this.pool = pool;
    ids = new int[hashSize];
    Arrays.fill(ids, -1);
+    hashes = new int[hashSize];
    this.bytesStartArray = bytesStartArray;
    bytesStart = bytesStartArray.init();
    bytesUsed = bytesStartArray.bytesUsed() == null? Counter.newCounter() : bytesStartArray.bytesUsed();
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
index a79bac6..7a49d3b 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
@@ -281,7 +281,10 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
      Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreadStates();
      long bytesUsed = 0;
      while (allActiveThreads.hasNext()) {
-        bytesUsed += allActiveThreads.next().dwpt.bytesUsed();
+        ThreadState next = allActiveThreads.next();
+        if (next.dwpt != null) {
+          bytesUsed += next.dwpt.bytesUsed();
+        }
      }
      assertEquals(bytesUsed, flushControl.activeBytes());
    }
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index 95e9fef..b670a33 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -1744,7 +1744,6 @@ public class TestIndexWriter extends LuceneTestCase {
 
    w.deleteAll();
    w.commit();
-
    // Make sure we accumulate no files except for empty
    // segments_N and segments.gen:
    assertTrue(d.listAll().length <= 2);
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterForceMerge.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterForceMerge.java
index e3042cb..bf6917f 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterForceMerge.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterForceMerge.java
@@ -83,7 +83,7 @@ public class TestIndexWriterForceMerge extends LuceneTestCase {
    IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random()))
      .setMaxBufferedDocs(2).setMergePolicy(ldmp).setMergeScheduler(new ConcurrentMergeScheduler()));
-
+
    for(int iter=0;iter<10;iter++) {
      for(int i=0;i<19;i++)
        writer.addDocument(doc);
@@ -96,7 +96,6 @@ public class TestIndexWriterForceMerge extends LuceneTestCase {
      sis.read(dir);
      final int segCount = sis.size();
-
      writer.forceMerge(7);
      writer.commit();
      writer.waitForMerges();
@@ -108,7 +107,7 @@ public class TestIndexWriterForceMerge extends LuceneTestCase {
      if (segCount < 7)
        assertEquals(segCount, optSegCount);
      else
-        assertEquals(7, optSegCount);
+        assertEquals("seg: " + segCount, 7, optSegCount);
    }
    writer.close();
    dir.close();
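
The structural idea running through the patch above is that DocumentsWriter no longer calls back into IndexWriter (applyAllDeletes, flushCount, doAfterFlush, deleteNewFiles, flushFailed) while holding its own locks; it enqueues Event objects on a ConcurrentLinkedQueue, and IndexWriter drains that queue only after its call into DocumentsWriter has returned. The following is a minimal, self-contained sketch of that queue-and-drain pattern in isolation; every name in it is invented for illustration and none of it is Lucene API:

import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of the deferred-event pattern: the producer never calls the
// consumer back while its own locks may be held; it only enqueues events,
// and the consumer drains them afterwards, outside any producer lock.
final class EventQueueSketch {

  interface Event {
    void process(Consumer consumer) throws IOException;
  }

  // Plays the role DocumentsWriter has in the patch.
  static final class Producer {
    private final ConcurrentLinkedQueue<Event> events = new ConcurrentLinkedQueue<Event>();

    // May run on indexing threads holding internal locks; only enqueues.
    boolean doWork() {
      events.add(new Event() {
        @Override
        public void process(Consumer consumer) throws IOException {
          consumer.applyDeletes(); // deferred until no producer locks are held
        }
      });
      return true; // signal the caller that events are pending
    }

    Event pollNextEvent() {
      return events.poll();
    }
  }

  // Plays the role IndexWriter has in the patch.
  static final class Consumer {
    private final Producer producer = new Producer();

    void update() throws IOException {
      if (producer.doWork()) {
        processEvents(); // drain after the call into Producer returns
      }
    }

    private void processEvents() throws IOException {
      Event event;
      while ((event = producer.pollNextEvent()) != null) {
        event.process(this); // no lock-order inversion possible here
      }
    }

    void applyDeletes() {
      System.out.println("applying deletes outside producer locks");
    }
  }

  public static void main(String[] args) throws IOException {
    new Consumer().update();
  }
}

A non-blocking queue suits this role: offering an event is safe from a thread that already holds unrelated monitors, and the draining side tolerates events being published concurrently while it polls.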