Index: lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java =================================================================== --- lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java (revision 1090383) +++ lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java (working copy)
@@ -34,7 +34,7 @@ import org.apache.lucene.search.Weight; /* Tracks the stream of {@link BuffereDeletes}. - * When DocumensWriter flushes, its buffered + * When DocumentsWriterPerThread flushes, its buffered * deletes are appended to this stream. We later * apply these deletes (resolve them to the actual * docIDs, per segment) when a merge is started
@@ -82,17 +82,27 @@ // Appends a new packet of buffered deletes to the stream, // setting its generation: - public synchronized void push(FrozenBufferedDeletes packet) { + public synchronized long push(FrozenBufferedDeletes packet) { + /* + * The insert operation must be atomic. If we let threads increment the gen + * and push the packet afterwards we risk that packets are out of order. + * With DWPT this is possible if two or more flushes are racing to push + * updates. If the pushed packets get out of order we would lose documents + * since deletes are applied to the wrong segments. + */ + packet.setDelGen(nextGen++); assert packet.any(); assert checkDeleteStats(); - assert packet.gen < nextGen; + assert packet.delGen() < nextGen; + assert deletes.isEmpty() || deletes.get(deletes.size()-1).delGen() < packet.delGen() : "Delete packets must be in order"; deletes.add(packet); numTerms.addAndGet(packet.numTermDeletes); bytesUsed.addAndGet(packet.bytesUsed); if (infoStream != null) { - message("push deletes " + packet + " delGen=" + packet.gen + " packetCount=" + deletes.size()); + message("push deletes " + packet + " delGen=" + packet.delGen() + " packetCount=" + deletes.size()); } assert checkDeleteStats(); + return packet.delGen(); } public synchronized void clear() {
@@ -132,7 +142,7 @@ } // Sorts SegmentInfos from smallest to biggest bufferedDelGen: - private static final Comparator sortByDelGen = new Comparator() { + private static final Comparator sortSegInfoByDelGen = new Comparator() { // @Override -- not until Java 1.6 public int compare(SegmentInfo si1, SegmentInfo si2) { final long cmp = si1.getBufferedDeletesGen() - si2.getBufferedDeletesGen();
@@ -147,10 +157,10 @@ @Override public boolean equals(Object other) { - return sortByDelGen == other; + return sortSegInfoByDelGen == other; } }; - + /** Resolves the buffered deleted Term/Query/docIDs, into * actual deleted docIDs in the deletedDocs BitVector for * each SegmentReader. */
@@ -174,7 +184,7 @@ SegmentInfos infos2 = new SegmentInfos(); infos2.addAll(infos); - Collections.sort(infos2, sortByDelGen); + Collections.sort(infos2, sortSegInfoByDelGen); BufferedDeletes coalescedDeletes = null; boolean anyNewDeletes = false;
@@ -191,19 +201,30 @@ final SegmentInfo info = infos2.get(infosIDX); final long segGen = info.getBufferedDeletesGen(); - if (packet != null && segGen < packet.gen) { + if (packet != null && segGen < packet.delGen()) { //System.out.println(" coalesce"); if (coalescedDeletes == null) { coalescedDeletes = new BufferedDeletes(true); } - coalescedDeletes.update(packet); + if (!packet.isSegmentPrivate) { + /* + * only update the coalescedDeletes if we are NOT on a segment private del packet. + * the segment private del packet must only be applied to segments with the same delGen.
+ * Yet, if a segment is already deleted from the SI since it had no more documents remaining + * after some del packets younger than its segPrivate packet (higher delGen) have been applied + * the segPrivate packet has not been removed. + */ + coalescedDeletes.update(packet); + } + delIDX--; - } else if (packet != null && segGen == packet.gen) { + } else if (packet != null && segGen == packet.delGen()) { + assert packet.isSegmentPrivate : "Packet and Segment's delGen can only match on a segment private del packet"; //System.out.println(" eq"); // Lock order: IW -> BD -> RP assert readerPool.infoIsLive(info); - SegmentReader reader = readerPool.get(info, false); + final SegmentReader reader = readerPool.get(info, false); int delCount = 0; final boolean segAllDeletes; try {
@@ -213,7 +234,7 @@ delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader); } //System.out.println(" del exact"); - // Don't delete by Term here; DocumentsWriter + // Don't delete by Term here; DocumentsWriterPerThread // already did that on flush: delCount += applyQueryDeletes(packet.queriesIterable(), reader); segAllDeletes = reader.numDocs() == 0;
@@ -236,7 +257,12 @@ if (coalescedDeletes == null) { coalescedDeletes = new BufferedDeletes(true); } - coalescedDeletes.update(packet); + + /* + * since we are on a segment private del packet we must not + * update the coalescedDeletes here! We can simply advance to the + * next packet and seginfo. + */ delIDX--; infosIDX--; info.setBufferedDeletesGen(nextGen);
@@ -285,7 +311,7 @@ return new ApplyDeletesResult(anyNewDeletes, nextGen++, allDeleted); } - public synchronized long getNextGen() { + synchronized long getNextGen() { return nextGen++; }
@@ -303,10 +329,9 @@ if (infoStream != null) { message("prune sis=" + segmentInfos + " minGen=" + minGen + " packetCount=" + deletes.size()); } - final int limit = deletes.size(); for(int delIDX=0;delIDX<limit;delIDX++) { - if (deletes.get(delIDX).gen >= minGen) { + if (deletes.get(delIDX).delGen() >= minGen) { prune(delIDX); assert checkDeleteStats(); return;
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriter.java =================================================================== --- lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (revision 1090383) +++ lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (working copy)
@@ -21,7 +21,9 @@ import java.io.PrintStream; import java.util.Collection; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; +import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.analysis.Analyzer;
@@ -117,8 +119,10 @@ private AtomicInteger numDocsInRAM = new AtomicInteger(0); final BufferedDeletesStream bufferedDeletesStream; - // TODO: cutover to BytesRefHash - private final BufferedDeletes pendingDeletes = new BufferedDeletes(false); + // TODO: cut over to BytesRefHash in BufferedDeletes + volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue(new BufferedDeletes(false)); + private final Queue ticketQueue = new LinkedList(); + private Collection abortedFiles; // List of files that were written before last abort() final IndexingChain chain;
@@ -146,30 +150,12 @@ healthiness = new Healthiness(); final long maxRamPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024; - flushControl = new DocumentsWriterFlushControl(flushPolicy, perThreadPool, healthiness, pendingDeletes, maxRamPerDWPT); + flushControl = new DocumentsWriterFlushControl(this, healthiness, maxRamPerDWPT); } - boolean deleteQueries(final 
Query... queries) throws IOException { - synchronized(this) { - for (Query query : queries) { - pendingDeletes.addQuery(query, BufferedDeletes.MAX_INT); - } - } - - final Iterator threadsIterator = perThreadPool.getActivePerThreadsIterator(); - - while (threadsIterator.hasNext()) { - ThreadState state = threadsIterator.next(); - state.lock(); - try { - if (state.isActive()) { - state.perThread.deleteQueries(queries); - } - } finally { - state.unlock(); - } - } - + synchronized boolean deleteQueries(final Query... queries) throws IOException { + final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; + deleteQueue.addDelete(queries); return false; }
@@ -177,29 +163,12 @@ return deleteQueries(query); } - boolean deleteTerms(final Term... terms) throws IOException { - synchronized(this) { - for (Term term : terms) { - pendingDeletes.addTerm(term, BufferedDeletes.MAX_INT); - } - } - - Iterator threadsIterator = perThreadPool.getActivePerThreadsIterator(); - - while (threadsIterator.hasNext()) { - ThreadState state = threadsIterator.next(); - state.lock(); - try { - if (state.isActive()) { - state.perThread.deleteTerms(terms); - flushControl.doOnDelete(state); - } - } finally { - state.unlock(); - } - } + synchronized boolean deleteTerms(final Term... terms) throws IOException { + final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; + deleteQueue.addDelete(terms); + flushControl.doOnDelete(); if (flushControl.flushDeletes.getAndSet(false)) { - flushDeletes(); + flushDeletes(deleteQueue); } return false; }
@@ -211,32 +180,20 @@ return deleteTerms(term); } - void deleteTerm(final Term term, ThreadState exclude) throws IOException { - synchronized(this) { - pendingDeletes.addTerm(term, BufferedDeletes.MAX_INT); - } - final Iterator threadsIterator = perThreadPool.getActivePerThreadsIterator(); - - while (threadsIterator.hasNext()) { - ThreadState state = threadsIterator.next(); - if (state != exclude) { - state.lock(); - try { - state.perThread.deleteTerms(term); - flushControl.doOnDelete(state); - } finally { - state.unlock(); - } - } - } - if (flushControl.flushDeletes.getAndSet(false)) { - flushDeletes(); - } + + DocumentsWriterDeleteQueue currentDeleteSession() { + return deleteQueue; } - - private void flushDeletes() throws IOException { - maybePushPendingDeletes(); + + private void flushDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException { + if (deleteQueue != null) { + synchronized (ticketQueue) { + // freeze and insert the delete flush ticket in the queue + ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false)); + applyFlushTickets(null, null); + } + } indexWriter.applyAllDeletes(); indexWriter.flushCount.incrementAndGet(); }
@@ -284,7 +241,7 @@ boolean success = false; synchronized (this) { - pendingDeletes.clear(); + deleteQueue.clear(); } try {
@@ -322,17 +279,17 @@ } public int getBufferedDeleteTermsSize() { - return pendingDeletes.terms.size(); + return deleteQueue.getBufferedDeleteTermsSize(); } //for testing public int getNumBufferedDeleteTerms() { - return pendingDeletes.numTermDeletes.get(); + return deleteQueue.numGlobalTermDeletes(); } public boolean anyDeletions() { - return pendingDeletes.any(); - } + return deleteQueue.anyChanges(); + } void close() { closed = true;
@@ -353,72 +310,132 @@ message("WARNING DocumentsWriter is stalled; trying to hijack thread to flush pending segment"); } // try to pick up pending threads here if possible - final DocumentsWriterPerThread flushingDWPT; - flushingDWPT = 
flushControl.getFlushIfPending(null); + DocumentsWriterPerThread flushingDWPT; + while ( (flushingDWPT = flushControl.nextPendingFlush()) != null){ // don't push the delete here since the update could fail! - maybeMerge = doFlush(flushingDWPT); + maybeMerge = doFlush(flushingDWPT); + if (!healthiness.isStalled()) { + break; + } + } if (infoStream != null && healthiness.isStalled()) { message("WARNING DocumentsWriter is stalled; might block thread until DocumentsWriter is not stalled anymore"); } healthiness.waitIfStalled(); // block if stalled }
- ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), + final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this, doc); - DocumentsWriterPerThread flushingDWPT = null; + final DocumentsWriterPerThread flushingDWPT; + final DocumentsWriterPerThread dwpt; try { if (!perThread.isActive()) { ensureOpen(); assert false: "perThread is not active but we are still open"; } - final DocumentsWriterPerThread dwpt = perThread.perThread; + + dwpt = perThread.perThread; try { - dwpt.updateDocument(doc, analyzer, delTerm); + dwpt.updateDocument(doc, analyzer, delTerm); + numDocsInRAM.incrementAndGet(); } finally { if(dwpt.checkAndResetHasAborted()) { flushControl.doOnAbort(perThread); } } flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); - numDocsInRAM.incrementAndGet(); } finally { perThread.unlock(); } - // delete term from other DWPTs later, so that this thread - // doesn't have to lock multiple DWPTs at the same time - if (isUpdate) { - deleteTerm(delTerm, perThread); - } + maybeMerge |= doFlush(flushingDWPT); return maybeMerge; } - - - private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException { + private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException { boolean maybeMerge = false; while (flushingDWPT != null) { maybeMerge = true; + boolean success = false; + FlushTicket ticket = null; try { + /* + * Since with DWPT the flush process is concurrent and several DWPT + * could flush at the same time we must maintain the order of the + * flushes before we can apply the flushed segment and the frozen global + * deletes it is buffering. The reason for this is that the global + * deletes mark a certain point in time where we took a DWPT out of + * rotation and froze the global deletes. + * + * Example: A flush 'A' starts and freezes the global deletes, then + * flush 'B' starts and freezes all deletes that occurred since 'A' + * started. If 'B' finishes before 'A' we need to wait until 'A' is done + * otherwise the deletes frozen by 'B' are not applied to 'A' and we + * might fail to delete documents in 'A'. + */ + synchronized (ticketQueue) { + // each flush is assigned a ticket in the order they acquire the ticketQueue lock + ticket = new FlushTicket(flushingDWPT.prepareFlush(), true); + ticketQueue.add(ticket); + } // flush concurrently without locking final FlushedSegment newSegment = flushingDWPT.flush(); - finishFlushedSegment(newSegment); + success = true; + /* + * now we are done and try to flush the ticket queue if the head of the + * queue has already finished the flush. 
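+ *
+ * A hypothetical trace (ticket names are illustrative): the queue holds
+ * [A, B] and B's flush finishes first. Head ticket A still has a null
+ * segment, so its canPublish() returns false and B's result simply stays
+ * queued; once A completes, both tickets are published in creation order,
+ * which preserves the delete generation order.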
+ */ + applyFlushTickets(ticket, newSegment); } finally { flushControl.doAfterFlush(flushingDWPT); flushingDWPT.checkAndResetHasAborted(); indexWriter.flushCount.incrementAndGet(); + if (!success && ticket != null) { + synchronized (ticketQueue) { + // in the case of a failure make sure we are making progress and + // apply all the deletes since the segment flush failed + ticket.isSegmentFlush = false; + + } + } } - flushingDWPT = flushControl.nextPendingFlush() ; + flushingDWPT = flushControl.nextPendingFlush(); } return maybeMerge; }
- private void finishFlushedSegment(FlushedSegment newSegment) + private void applyFlushTickets(FlushTicket current, FlushedSegment segment) throws IOException { + synchronized (ticketQueue) { + if (current != null) { + // this is a segment FlushTicket so assign the flushed segment so we can make progress. + assert segment != null; + current.segment = segment; + } + while (true) { + // while we can publish flushes, keep emptying the queue. + final FlushTicket head = ticketQueue.peek(); + if (head != null && head.canPublish()) { + ticketQueue.poll(); + finishFlushedSegment(head.segment, head.frozenDeletes); + } else { + break; + } + } + } + } + + + private void finishFlushedSegment(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes) throws IOException { - pushDeletes(newSegment); - if (newSegment != null) { - indexWriter.addFlushedSegment(newSegment); + // this eventually finishes the flushed segment and publishes it to the IndexWriter + if (bufferedDeletes != null && bufferedDeletes.any()) { + bufferedDeletesStream.push(bufferedDeletes); + if (infoStream != null) { + message("flush: push buffered deletes: " + bufferedDeletes); + } } + publishFlushedSegment(newSegment); + } final void subtractFlushedNumDocs(int numFlushed) {
@@ -427,183 +444,88 @@ oldValue = numDocsInRAM.get(); } } - - private synchronized void pushDeletes(FlushedSegment flushedSegment) { - maybePushPendingDeletes(); - if (flushedSegment != null) { - BufferedDeletes deletes = flushedSegment.segmentDeletes; - final long delGen = bufferedDeletesStream.getNextGen(); - // Lock order: DW -> BD + + private void publishFlushedSegment(FlushedSegment newSegment) + throws IOException { + if (newSegment != null) { + final SegmentInfo segInfo = indexWriter.prepareFlushedSegment(newSegment); + final BufferedDeletes deletes = newSegment.segmentDeletes; + FrozenBufferedDeletes packet = null; if (deletes != null && deletes.any()) { - final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(deletes, - delGen); - if (infoStream != null) { - message("flush: push buffered deletes"); - } - bufferedDeletesStream.push(packet); + // segment private delete + packet = new FrozenBufferedDeletes(deletes, true); if (infoStream != null) { - message("flush: delGen=" + packet.gen); + message("flush: push buffered seg private deletes: " + packet); } } - flushedSegment.segmentInfo.setBufferedDeletesGen(delGen); + indexWriter.publishFlushedSegment(segInfo, packet); } } - - private synchronized final void maybePushPendingDeletes() { - final long delGen = bufferedDeletesStream.getNextGen(); - if (pendingDeletes.any()) { - indexWriter.bufferedDeletesStream.push(new FrozenBufferedDeletes( - pendingDeletes, delGen)); - pendingDeletes.clear(); - } + + private final Object flushAllLock = new Object(); + // for asserts + private volatile DocumentsWriterDeleteQueue currentFlushingSession = null; + private boolean setFlushingDeleteQueue(DocumentsWriterDeleteQueue session) { + currentFlushingSession = 
session; + return true; } - + final boolean flushAllThreads(final boolean flushDeletes) throws IOException { - - final Iterator threadsIterator = perThreadPool.getActivePerThreadsIterator(); - boolean anythingFlushed = false; - - while (threadsIterator.hasNext()) { - final ThreadState perThread = threadsIterator.next(); - final DocumentsWriterPerThread flushingDWPT; - /* - * TODO: maybe we can leverage incoming / indexing threads here if we mark - * all active threads pending so that we don't need to block until we got - * the handle. Yet, we need to figure out how to identify that a certain - * DWPT has been flushed since they are simply replaced once checked out - * for flushing. This would give us another level of concurrency during - * commit. - * - * Maybe we simply iterate them and store the ThreadStates and mark - * all as flushPending and at the same time record the DWPT instance as a - * key for the pending ThreadState. This way we can easily iterate until - * all DWPT have changed. - */ - perThread.lock(); + synchronized (flushAllLock) { + final DocumentsWriterDeleteQueue flushingDeleteQueue; + synchronized (this) { + flushingDeleteQueue = deleteQueue; + deleteQueue = new DocumentsWriterDeleteQueue(new BufferedDeletes(false)); + assert setFlushingDeleteQueue(flushingDeleteQueue); + } + assert flushingDeleteQueue == currentFlushingSession; + boolean anythingFlushed = false; + boolean success = false; try { - if (!perThread.isActive()) { - assert closed; - continue; //this perThread is already done maybe by a concurrently indexing thread + flushControl.markForFullFlush(); + DocumentsWriterPerThread flushingDWPT; + // now try to help out with flushing + while ((flushingDWPT = flushControl.nextPendingFlush()) != null) { + anythingFlushed |= doFlush(flushingDWPT); } - final DocumentsWriterPerThread dwpt = perThread.perThread; - // Always flush docs if there are any - final boolean flushDocs = dwpt.getNumDocsInRAM() > 0; - final String segment = dwpt.getSegment(); - // If we are flushing docs, segment must not be null: - assert segment != null || !flushDocs; - if (flushDocs) { - // check out and set pending if not already set - flushingDWPT = flushControl.tryCheckoutForFlush(perThread, true); - assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents"; - assert dwpt == flushingDWPT : "flushControl returned different DWPT"; - try { - final FlushedSegment newSegment = dwpt.flush(); - anythingFlushed = true; - finishFlushedSegment(newSegment); - } finally { - flushControl.doAfterFlush(flushingDWPT); - } + // if a concurrent flush is still in flight wait for it + while (!flushControl.allFlushesDue()) { + flushControl.waitForFlush(); } + if (!anythingFlushed && flushDeletes) { + synchronized (ticketQueue) { + ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false)); + } + applyFlushTickets(null, null); + } + success = true; } finally { - perThread.unlock(); + assert flushingDeleteQueue == currentFlushingSession; + assert setFlushingDeleteQueue(null); + if (!success) { + flushControl.abortFullFlushes(); + } else { + // release the flush lock + flushControl.finishFullFlush(); + } } + return anythingFlushed; } - - if (!anythingFlushed && flushDeletes) { - maybePushPendingDeletes(); - } - - - return anythingFlushed; } - - - - -// /* We have three pools of RAM: Postings, byte blocks -// * (holds freq/prox posting data) and per-doc buffers -// * (stored fields/term vectors). 
Different docs require -// * varying amount of storage from these classes. For -// * example, docs with many unique single-occurrence short -// * terms will use up the Postings RAM and hardly any of -// * the other two. Whereas docs with very large terms will -// * use alot of byte blocks RAM. This method just frees -// * allocations from the pools once we are over-budget, -// * which balances the pools to match the current docs. */ -// void balanceRAM() { -// -// final boolean doBalance; -// final long deletesRAMUsed; -// -// deletesRAMUsed = bufferedDeletes.bytesUsed(); -// -// synchronized(this) { -// if (ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH || bufferIsFull) { -// return; -// } -// -// doBalance = bytesUsed() + deletesRAMUsed >= ramBufferSize; -// } -// -// if (doBalance) { -// -// if (infoStream != null) -// message(" RAM: balance allocations: usedMB=" + toMB(bytesUsed()) + -// " vs trigger=" + toMB(ramBufferSize) + -// " deletesMB=" + toMB(deletesRAMUsed) + -// " byteBlockFree=" + toMB(byteBlockAllocator.bytesUsed()) + -// " perDocFree=" + toMB(perDocAllocator.bytesUsed())); -// -// final long startBytesUsed = bytesUsed() + deletesRAMUsed; -// -// int iter = 0; -// -// // We free equally from each pool in 32 KB -// // chunks until we are below our threshold -// // (freeLevel) -// -// boolean any = true; -// -// while(bytesUsed()+deletesRAMUsed > freeLevel) { -// -// synchronized(this) { -// if (0 == perDocAllocator.numBufferedBlocks() && -// 0 == byteBlockAllocator.numBufferedBlocks() && -// 0 == freeIntBlocks.size() && !any) { -// // Nothing else to free -- must flush now. -// bufferIsFull = bytesUsed()+deletesRAMUsed > ramBufferSize; -// if (infoStream != null) { -// if (bytesUsed()+deletesRAMUsed > ramBufferSize) -// message(" nothing to free; set bufferIsFull"); -// else -// message(" nothing to free"); -// } -// break; -// } -// -// if ((0 == iter % 4) && byteBlockAllocator.numBufferedBlocks() > 0) { -// byteBlockAllocator.freeBlocks(1); -// } -// if ((1 == iter % 4) && freeIntBlocks.size() > 0) { -// freeIntBlocks.remove(freeIntBlocks.size()-1); -// bytesUsed.addAndGet(-INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT); -// } -// if ((2 == iter % 4) && perDocAllocator.numBufferedBlocks() > 0) { -// perDocAllocator.freeBlocks(32); // Remove upwards of 32 blocks (each block is 1K) -// } -// } -// -// if ((3 == iter % 4) && any) -// // Ask consumer to free any recycled state -// any = consumer.freeRAM(); -// -// iter++; -// } -// -// if (infoStream != null) -// message(" after free: freedMB=" + nf.format((startBytesUsed-bytesUsed()-deletesRAMUsed)/1024./1024.) 
+ " usedMB=" + nf.format((bytesUsed()+deletesRAMUsed)/1024./1024.)); -// } -// } + static final class FlushTicket { + final FrozenBufferedDeletes frozenDeletes; + FlushedSegment segment; + boolean isSegmentFlush; + + FlushTicket(FrozenBufferedDeletes frozenDeletes, boolean isSegmentFlush) { + this.frozenDeletes = frozenDeletes; + this.isSegmentFlush = isSegmentFlush; + } + + boolean canPublish() { + return (!isSegmentFlush || segment != null); + } + } }
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java =================================================================== --- lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java (revision 0) +++ lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java (revision 0)
@@ -0,0 +1,373 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.lucene.search.Query; + +/** + * {@link DocumentsWriterDeleteQueue} is a non-blocking linked pending deletes + * queue. In contrast to other queue implementations we maintain only the + * tail of the queue. A delete queue is always used in a context of a set of + * DWPT and a global delete pool. Each of the DWPT and the global pool need to + * maintain their 'own' head of the queue. The difference between the DWPT and + * the global pool is that the DWPT starts maintaining a head once it has added + * its first document since for its segment's private deletes only the deletes + * after that document are relevant. The global pool instead starts maintaining + * the head once this instance is created by taking the sentinel instance as its + * initial head. + *
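+ * <p>
+ * A rough usage sketch (illustrative only; the variable names are made up,
+ * the methods are the ones defined below):
+ * <pre>
+ * DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue(new BufferedDeletes(false));
+ * DeleteSlice slice = queue.newSlice();    // private head/tail for one DWPT
+ * queue.add(delTerm, slice);               // appends a node and moves the slice tail
+ * slice.apply(privateDeletes, docIDUpto);  // applies only the nodes after the slice head
+ * queue.freezeGlobalBuffer(slice);         // on flush: freeze global deletes, sync the slice
+ * </pre>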

+ * Since each {@link DeleteSlice} maintains its own head and the list is only + * singly linked, the garbage collector takes care of pruning the list for us. + * All nodes in the list that are still relevant should be either directly or + * indirectly referenced by one of the DWPT's private {@link DeleteSlice} or by + * the global {@link BufferedDeletes} slice. *

+ * Each DWPT as well as the global delete pool maintains its private + * DeleteSlice instance. In the DWPT case updating a slice is equivalent to + * atomically finishing the document. The slice update guarantees a + * happens-before relationship to all other updates in the same indexing + * session. When a DWPT updates a document it:
+ *
+ * <ol>
+ * <li>consumes a document and finishes its processing</li>
+ * <li>updates its private {@link DeleteSlice} either by calling
+ * {@link #updateSlice(DeleteSlice)} or {@link #add(Term, DeleteSlice)} (if the
+ * document has a delTerm)</li>
+ * <li>applies all deletes in the slice to its private {@link BufferedDeletes}
+ * and resets it</li>
+ * <li>increments its internal document id</li>
+ * </ol>
+ * + * The DWPT also doesn't apply its current document's delete term until it has + * updated its delete slice, which ensures the consistency of the update. If the + * update fails before the DeleteSlice could have been updated, the deleteTerm + * will be added neither to its private deletes nor to the global deletes. + * + */ +final class DocumentsWriterDeleteQueue { + + private volatile Node tail; + + private static final AtomicReferenceFieldUpdater tailUpdater = AtomicReferenceFieldUpdater + .newUpdater(DocumentsWriterDeleteQueue.class, Node.class, "tail"); + + private final DeleteSlice globalSlice; + private final BufferedDeletes globalBufferedDeletes; + /* only acquired to update the global deletes */ + private final ReentrantLock globalBufferLock = new ReentrantLock(); + + DocumentsWriterDeleteQueue(BufferedDeletes globalBufferedDeletes) { + this.globalBufferedDeletes = globalBufferedDeletes; + /* + * we use a sentinel instance as our initial tail. No slice will ever try to + * apply this tail since the head is always omitted. + */ + tail = new Node(null); // sentinel + globalSlice = new DeleteSlice(tail); + } + + void addDelete(Query... queries) { + for (Query query : queries) { + /* + * TODO: maybe we have a Query[ ] Node that makes a set of queries atomic + * too! + */ + add(new QueryNode(query)); + } + tryApplyGlobalSlice(); + } + + void addDelete(Term... terms) { + for (Term term : terms) { + /* + * TODO: maybe we have a Term[ ] Node that makes a set of terms atomic + * too! + */ + + add(new TermNode(term)); + } + tryApplyGlobalSlice(); + } + + /** + * invariant for document update + */ + void add(Term term, DeleteSlice slice) { + final TermNode termNode = new TermNode(term); + add(termNode); + /* + * this is an update request where the term is the updated document's + * delTerm. In that case we need to guarantee that this insert is atomic + * with regard to the given delete slice. This means that if two threads try + * to update the same document with the same delTerm, one of them must win. + * By taking the node we have created for our del term as the new tail it is + * guaranteed that, if another thread adds the same term right after us, we + * will apply this delete the next time we update our slice and one of the + * two competing updates wins! + */ + slice.sliceTail = termNode; + assert slice.sliceHead != slice.sliceTail : "slice head and tail must differ after add"; + tryApplyGlobalSlice(); // TODO doing this each time is not necessary maybe + // we can do it just every n times or so? + } + + void add(Node item) { + /* + * this non-blocking / 'wait-free' linked list add was inspired by Apache + * Harmony's ConcurrentLinkedQueue implementation. + */ + while (true) { + final Node currentTail = this.tail; + final Node tailNext = currentTail.next; + if (tail == currentTail) { + if (tailNext != null) { + /* + * we are in an intermediate state here. the tail's next pointer has been + * advanced but the tail itself might not be updated yet. Help to + * advance the tail and try updating it again. 
+ */ + tailUpdater.compareAndSet(this, currentTail, tailNext); // can fail + } else { + /* + * we are in quiescent state and can try to insert the item to the + * current tail if we fail to insert we just retry the operation since + * somebody else has already added its item + */ + if (currentTail.casNext(null, item)) { + /* + * now that we are done we need to advance the tail while another + * thread could have advanced it already so we can ignore the return + * type of this CAS call + */ + tailUpdater.compareAndSet(this, currentTail, item); + return; + } + } + } + } + } + + boolean anyChanges() { + globalBufferLock.lock(); + try { + return !globalSlice.isEmpty() || globalBufferedDeletes.any(); + } finally { + globalBufferLock.unlock(); + } + } + + void tryApplyGlobalSlice() { + if (globalBufferLock.tryLock()) { + /* + * the global buffer must be locked but we don't need to upate them if + * there is an update going on right now. It is sufficient to apply the + * deletes that have been added after the current in-flight global slices + * tail the next time we can get the lock! + */ + try { + if (updateSlice(globalSlice)) { + globalSlice.apply(globalBufferedDeletes, BufferedDeletes.MAX_INT); + + } + } finally { + globalBufferLock.unlock(); + } + } + } + + FrozenBufferedDeletes freezeGlobalBuffer(DeleteSlice callerSlice) { + globalBufferLock.lock(); + /* + * here we are freezing the global buffer so we need to lock it, apply all + * deletes in the queue and reset the global slice to let the GC prune the + * queue. + */ + final Node currentTail = tail; // take the current tail make this local any + // changes after this call are applied later + // and not relevant here + if (callerSlice != null) { + // update the callers slices so we are on the same page + callerSlice.sliceTail = currentTail; + } + try { + if (globalSlice.sliceTail != currentTail) { + globalSlice.sliceTail = currentTail; + globalSlice.apply(globalBufferedDeletes, BufferedDeletes.MAX_INT); + } + + final FrozenBufferedDeletes packet = new FrozenBufferedDeletes( + globalBufferedDeletes, false); + globalBufferedDeletes.clear(); + return packet; + } finally { + globalBufferLock.unlock(); + } + } + + DeleteSlice newSlice() { + return new DeleteSlice(tail); + } + + boolean updateSlice(DeleteSlice slice) { + if (slice.sliceTail != tail) { // if we are the same just + slice.sliceTail = tail; + return true; + } + return false; + } + + static class DeleteSlice { + // no need to be volatile, slices are only access by one thread! + Node sliceHead; // we don't apply this one + Node sliceTail; + + DeleteSlice(Node currentTail) { + assert currentTail != null; + /* + * Initially this is a 0 length slice pointing to the 'current' tail of + * the queue. Once we update the slice we only need to assign the tail and + * have a new slice + */ + sliceHead = sliceTail = currentTail; + } + + void apply(BufferedDeletes del, int docIDUpto) { + if (sliceHead == sliceTail) { + // 0 length slice + return; + } + /* + * when we apply a slice we take the head and get its next as our first + * item to apply and continue until we applied the tail. If the head and + * tail in this slice are not equal then there will be at least one more + * non-null node in the slice! 
+ */ + Node current = sliceHead; + do { + current = current.next; + assert current != null : "slice property violated between the head on the tail must not be a null node"; + current.apply(del, docIDUpto); + } while (current != sliceTail); + reset(); + } + + void reset() { + // resetting to a 0 length slice + sliceHead = sliceTail; + } + + /** + * Returns true iff the given item is identical to the item + * hold by the slices tail, otherwise false. + */ + boolean isTailItem(Object item) { + return sliceTail.item == item; + } + + boolean isEmpty() { + return sliceHead == sliceTail; + } + } + + public int numGlobalTermDeletes() { + return globalBufferedDeletes.numTermDeletes.get(); + } + + void clear() { + globalBufferLock.lock(); + try { + final Node currentTail = tail; + globalSlice.sliceHead = globalSlice.sliceTail = currentTail; + globalBufferedDeletes.clear(); + } finally { + globalBufferLock.unlock(); + } + } + + private static class Node { + volatile Node next; + final Object item; + + private Node(Object item) { + this.item = item; + } + + static final AtomicReferenceFieldUpdater nextUpdater = AtomicReferenceFieldUpdater + .newUpdater(Node.class, Node.class, "next"); + + void apply(BufferedDeletes bufferedDeletes, int docIDUpto) { + assert false : "sentinel item must never be applied"; + } + + boolean casNext(Node cmp, Node val) { + return nextUpdater.compareAndSet(this, cmp, val); + } + } + + private static final class TermNode extends Node { + + TermNode(Term term) { + super(term); + } + + @Override + void apply(BufferedDeletes bufferedDeletes, int docIDUpto) { + bufferedDeletes.addTerm((Term) item, docIDUpto); + } + + } + + private static final class QueryNode extends Node { + QueryNode(Query query) { + super(query); + } + + @Override + void apply(BufferedDeletes bufferedDeletes, int docIDUpto) { + bufferedDeletes.addQuery((Query) item, docIDUpto); + } + + } + + private boolean forceApplyGlobalSlice() { + globalBufferLock.lock(); + final Node currentTail = tail; + try { + if (globalSlice.sliceTail != currentTail) { + globalSlice.sliceTail = currentTail; + globalSlice.apply(globalBufferedDeletes, BufferedDeletes.MAX_INT); + } + return globalBufferedDeletes.any(); + } finally { + globalBufferLock.unlock(); + } + } + + public int getBufferedDeleteTermsSize() { + globalBufferLock.lock(); + try { + forceApplyGlobalSlice(); + return globalBufferedDeletes.terms.size(); + } finally { + globalBufferLock.unlock(); + } + + } +} Property changes on: lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java ___________________________________________________________________ Added: svn:eol-style + native Added: svn:keywords + Date Author Id Revision HeadURL Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java =================================================================== --- lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (revision 1090383) +++ lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (working copy) @@ -16,11 +16,15 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; +import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; +import org.apache.lucene.util.ThreadInterruptedException; /** * This class controls {@link DocumentsWriterPerThread} flushing during @@ -42,6 +46,11 @@ private volatile int numPending = 0; private volatile int numFlushing = 0; final AtomicBoolean flushDeletes = new AtomicBoolean(false); + private boolean fullFlush = false; + private Queue flushQueue = new LinkedList(); + // only for safety reasons if a DWPT is close to the RAM limit + private Queue blockedFlushes = new LinkedList(); + long peakActiveBytes = 0;// only with assert long peakFlushBytes = 0;// only with assert @@ -51,16 +60,15 @@ private final FlushPolicy flushPolicy; private boolean closed = false; private final HashMap flushingWriters = new HashMap(); - private final BufferedDeletes pendingDeletes; + private final DocumentsWriter documentsWriter; - DocumentsWriterFlushControl(FlushPolicy flushPolicy, - DocumentsWriterPerThreadPool threadPool, Healthiness healthiness, - BufferedDeletes pendingDeletes, long maxBytesPerDWPT) { + DocumentsWriterFlushControl(DocumentsWriter documentsWriter, + Healthiness healthiness, long maxBytesPerDWPT) { this.healthiness = healthiness; - this.perThreadPool = threadPool; - this.flushPolicy = flushPolicy; + this.perThreadPool = documentsWriter.perThreadPool; + this.flushPolicy = documentsWriter.flushPolicy; this.maxBytesPerDWPT = maxBytesPerDWPT; - this.pendingDeletes = pendingDeletes; + this.documentsWriter = documentsWriter; } public synchronized long activeBytes() { @@ -113,6 +121,11 @@ // is super // important since we can not address more than 2048 MB per DWPT setFlushPending(perThread); + if (fullFlush) { + DocumentsWriterPerThread toBlock = internalTryCheckOutForFlush(perThread, false); + assert toBlock != null; + blockedFlushes.add(toBlock); + } } } final DocumentsWriterPerThread flushingDWPT = getFlushIfPending(perThread); @@ -122,11 +135,29 @@ synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) { assert flushingWriters.containsKey(dwpt); - numFlushing--; - Long bytes = flushingWriters.remove(dwpt); - flushBytes -= bytes.longValue(); - perThreadPool.recycle(dwpt); - healthiness.updateStalled(this); + try { + numFlushing--; + Long bytes = flushingWriters.remove(dwpt); + flushBytes -= bytes.longValue(); + perThreadPool.recycle(dwpt); + healthiness.updateStalled(this); + } finally { + notifyAll(); + } + } + + public synchronized boolean allFlushesDue() { + return numFlushing == 0; + } + + public synchronized void waitForFlush() { + if (numFlushing != 0) { + try { + this.wait(); + } catch (InterruptedException e) { + throw new ThreadInterruptedException(e); + } + } } /** @@ -157,6 +188,13 @@ synchronized DocumentsWriterPerThread tryCheckoutForFlush( ThreadState perThread, boolean setPending) { + if (fullFlush) + return null; + return internalTryCheckOutForFlush(perThread, setPending); + } + + private DocumentsWriterPerThread internalTryCheckOutForFlush( + ThreadState perThread, boolean setPending) { if (setPending && !perThread.flushPending) { setFlushPending(perThread); } @@ -185,7 +223,7 @@ return null; } - DocumentsWriterPerThread getFlushIfPending(ThreadState perThread) { + private DocumentsWriterPerThread getFlushIfPending(ThreadState perThread) { if (numPending > 0) { final DocumentsWriterPerThread dwpt = perThread 
== null ? null : tryCheckoutForFlush(perThread, false); @@ -204,6 +242,12 @@ } DocumentsWriterPerThread nextPendingFlush() { + synchronized (this) { + DocumentsWriterPerThread poll = flushQueue.poll(); + if (poll != null) { + return poll; + } + } if (numPending > 0) { final Iterator allActiveThreads = perThreadPool .getActivePerThreadsIterator(); @@ -236,17 +280,16 @@ return flushPolicy.getMaxNetBytes(); } - synchronized void doOnDelete(ThreadState state) { - if (!state.flushPending) { - flushPolicy.onDelete(this, state); - } + synchronized void doOnDelete() { + // pass null this is a global delete no update + flushPolicy.onDelete(this, null); } /** * Returns the number of delete terms in the global pool */ public int getNumGlobalTermDeletes() { - return pendingDeletes.numTermDeletes.get(); + return documentsWriter.deleteQueue.numGlobalTermDeletes(); } int numFlushingDWPT() { @@ -260,4 +303,66 @@ int numActiveDWPT() { return this.perThreadPool.getMaxThreadStates(); } + + void markForFullFlush() { + synchronized (this) { + assert !fullFlush; + fullFlush = true; + } + final Iterator allActiveThreads = perThreadPool + .getActivePerThreadsIterator(); + final ArrayList toFlush = new ArrayList(); + while (allActiveThreads.hasNext()) { + final ThreadState next = allActiveThreads.next(); + next.lock(); + try { + if (!next.isActive()) { + continue; + } + if (next.perThread.getNumDocsInRAM() > 0) { + final DocumentsWriterPerThread dwpt = next.perThread; // just for assert + final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(next, true); + assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents"; + assert dwpt == flushingDWPT : "flushControl returned different DWPT"; + toFlush.add(flushingDWPT); + } else { + next.perThread.initialize(); + } + } finally { + next.unlock(); + } + } + synchronized (this) { + flushQueue.addAll(blockedFlushes); + blockedFlushes.clear(); + flushQueue.addAll(toFlush); + } + + } + + synchronized void finishFullFlush() { + assert fullFlush; + assert flushQueue.isEmpty(); + try { + if (!blockedFlushes.isEmpty()) { + flushQueue.addAll(blockedFlushes); + blockedFlushes.clear(); + } + } finally { + fullFlush = false; + } + } + + synchronized void abortFullFlushes() { + try { + for (DocumentsWriterPerThread dwpt : flushQueue) { + doAfterFlush(dwpt); + } + for (DocumentsWriterPerThread dwpt : blockedFlushes) { + doAfterFlush(dwpt); + } + } finally { + fullFlush = false; + } + } } \ No newline at end of file Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java =================================================================== --- lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (revision 1090383) +++ lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (working copy) @@ -27,7 +27,7 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.document.Document; -import org.apache.lucene.search.Query; +import org.apache.lucene.index.DocumentsWriterDeleteQueue.DeleteSlice; import org.apache.lucene.search.SimilarityProvider; import org.apache.lucene.store.Directory; import org.apache.lucene.util.BitVector; @@ -136,6 +136,7 @@ } pendingDeletes.clear(); + deleteSlice = deleteQueue.newSlice(); // Reset all postings data doAfterFlush(); @@ -165,6 +166,8 @@ private final PrintStream infoStream; private int numDocsInRAM; private int flushedDocCount; + DocumentsWriterDeleteQueue deleteQueue; + DeleteSlice deleteSlice; public 
DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, FieldInfos fieldInfos, IndexingChain indexingChain) {
@@ -180,11 +183,19 @@ consumer = indexingChain.getChain(this); bytesUsed = new AtomicLong(0); pendingDeletes = new BufferedDeletes(false); + initialize(); } public DocumentsWriterPerThread(DocumentsWriterPerThread other, FieldInfos fieldInfos) { this(other.directory, other.parent, fieldInfos, other.parent.chain); - + } + + void initialize() { + deleteQueue = parent.deleteQueue; + assert numDocsInRAM == 0 : "num docs " + numDocsInRAM; + pendingDeletes.clear(); + deleteSlice = null; + } void setAborting() {
@@ -199,13 +210,10 @@ public void updateDocument(Document doc, Analyzer analyzer, Term delTerm) throws IOException { assert writer.testPoint("DocumentsWriterPerThread addDocument start"); + assert deleteQueue != null; docState.doc = doc; docState.analyzer = analyzer; docState.docID = numDocsInRAM; - if (delTerm != null) { - pendingDeletes.addTerm(delTerm, numDocsInRAM); - } - if (segment == null) { // this call is synchronized on IndexWriter.segmentInfos segment = writer.newSegmentName();
@@ -219,7 +227,6 @@ } finally { docState.clear(); } - success = true; } finally { if (!success) {
@@ -232,18 +239,44 @@ } } } - success = false; try { - numDocsInRAM++; consumer.finishDocument(); - success = true; } finally { if (!success) { abort(); } } + finishDocument(delTerm); + } + + private void finishDocument(Term delTerm) throws IOException { + /* + * here we actually finish the document in two steps: 1. push the delete + * into the queue and update our slice; 2. increment the DWPT private + * document id. + * + * The updated slice we get from step 1 holds all the deletes that have + * occurred since we last updated the slice. + */ + if (deleteSlice == null) { + deleteSlice = deleteQueue.newSlice(); + if (delTerm != null) { + deleteQueue.add(delTerm, deleteSlice); + deleteSlice.reset(); + } + + } else { + if (delTerm != null) { + deleteQueue.add(delTerm, deleteSlice); + assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item"; + deleteSlice.apply(pendingDeletes, numDocsInRAM); + } else if (deleteQueue.updateSlice(deleteSlice)) { + deleteSlice.apply(pendingDeletes, numDocsInRAM); + } + } + ++numDocsInRAM; } // Buffer a specific docID for deletion. Currently only
@@ -261,22 +294,6 @@ // confounding exception). } - void deleteQueries(Query... queries) { - if (numDocsInRAM > 0) { - for (Query query : queries) { - pendingDeletes.addQuery(query, numDocsInRAM); - } - } - } - - void deleteTerms(Term... 
terms) { - if (numDocsInRAM > 0) { - for (Term term : terms) { - pendingDeletes.addTerm(term, numDocsInRAM); - } - } - } - /** * Returns the number of delete terms in this {@link DocumentsWriterPerThread} */ @@ -305,6 +322,15 @@ parent.subtractFlushedNumDocs(numDocsInRAM); numDocsInRAM = 0; } + + FrozenBufferedDeletes prepareFlush() { + assert numDocsInRAM > 0; + final FrozenBufferedDeletes globalDeletes = deleteQueue.freezeGlobalBuffer(deleteSlice); + // apply all deletes before we flush and release the delete slice + deleteSlice.apply(pendingDeletes, numDocsInRAM); + deleteSlice = null; + return globalDeletes; + } /** Flush all pending docs to a new segment */ FlushedSegment flush() throws IOException { Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java =================================================================== --- lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (revision 1090383) +++ lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (working copy) @@ -118,7 +118,10 @@ public synchronized ThreadState newThreadState() { if (numThreadStatesActive < perThreads.length) { - return perThreads[numThreadStatesActive++]; + final ThreadState threadState = perThreads[numThreadStatesActive]; + threadState.perThread.initialize(); + numThreadStatesActive++; + return threadState; } return null; } @@ -128,7 +131,9 @@ final DocumentsWriterPerThread dwpt = threadState.perThread; if (!closed) { final FieldInfos infos = globalFieldMap.newFieldInfos(SegmentCodecsBuilder.create(codecProvider)); - threadState.resetWriter(new DocumentsWriterPerThread(dwpt, infos)); + final DocumentsWriterPerThread newDwpt = new DocumentsWriterPerThread(dwpt, infos); + newDwpt.initialize(); + threadState.resetWriter(newDwpt); } else { threadState.resetWriter(null); } Index: lucene/src/java/org/apache/lucene/index/FlushPolicy.java =================================================================== --- lucene/src/java/org/apache/lucene/index/FlushPolicy.java (revision 1090383) +++ lucene/src/java/org/apache/lucene/index/FlushPolicy.java (working copy) @@ -55,8 +55,8 @@ protected IndexWriterConfig indexWriterConfig; /** - * Called for each delete term applied to the given {@link ThreadState}s - * {@link DocumentsWriterPerThread}. + * Called for each delete term. If this is a delete triggered due to an update + * the given {@link ThreadState} is non-null. *
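+ * For example, a standalone delete arriving through
+ * {@code DocumentsWriter#deleteTerms} reaches this method via
+ * {@code DocumentsWriterFlushControl#doOnDelete()}, which passes a null
+ * {@link ThreadState}, while a delTerm piggybacking on an updateDocument
+ * call is reported for the updating thread's {@link ThreadState}.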

* Note: This method is synchronized by the given * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
Index: lucene/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java =================================================================== --- lucene/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java (revision 1090383) +++ lucene/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java (working copy)
@@ -52,9 +52,15 @@ final int[] queryLimits; final int bytesUsed; final int numTermDeletes; - final long gen; + private long gen = -1; // assigned by BufferedDeletesStream once pushed + final boolean isSegmentPrivate; // set to true iff this frozen packet represents + // segment private deletes. In that case it should + // only have Queries + - public FrozenBufferedDeletes(BufferedDeletes deletes, long gen) { + public FrozenBufferedDeletes(BufferedDeletes deletes, boolean isSegmentPrivate) { + this.isSegmentPrivate = isSegmentPrivate; + assert !isSegmentPrivate || deletes.terms.size() == 0 : "segment private packet should only have del queries"; terms = deletes.terms.keySet().toArray(new Term[deletes.terms.size()]); queries = new Query[deletes.queries.size()]; queryLimits = new int[deletes.queries.size()];
@@ -66,8 +72,17 @@ } bytesUsed = terms.length * BYTES_PER_DEL_TERM + queries.length * BYTES_PER_DEL_QUERY; numTermDeletes = deletes.numTermDeletes.get(); + } + + public void setDelGen(long gen) { + assert this.gen == -1; this.gen = gen; } + + public long delGen() { + assert gen != -1; + return gen; + } public Iterable termsIterable() { return new Iterable() {
Index: lucene/src/java/org/apache/lucene/index/IndexWriter.java =================================================================== --- lucene/src/java/org/apache/lucene/index/IndexWriter.java (revision 1090383) +++ lucene/src/java/org/apache/lucene/index/IndexWriter.java (working copy)
@@ -47,6 +47,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.Lock; import org.apache.lucene.store.LockObtainFailedException; +import org.apache.lucene.util.BitVector; import org.apache.lucene.util.Bits; import org.apache.lucene.util.Constants; import org.apache.lucene.util.ThreadInterruptedException;
@@ -356,8 +357,8 @@ // reader; in theory we could do similar retry logic, // just like we do when loading segments_N IndexReader r; + flush(false, applyAllDeletes); // don't sync on IW here, DWPT will deadlock synchronized(this) { - flush(false, applyAllDeletes); r = new DirectoryReader(this, segmentInfos, config.getReaderTermsIndexDivisor(), codecs, applyAllDeletes); if (infoStream != null) { message("return reader version=" + r.getVersion() + " reader=" + r);
@@ -2026,7 +2027,16 @@ deleter.checkpoint(segmentInfos, false); } - void addFlushedSegment(FlushedSegment flushedSegment) throws IOException { + /** + * Prepares the {@link SegmentInfo} for the new flushed segment and persists + * the deleted documents {@link BitVector}. Use + * {@link #publishFlushedSegment(SegmentInfo, FrozenBufferedDeletes)} to + * publish the returned {@link SegmentInfo} together with its segment private + * delete packet. 
+ * + * @see #publishFlushedSegment(SegmentInfo, FrozenBufferedDeletes) + */ + SegmentInfo prepareFlushedSegment(FlushedSegment flushedSegment) throws IOException { assert flushedSegment != null; SegmentInfo newSegment = flushedSegment.segmentInfo;
@@ -2098,9 +2108,32 @@ } } } - - - synchronized(this) { + return newSegment; + } + + /** + * Atomically adds the segment private delete packet and publishes the flushed + * segment's SegmentInfo to the index writer. NOTE: use + * {@link #prepareFlushedSegment(FlushedSegment)} to obtain the + * {@link SegmentInfo} for the flushed segment. + * + * @see #prepareFlushedSegment(FlushedSegment) + */ + synchronized void publishFlushedSegment(SegmentInfo newSegment, + FrozenBufferedDeletes packet) throws IOException { + // lock order IW -> BDS + synchronized (bufferedDeletesStream) { + // publishing the segment must be synched on IW -> BDS to make sure + // that no merge prunes away the seg. private delete packet + final long nextGen; + if (packet != null && packet.any()) { + nextGen = bufferedDeletesStream.push(packet); + } else { + // since we don't have a delete packet to apply we can get a new + // generation right away + nextGen = bufferedDeletesStream.getNextGen(); + } + newSegment.setBufferedDeletesGen(nextGen); segmentInfos.add(newSegment); checkpoint(); }
@@ -2602,30 +2635,33 @@ } final synchronized void applyAllDeletes() throws IOException { - flushDeletesCount.incrementAndGet(); - final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream.applyDeletes(readerPool, segmentInfos); - if (result.anyDeletes) { - checkpoint(); - } - if (!keepFullyDeletedSegments && result.allDeleted != null) { - if (infoStream != null) { - message("drop 100% deleted segments: " + result.allDeleted); +// synchronized (bufferedDeletesStream) { + flushDeletesCount.incrementAndGet(); + final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream + .applyDeletes(readerPool, segmentInfos); + if (result.anyDeletes) { + checkpoint(); } - for(SegmentInfo info : result.allDeleted) { - // If a merge has already registered for this - // segment, we leave it in the readerPool; the - // merge will skip merging it and will then drop - // it once it's done: - if (!mergingSegments.contains(info)) { - segmentInfos.remove(info); - if (readerPool != null) { - readerPool.drop(info); + if (!keepFullyDeletedSegments && result.allDeleted != null) { + if (infoStream != null) { + message("drop 100% deleted segments: " + result.allDeleted); + } + for (SegmentInfo info : result.allDeleted) { + // If a merge has already registered for this + // segment, we leave it in the readerPool; the + // merge will skip merging it and will then drop + // it once it's done: + if (!mergingSegments.contains(info)) { + segmentInfos.remove(info); + if (readerPool != null) { + readerPool.drop(info); + } } } + checkpoint(); } - checkpoint(); - } - bufferedDeletesStream.prune(segmentInfos); + bufferedDeletesStream.prune(segmentInfos); +// } } /** Expert: Return the total size of all index files currently cached in memory. 
@@ -3065,7 +3101,6 @@ // Lock order: IW -> BD bufferedDeletesStream.prune(segmentInfos); - Map details = new HashMap(); details.put("optimize", Boolean.toString(merge.optimize)); details.put("mergeFactor", Integer.toString(merge.segments.size()));
Index: lucene/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java =================================================================== --- lucene/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java (revision 0) +++ lucene/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java (revision 0)
@@ -0,0 +1,221 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.lucene.index.DocumentsWriterDeleteQueue.DeleteSlice; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.ThreadInterruptedException; + +/** + * Unit test for {@link DocumentsWriterDeleteQueue} + */ +public class TestDocumentsWriterDeleteQueue extends LuceneTestCase { + + public void testUpdateDeleteSlices() { + DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue( + new BufferedDeletes(false)); + final int size = 200 + random.nextInt(500) * RANDOM_MULTIPLIER; + Integer[] ids = new Integer[size]; + for (int i = 0; i < ids.length; i++) { + ids[i] = random.nextInt(); + } + Term template = new Term("id"); + DeleteSlice slice1 = queue.newSlice(); + DeleteSlice slice2 = queue.newSlice(); + BufferedDeletes bd1 = new BufferedDeletes(false); + BufferedDeletes bd2 = new BufferedDeletes(false); + int last1 = 0; + int last2 = 0; + Set uniqueValues = new HashSet(); + for (int j = 0; j < ids.length; j++) { + Integer i = ids[j]; + Term term = template.createTerm(i.toString()); + uniqueValues.add(term); + queue.addDelete(term); + if (random.nextInt(20) == 0 || j == ids.length - 1) { + queue.updateSlice(slice1); + assertTrue(slice1.isTailItem(term)); + slice1.apply(bd1, j); + assertAllBetween(last1, j, bd1, ids); + last1 = j + 1; + } + if (random.nextInt(10) == 5 || j == ids.length - 1) { + queue.updateSlice(slice2); + assertTrue(slice2.isTailItem(term)); + slice2.apply(bd2, j); + assertAllBetween(last2, j, bd2, ids); + last2 = j + 1; + } + assertEquals(uniqueValues.size(), queue.numGlobalTermDeletes()); + } + assertEquals(uniqueValues, bd1.terms.keySet()); + assertEquals(uniqueValues, bd2.terms.keySet()); + assertEquals(uniqueValues, new HashSet(Arrays.asList(queue + .freezeGlobalBuffer(null).terms))); + assertEquals("num deletes must be 0 after freeze", 0, queue + .numGlobalTermDeletes()); + } + + private void 
+  private void assertAllBetween(int start, int end, BufferedDeletes deletes,
+      Integer[] ids) {
+    Term template = new Term("id");
+    for (int i = start; i <= end; i++) {
+      assertEquals(Integer.valueOf(end), deletes.terms.get(template
+          .createTerm(ids[i].toString())));
+    }
+  }
+
+  public void testClear() {
+    DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue(
+        new BufferedDeletes(false));
+    Term template = new Term("id");
+    assertFalse(queue.anyChanges());
+    queue.clear();
+    assertFalse(queue.anyChanges());
+    final int size = 200 + random.nextInt(500) * RANDOM_MULTIPLIER;
+    int termsSinceFreeze = 0;
+    int queriesSinceFreeze = 0;
+    for (int i = 0; i < size; i++) {
+      Term term = template.createTerm("" + i);
+      if (random.nextInt(10) == 0) {
+        queue.addDelete(new TermQuery(term));
+        queriesSinceFreeze++;
+      } else {
+        queue.addDelete(term);
+        termsSinceFreeze++;
+      }
+      assertTrue(queue.anyChanges());
+      if (random.nextInt(10) == 0) {
+        queue.clear();
+        queue.tryApplyGlobalSlice();
+        assertFalse(queue.anyChanges());
+      }
+    }
+  }
+
+  public void testAnyChanges() {
+    DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue(
+        new BufferedDeletes(false));
+    Term template = new Term("id");
+    final int size = 200 + random.nextInt(500) * RANDOM_MULTIPLIER;
+    int termsSinceFreeze = 0;
+    int queriesSinceFreeze = 0;
+    for (int i = 0; i < size; i++) {
+      Term term = template.createTerm("" + i);
+      if (random.nextInt(10) == 0) {
+        queue.addDelete(new TermQuery(term));
+        queriesSinceFreeze++;
+      } else {
+        queue.addDelete(term);
+        termsSinceFreeze++;
+      }
+      assertTrue(queue.anyChanges());
+      if (random.nextInt(5) == 0) {
+        FrozenBufferedDeletes freezeGlobalBuffer = queue
+            .freezeGlobalBuffer(null);
+        assertEquals(termsSinceFreeze, freezeGlobalBuffer.terms.length);
+        assertEquals(queriesSinceFreeze, freezeGlobalBuffer.queries.length);
+        queriesSinceFreeze = 0;
+        termsSinceFreeze = 0;
+        assertFalse(queue.anyChanges());
+      }
+    }
+  }
+
+  public void testStressDeleteQueue() throws InterruptedException {
+    DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue(
+        new BufferedDeletes(false));
+    Set<Term> uniqueValues = new HashSet<Term>();
+    final int size = 10000 + random.nextInt(500) * RANDOM_MULTIPLIER;
+    Integer[] ids = new Integer[size];
+    Term template = new Term("id");
+    for (int i = 0; i < ids.length; i++) {
+      ids[i] = random.nextInt();
+      uniqueValues.add(template.createTerm(ids[i].toString()));
+    }
+    CountDownLatch latch = new CountDownLatch(1);
+    AtomicInteger index = new AtomicInteger(0);
+    final int numThreads = 2 + random.nextInt(5);
+    UpdateThread[] threads = new UpdateThread[numThreads];
+    for (int i = 0; i < threads.length; i++) {
+      threads[i] = new UpdateThread(queue, index, ids, latch);
+      threads[i].start();
+    }
+    latch.countDown();
+    for (int i = 0; i < threads.length; i++) {
+      threads[i].join();
+    }
+
+    for (UpdateThread updateThread : threads) {
+      DeleteSlice slice = updateThread.slice;
+      queue.updateSlice(slice);
+      BufferedDeletes deletes = updateThread.deletes;
+      slice.apply(deletes, BufferedDeletes.MAX_INT);
+      assertEquals(uniqueValues, deletes.terms.keySet());
+    }
+    queue.tryApplyGlobalSlice();
+    assertEquals(uniqueValues, new HashSet<Term>(Arrays.asList(queue
+        .freezeGlobalBuffer(null).terms)));
+    assertEquals("num deletes must be 0 after freeze", 0, queue
+        .numGlobalTermDeletes());
+  }
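+
+  /*
+   * Waits on the latch, then pulls ids off the shared counter, publishes one
+   * delete per id through a thread-private DeleteSlice and immediately
+   * applies the slice to a thread-local BufferedDeletes.
+   */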
+  private static class UpdateThread extends Thread {
+    final DocumentsWriterDeleteQueue queue;
+    final AtomicInteger index;
+    final Integer[] ids;
+    final DeleteSlice slice;
+    final BufferedDeletes deletes;
+    final CountDownLatch latch;
+
+    protected UpdateThread(DocumentsWriterDeleteQueue queue,
+        AtomicInteger index, Integer[] ids, CountDownLatch latch) {
+      this.queue = queue;
+      this.index = index;
+      this.ids = ids;
+      this.slice = queue.newSlice();
+      deletes = new BufferedDeletes(false);
+      this.latch = latch;
+    }
+
+    @Override
+    public void run() {
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        throw new ThreadInterruptedException(e);
+      }
+      Term template = new Term("id");
+      int i = 0;
+      while ((i = index.getAndIncrement()) < ids.length) {
+        Term term = template.createTerm(ids[i].toString());
+        queue.add(term, slice);
+        assertTrue(slice.isTailItem(term));
+        slice.apply(deletes, BufferedDeletes.MAX_INT);
+      }
+    }
+  }
+
+}

Property changes on: lucene/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java
___________________________________________________________________
Added: svn:eol-style
   + native
Added: svn:keywords
   + Date Author Id Revision HeadURL
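For reference, the slice lifecycle exercised by the tests above looks roughly like this. This is a minimal sketch against the package-private API the patch introduces, not part of the patch itself; the docIDUpto value stands in for the flushing thread's current document count and is illustrative only:

    DocumentsWriterDeleteQueue queue =
        new DocumentsWriterDeleteQueue(new BufferedDeletes(false));
    DeleteSlice slice = queue.newSlice();       // thread-private window into the global queue
    BufferedDeletes privateDeletes = new BufferedDeletes(false);
    int docIDUpto = 0;                          // illustrative doc count of the flushing thread

    Term term = new Term("id", "42");
    queue.add(term, slice);                     // publish the delete; the slice tail now points at it
    assert slice.isTailItem(term);
    slice.apply(privateDeletes, docIDUpto);     // drain everything behind the tail into the private buffer

    queue.updateSlice(slice);                   // catch up on deletes published by other threads
    FrozenBufferedDeletes frozen =
        queue.freezeGlobalBuffer(null);         // snapshot the global buffer and reset it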
Index: lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java (revision 1090383)
+++ lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java (working copy)
@@ -19,6 +19,7 @@
 
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.*;
+import org.apache.lucene.document.Field.Index;
 import org.apache.lucene.store.*;
 import org.apache.lucene.util.*;
 import org.junit.Test;
@@ -34,7 +35,7 @@
 
     final LineFileDocs docs = new LineFileDocs(random);
 
-    final IndexWriter w = new IndexWriter(dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer()));
+    final IndexWriter w = new IndexWriter(dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer()).setMergePolicy(newLogMergePolicy()));
     final int SIZE = 200 * RANDOM_MULTIPLIER;
     int id = 0;
     IndexReader r = null;
@@ -72,4 +73,70 @@
     dir.close();
   }
+
+  public void testUpdateSameDoc() throws Exception {
+    final Directory dir = newDirectory();
+
+    final LineFileDocs docs = new LineFileDocs(random);
+    for (int r = 0; r < 3; r++) {
+      final IndexWriter w = new IndexWriter(dir, newIndexWriterConfig(
+          TEST_VERSION_CURRENT, new MockAnalyzer()).setMergePolicy(
+          newLogMergePolicy()).setMaxBufferedDocs(2));
+
+      final int SIZE = 200 * RANDOM_MULTIPLIER;
+      final int numUpdates = (int) (SIZE * (2 + random.nextDouble()));
+      int numThreads = 3;
+      IndexingThread[] threads = new IndexingThread[numThreads];
+      for (int i = 0; i < numThreads; i++) {
+        threads[i] = new IndexingThread(docs, w, numUpdates);
+        threads[i].start();
+      }
+
+      for (int i = 0; i < numThreads; i++) {
+        threads[i].join();
+      }
+
+      w.close();
+    }
+    IndexReader open = IndexReader.open(dir);
+    assertEquals(1, open.numDocs());
+    open.close();
+    docs.close();
+    dir.close();
+  }
+
+  // Repeatedly replaces the same document from several threads; the index
+  // must end up with exactly one live document.
+  static class IndexingThread extends Thread {
+    final LineFileDocs docs;
+    final IndexWriter writer;
+    final int num;
+
+    public IndexingThread(LineFileDocs docs, IndexWriter writer, int num) {
+      super();
+      this.docs = docs;
+      this.writer = writer;
+      this.num = num;
+    }
+
+    @Override
+    public void run() {
+      try {
+        for (int i = 0; i < num; i++) {
+          Document doc = new Document();//docs.nextDoc();
+          doc.add(newField("id", "test", Index.NOT_ANALYZED));
+          writer.updateDocument(new Term("id", "test"), doc);
+//          if (random.nextInt(10) == 0) {
+//            IndexReader open = IndexReader.open(writer, true);
+//            assertEquals(1, open.numDocs());
+//            open.close();
+//          }
+        }
+      } catch (Exception e) {
+        fail(e.getMessage());
+      }
+    }
+  }
 }
Index: lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java (revision 1090383)
+++ lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java (working copy)
@@ -199,44 +199,39 @@
 
   public Map<String,Document> indexRandom(int nThreads, int iterations, int range, Directory dir, int maxThreadStates, boolean doReaderPooling) throws IOException, InterruptedException {
     Map<String,Document> docs = new HashMap<String,Document>();
-    for(int iter=0;iter<3;iter++) {
-      if (VERBOSE) {
-        System.out.println("TEST: iter=" + iter);
-      }
-      IndexWriter w = new MockIndexWriter(dir, newIndexWriterConfig(
-          TEST_VERSION_CURRENT, new MockAnalyzer()).setOpenMode(OpenMode.CREATE)
-          .setRAMBufferSizeMB(0.1).setMaxBufferedDocs(maxBufferedDocs).setIndexerThreadPool(new ThreadAffinityDocumentsWriterThreadPool(maxThreadStates))
-          .setReaderPooling(doReaderPooling).setMergePolicy(newLogMergePolicy()));
-      w.setInfoStream(VERBOSE ? System.out : null);
-      LogMergePolicy lmp = (LogMergePolicy) w.getConfig().getMergePolicy();
-      lmp.setUseCompoundFile(false);
-      lmp.setMergeFactor(mergeFactor);
+    IndexWriter w = new MockIndexWriter(dir, newIndexWriterConfig(
+        TEST_VERSION_CURRENT, new MockAnalyzer()).setOpenMode(OpenMode.CREATE)
+        .setRAMBufferSizeMB(0.1).setMaxBufferedDocs(maxBufferedDocs).setIndexerThreadPool(new ThreadAffinityDocumentsWriterThreadPool(maxThreadStates))
+        .setReaderPooling(doReaderPooling).setMergePolicy(newLogMergePolicy()));
+    w.setInfoStream(VERBOSE ? System.out : null);
+    LogMergePolicy lmp = (LogMergePolicy) w.getConfig().getMergePolicy();
+    lmp.setUseCompoundFile(false);
+    lmp.setMergeFactor(mergeFactor);
 
-      threads = new IndexingThread[nThreads];
-      for (int i=0; i