Index: lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java	(revision 1088598)
+++ lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java	(working copy)
@@ -33,8 +33,8 @@
 import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.Weight;
 
-/* Tracks the stream of {@link BuffereDeletes}.
- * When DocumensWriter flushes, its buffered
+/* Tracks the stream of {@link BufferedDeletes}.
+ * When DocumentsWriterPerThread flushes, its buffered
  * deletes are appended to this stream.  We later
  * apply these deletes (resolve them to the actual
  * docIDs, per segment) when a merge is started
@@ -82,17 +82,27 @@
 
   // Appends a new packet of buffered deletes to the stream,
   // setting its generation:
-  public synchronized void push(FrozenBufferedDeletes packet) {
+  public synchronized long push(FrozenBufferedDeletes packet) {
+    /*
+     * The insert operation must be atomic. If we let threads increment the gen
+     * and push the packet afterwards we risk that packets are out of order.
+     * With DWPT this is possible if two or more flushes are racing to push
+     * packets. If the pushed packets get out of order we would lose documents
+     * since deletes would be applied to the wrong segments.
+     */
+    packet.setDelGen(nextGen++);
     assert packet.any();
     assert checkDeleteStats();
-    assert packet.gen < nextGen;
+    assert packet.delGen() < nextGen;
+    assert deletes.isEmpty() || deletes.get(deletes.size()-1).delGen() < packet.delGen() : "Delete packets must be in order";
     deletes.add(packet);
     numTerms.addAndGet(packet.numTermDeletes);
     bytesUsed.addAndGet(packet.bytesUsed);
     if (infoStream != null) {
-      message("push deletes " + packet + " delGen=" + packet.gen + " packetCount=" + deletes.size());
+      message("push deletes " + packet + " delGen=" + packet.delGen() + " packetCount=" + deletes.size());
     }
     assert checkDeleteStats();
+    return packet.delGen();
   }
 
   public synchronized void clear() {
@@ -132,7 +142,7 @@
   }
 
   // Sorts SegmentInfos from smallest to biggest bufferedDelGen:
-  private static final Comparator<SegmentInfo> sortByDelGen = new Comparator<SegmentInfo>() {
+  private static final Comparator<SegmentInfo> sortSegInfoByDelGen = new Comparator<SegmentInfo>() {
     // @Override -- not until Java 1.6
     public int compare(SegmentInfo si1, SegmentInfo si2) {
       final long cmp = si1.getBufferedDeletesGen() - si2.getBufferedDeletesGen();
@@ -147,10 +157,10 @@
 
     @Override
     public boolean equals(Object other) {
-      return sortByDelGen == other;
+      return sortSegInfoByDelGen == other;
     }
   };
-
+  
   /** Resolves the buffered deleted Term/Query/docIDs, into
    *  actual deleted docIDs in the deletedDocs BitVector for
    *  each SegmentReader. */
@@ -174,7 +184,7 @@
 
     SegmentInfos infos2 = new SegmentInfos();
     infos2.addAll(infos);
-    Collections.sort(infos2, sortByDelGen);
+    Collections.sort(infos2, sortSegInfoByDelGen);
 
     BufferedDeletes coalescedDeletes = null;
     boolean anyNewDeletes = false;
@@ -191,19 +201,30 @@
       final SegmentInfo info = infos2.get(infosIDX);
       final long segGen = info.getBufferedDeletesGen();
 
-      if (packet != null && segGen < packet.gen) {
+      if (packet != null && segGen < packet.delGen()) {
         //System.out.println("  coalesce");
         if (coalescedDeletes == null) {
           coalescedDeletes = new BufferedDeletes(true);
         }
-        coalescedDeletes.update(packet);
+        if (!packet.isSegmentPrivate) {
+          /*
+           * Only update the coalescedDeletes if we are NOT on a segment private del packet.
+           * A segment private del packet must only be applied to segments with the same delGen.
+           * Yet, if a segment has already been dropped from the SI, because it had no documents
+           * remaining after del packets younger than its segPrivate packet (higher delGen) were
+           * applied, the segPrivate packet has not been removed yet.
+           */
+          coalescedDeletes.update(packet);
+        }
+
         delIDX--;
-      } else if (packet != null && segGen == packet.gen) {
+      } else if (packet != null && segGen == packet.delGen()) {
+        assert packet.isSegmentPrivate : "Packet and segment's delGen can only match on a segment private del packet";
         //System.out.println("  eq");
 
         // Lock order: IW -> BD -> RP
         assert readerPool.infoIsLive(info);
-        SegmentReader reader = readerPool.get(info, false);
+        final SegmentReader reader = readerPool.get(info, false);
         int delCount = 0;
         final boolean segAllDeletes;
         try {
@@ -213,7 +234,7 @@
             delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader);
           }
           //System.out.println("    del exact");
-          // Don't delete by Term here; DocumentsWriter
+          // Don't delete by Term here; DocumentsWriterPerThread
           // already did that on flush:
           delCount += applyQueryDeletes(packet.queriesIterable(), reader);
           segAllDeletes = reader.numDocs() == 0;
@@ -236,7 +257,12 @@
         if (coalescedDeletes == null) {
           coalescedDeletes = new BufferedDeletes(true);
         }
-        coalescedDeletes.update(packet);
+        
+        /*
+         * since we are on a segment private del packet we must not
+         * update the coalescedDeletes here! We can simply advance to the 
+         * next packet and seginfo.
+         */
         delIDX--;
         infosIDX--;
         info.setBufferedDeletesGen(nextGen);
@@ -285,7 +311,7 @@
     return new ApplyDeletesResult(anyNewDeletes, nextGen++, allDeleted);
   }
 
-  public synchronized long getNextGen() {
+  synchronized long getNextGen() {
     return nextGen++;
   }
 
@@ -303,10 +329,9 @@
     if (infoStream != null) {
       message("prune sis=" + segmentInfos + " minGen=" + minGen + " packetCount=" + deletes.size());
     }
-
     final int limit = deletes.size();
     for(int delIDX=0;delIDX<limit;delIDX++) {
-      if (deletes.get(delIDX).gen >= minGen) {
+      if (deletes.get(delIDX).delGen() >= minGen) {
         prune(delIDX);
         assert checkDeleteStats();
         return;
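
The ordering invariant enforced by the new push() boils down to a few lines: the
generation draw and the append must happen under the same monitor. A minimal,
standalone sketch (hypothetical PacketStream/Packet names, not part of this patch):

    // If the gen were drawn outside the monitor, a thread could take gen=5,
    // lose the race, and enqueue after the packet with gen=6 -- exactly the
    // out-of-order case the comment in push() warns about.
    final class PacketStream {
      static final class Packet { long delGen = -1; }

      private long nextGen = 1;
      private final java.util.List<Packet> deletes = new java.util.ArrayList<Packet>();

      synchronized long push(Packet packet) {
        packet.delGen = nextGen++; // gen assignment...
        deletes.add(packet);       // ...and append are one atomic step
        return packet.delGen;
      }
    }
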
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriter.java	(revision 1088598)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriter.java	(working copy)
@@ -21,7 +21,9 @@
 import java.io.PrintStream;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.analysis.Analyzer;
@@ -117,8 +119,10 @@
   private AtomicInteger numDocsInRAM = new AtomicInteger(0);
 
   final BufferedDeletesStream bufferedDeletesStream;
-  // TODO: cutover to BytesRefHash
-  private final BufferedDeletes pendingDeletes = new BufferedDeletes(false);
+  // TODO: cut over to BytesRefHash in BufferedDeletes
+  volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue(new BufferedDeletes(false));
+  private final Queue<FlushTicket> ticketQueue = new LinkedList<DocumentsWriter.FlushTicket>();
+
   private Collection<String> abortedFiles;               // List of files that were written before last abort()
 
   final IndexingChain chain;
@@ -146,30 +150,12 @@
     
     healthiness = new Healthiness();
     final long maxRamPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
-    flushControl = new DocumentsWriterFlushControl(flushPolicy, perThreadPool, healthiness, pendingDeletes, maxRamPerDWPT);
+    flushControl = new DocumentsWriterFlushControl(this, healthiness, maxRamPerDWPT);
   }
 
-  boolean deleteQueries(final Query... queries) throws IOException {
-    synchronized(this) {
-      for (Query query : queries) {
-        pendingDeletes.addQuery(query, BufferedDeletes.MAX_INT);
-      }
-    }
-
-    final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
-
-    while (threadsIterator.hasNext()) {
-      ThreadState state = threadsIterator.next();
-      state.lock();
-      try {
-        if (state.isActive()) {
-          state.perThread.deleteQueries(queries); 
-        }
-      } finally {
-        state.unlock();
-      }
-    }
-
+  synchronized boolean deleteQueries(final Query... queries) throws IOException {
+    final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
+    deleteQueue.addDelete(queries);
     return false;
   }
 
@@ -177,29 +163,12 @@
     return deleteQueries(query);
   }
 
-  boolean deleteTerms(final Term... terms) throws IOException {
-    synchronized(this) {
-      for (Term term : terms) {
-        pendingDeletes.addTerm(term, BufferedDeletes.MAX_INT);
-      }
-    }
-
-    Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
-
-    while (threadsIterator.hasNext()) {
-      ThreadState state = threadsIterator.next();
-      state.lock();
-      try {
-        if (state.isActive()) {
-          state.perThread.deleteTerms(terms);
-          flushControl.doOnDelete(state);
-        }
-      } finally {
-        state.unlock();
-      }
-    }
+  synchronized boolean deleteTerms(final Term... terms) throws IOException {
+    final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
+    deleteQueue.addDelete(terms);
+    flushControl.doOnDelete();
     if (flushControl.flushDeletes.getAndSet(false)) {
-      flushDeletes();
+      flushDeletes(deleteQueue);
     }
     return false;
   }
@@ -211,32 +180,20 @@
     return deleteTerms(term);
   }
 
-  void deleteTerm(final Term term, ThreadState exclude) throws IOException {
-    synchronized(this) {
-      pendingDeletes.addTerm(term, BufferedDeletes.MAX_INT);
-    }
 
-    final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
-
-    while (threadsIterator.hasNext()) {
-      ThreadState state = threadsIterator.next();
-      if (state != exclude) {
-        state.lock();
-        try {
-          state.perThread.deleteTerms(term);
-          flushControl.doOnDelete(state);
-        } finally {
-          state.unlock();
-        }
-      }
-    }
-    if (flushControl.flushDeletes.getAndSet(false)) {
-      flushDeletes();
-    }
+  
+  DocumentsWriterDeleteQueue currentDeleteSession() {
+    return deleteQueue;
   }
-
-  private void flushDeletes() throws IOException {
-    maybePushPendingDeletes();
+  
+  private void flushDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
+    if (deleteQueue != null) {
+      synchronized (ticketQueue) {
+        // freeze and insert the delete flush ticket in the queue
+        ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
+        applyFlushTickets(null, null);
+      }
+    }
     indexWriter.applyAllDeletes();
     indexWriter.flushCount.incrementAndGet();
   }
@@ -284,7 +241,7 @@
     boolean success = false;
 
     synchronized (this) {
-      pendingDeletes.clear();
+      deleteQueue.clear();
     }
 
     try {
@@ -322,17 +279,17 @@
   }
 
   public int getBufferedDeleteTermsSize() {
-    return pendingDeletes.terms.size();
+    return deleteQueue.getBufferedDeleteTermsSize();
   }
 
   //for testing
   public int getNumBufferedDeleteTerms() {
-    return pendingDeletes.numTermDeletes.get();
+    return deleteQueue.numGlobalTermDeletes();
   }
 
   public boolean anyDeletions() {
-    return pendingDeletes.any();
-    }
+    return deleteQueue.anyChanges();
+  }
 
   void close() {
     closed = true;
@@ -353,72 +310,132 @@
         message("WARNING DocumentsWriter is stalled try to hijack thread to flush pending segment");
       }
      // try to pick up pending threads here if possible
-      final DocumentsWriterPerThread flushingDWPT;
-      flushingDWPT = flushControl.getFlushIfPending(null);
+      DocumentsWriterPerThread flushingDWPT;
+      while ( (flushingDWPT = flushControl.nextPendingFlush()) != null){
        // don't push the delete here since the update could fail!
-      maybeMerge = doFlush(flushingDWPT);
+        maybeMerge = doFlush(flushingDWPT);
+        if (!healthiness.isStalled()) {
+          break;
+        }
+      }
       if (infoStream != null && healthiness.isStalled()) {
         message("WARNING DocumentsWriter is stalled might block thread until DocumentsWriter is not stalled anymore");
       }
       healthiness.waitIfStalled(); // block if stalled
     }
-    ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(),
+    final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(),
         this, doc);
-    DocumentsWriterPerThread flushingDWPT = null;
+    final DocumentsWriterPerThread flushingDWPT;
+    final DocumentsWriterPerThread dwpt;
     try {
       if (!perThread.isActive()) {
         ensureOpen();
         assert false: "perThread is not active but we are still open";
       }
-      final DocumentsWriterPerThread dwpt = perThread.perThread;
+       
+      dwpt = perThread.perThread;
       try {
-        dwpt.updateDocument(doc, analyzer, delTerm);
+        dwpt.updateDocument(doc, analyzer, delTerm); 
+        numDocsInRAM.incrementAndGet();
       } finally {
         if(dwpt.checkAndResetHasAborted()) {
             flushControl.doOnAbort(perThread);
         }
       }
       flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
-      numDocsInRAM.incrementAndGet();
     } finally {
       perThread.unlock();
     }
-    // delete term from other DWPTs later, so that this thread
-    // doesn't have to lock multiple DWPTs at the same time
-    if (isUpdate) {
-      deleteTerm(delTerm, perThread);
-    }
+    
     maybeMerge |= doFlush(flushingDWPT);
     return maybeMerge;
   }
-  
- 
 
-  private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
+  private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
     boolean maybeMerge = false;
     while (flushingDWPT != null) {
       maybeMerge = true;
+      boolean success = false;
+      FlushTicket ticket = null;
       try {
+        /*
+         * Since with DWPT the flush process is concurrent and several DWPTs
+         * could flush at the same time, we must maintain the order of the
+         * flushes before we can apply the flushed segment and the frozen global
+         * deletes it is buffering. The reason for this is that the global
+         * deletes mark a certain point in time where we took a DWPT out of
+         * rotation and froze the global deletes.
+         * 
+         * Example: A flush 'A' starts and freezes the global deletes, then
+         * flush 'B' starts and freezes all deletes that occurred since 'A'
+         * started. If 'B' finishes before 'A' we need to wait until 'A' is done,
+         * otherwise the deletes frozen by 'B' are not applied to 'A' and we
+         * might fail to delete documents in 'A'.
+         */
+        synchronized (ticketQueue) {
+         // each flush is assigned a ticket in the order they acquire the ticketQueue lock
+         ticket =  new FlushTicket(flushingDWPT.prepareFlush(), true);
+         ticketQueue.add(ticket);
+        }
         // flush concurrently without locking
         final FlushedSegment newSegment = flushingDWPT.flush();
-        finishFlushedSegment(newSegment);
+        success = true;
+        /*
+         * now that we are done, publish from the ticket queue if the head of
+         * the queue has already finished its flush.
+         */
+        applyFlushTickets(ticket, newSegment);
       } finally {
           flushControl.doAfterFlush(flushingDWPT);
           flushingDWPT.checkAndResetHasAborted();
           indexWriter.flushCount.incrementAndGet();
+          if (!success && ticket != null) {
+            synchronized (ticketQueue) {
+              // in the case of a failure make sure we are making progress and
+              // apply all the deletes since the segment flush failed
+              ticket.isSegmentFlush = false;
+            }
+          }
       }
-        flushingDWPT =  flushControl.nextPendingFlush() ;
+      flushingDWPT = flushControl.nextPendingFlush();
     }
     return maybeMerge;
   }
   
 
-  private void finishFlushedSegment(FlushedSegment newSegment)
+  private void applyFlushTickets(FlushTicket current, FlushedSegment segment) throws IOException {
+    synchronized (ticketQueue) {
+      if (current != null) {
+        // this is a segment FlushTicket, so assign the flushed segment so that the queue can make progress.
+        assert segment != null;
+        current.segment = segment;
+      }
+      while (true) {
+        // while the head of the queue can publish, keep draining it.
+        final FlushTicket head = ticketQueue.peek();
+        if (head != null && head.canPublish()) {
+          ticketQueue.poll();
+          finishFlushedSegment(head.segment, head.frozenDeletes);
+        } else {
+          break;
+        }
+      }
+    }
+  }
+  
+
+  private void finishFlushedSegment(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
       throws IOException {
-    pushDeletes(newSegment);
-    if (newSegment != null) {
-      indexWriter.addFlushedSegment(newSegment);
+    // finish the flushed segment and publish it to the IndexWriter
+    if (bufferedDeletes != null && bufferedDeletes.any()) {
+      bufferedDeletesStream.push(bufferedDeletes);
+      if (infoStream != null) {
+        message("flush: push buffered deletes: " + bufferedDeletes);
+      }
     }
+    publishFlushedSegment(newSegment);
   }
 
   final void subtractFlushedNumDocs(int numFlushed) {
@@ -427,183 +444,89 @@
       oldValue = numDocsInRAM.get();
     }
   }
-
-  private synchronized void pushDeletes(FlushedSegment flushedSegment) {
-    maybePushPendingDeletes();
-    if (flushedSegment != null) {
-      BufferedDeletes deletes = flushedSegment.segmentDeletes;
-      final long delGen = bufferedDeletesStream.getNextGen();
-      // Lock order: DW -> BD
+  
+  private void publishFlushedSegment(FlushedSegment newSegment)
+      throws IOException {
+    if (newSegment != null) {
+      final SegmentInfo segInfo = indexWriter.prepareFlushedSegment(newSegment);
+      final BufferedDeletes deletes = newSegment.segmentDeletes;
+      FrozenBufferedDeletes packet = null;
       if (deletes != null && deletes.any()) {
-        final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(deletes,
-            delGen);
-        if (infoStream != null) {
-          message("flush: push buffered deletes");
-        }
-        bufferedDeletesStream.push(packet);
+        // segment private delete
+        packet = new FrozenBufferedDeletes(deletes, true);
         if (infoStream != null) {
-          message("flush: delGen=" + packet.gen);
+          message("flush: push buffered seg private deletes: " + packet);
         }
       }
-      flushedSegment.segmentInfo.setBufferedDeletesGen(delGen);
+      indexWriter.publishFlushedSegment(segInfo, packet);
     }
   }
-
-  private synchronized final void maybePushPendingDeletes() {
-    final long delGen = bufferedDeletesStream.getNextGen();
-    if (pendingDeletes.any()) {
-      indexWriter.bufferedDeletesStream.push(new FrozenBufferedDeletes(
-          pendingDeletes, delGen));
-      pendingDeletes.clear();
-    }
+  
+  private final Object flushAllLock = new Object();
+  // for asserts
+  private volatile DocumentsWriterDeleteQueue currentFlushingSession = null;
+  private boolean setFlushingDeleteQueue(DocumentsWriterDeleteQueue session) {
+    currentFlushingSession = session;
+    return true;
   }
-
+  
   final boolean flushAllThreads(final boolean flushDeletes)
     throws IOException {
-
-    final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
-    boolean anythingFlushed = false;
-
-    while (threadsIterator.hasNext()) {
-      final ThreadState perThread = threadsIterator.next();
-      final DocumentsWriterPerThread flushingDWPT;
-      /*
-       * TODO: maybe we can leverage incoming / indexing threads here if we mark
-       * all active threads pending so that we don't need to block until we got
-       * the handle. Yet, we need to figure out how to identify that a certain
-       * DWPT has been flushed since they are simply replaced once checked out
-       * for flushing. This would give us another level of concurrency during
-       * commit.
-       * 
-       * Maybe we simply iterate them and store the ThreadStates and mark
-       * all as flushPending and at the same time record the DWPT instance as a
-       * key for the pending ThreadState. This way we can easily iterate until
-       * all DWPT have changed.
-       */
-      perThread.lock(); 
+    synchronized (flushAllLock) {
+      final DocumentsWriterDeleteQueue flushingDeleteQueue;
+      synchronized (this) {
+        flushingDeleteQueue = deleteQueue;
+        deleteQueue = new DocumentsWriterDeleteQueue(new BufferedDeletes(false));
+        assert setFlushingDeleteQueue(flushingDeleteQueue);
+      }
+      assert flushingDeleteQueue == currentFlushingSession;
+      boolean anythingFlushed = false;
+      boolean success = false;
       try {
-        if (!perThread.isActive()) {
-          assert closed;
-          continue; //this perThread is already done maybe by a concurrently indexing thread
+        flushControl.markForFullFlush();
+        DocumentsWriterPerThread flushingDWPT;
+        // now try to help out with flushing
+        while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
+          anythingFlushed |= doFlush(flushingDWPT);
         }
-        final DocumentsWriterPerThread dwpt = perThread.perThread; 
-        // Always flush docs if there are any
-        final boolean flushDocs =  dwpt.getNumDocsInRAM() > 0;
-        final String segment = dwpt.getSegment();
-        // If we are flushing docs, segment must not be null:
-        assert segment != null || !flushDocs;
-        if (flushDocs) {
-          // check out and set pending if not already set
-          flushingDWPT = flushControl.tryCheckoutForFlush(perThread, true);
-          assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
-          assert dwpt == flushingDWPT : "flushControl returned different DWPT";
-          try {
-            final FlushedSegment newSegment = dwpt.flush();
-            anythingFlushed = true;
-            finishFlushedSegment(newSegment);
-          } finally {
-            flushControl.doAfterFlush(flushingDWPT);
-          }
+        // if a concurrent flush is still in flight wait for it
+        while (!flushControl.allFlushesDue()) {
+          flushControl.waitForFlush();  
         }
+        if (!anythingFlushed && flushDeletes) {
+          synchronized (ticketQueue) {
+            ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
+          }
+          applyFlushTickets(null, null);
+        }
+        success = true;
+        
       } finally {
-        perThread.unlock();
+        assert flushingDeleteQueue == currentFlushingSession;
+        assert setFlushingDeleteQueue(null);
+        if (!success) {
+          flushControl.abortFullFlushes();
+        } else {
+          // release the flush lock
+          flushControl.finishFullFlush();
+        }
       }
+      return anythingFlushed;
     }
-
-    if (!anythingFlushed && flushDeletes) {
-      maybePushPendingDeletes();
-    }
-
-
-    return anythingFlushed;
   }
   
-  
-  
- 
-
-//  /* We have three pools of RAM: Postings, byte blocks
-//   * (holds freq/prox posting data) and per-doc buffers
-//   * (stored fields/term vectors).  Different docs require
-//   * varying amount of storage from these classes.  For
-//   * example, docs with many unique single-occurrence short
-//   * terms will use up the Postings RAM and hardly any of
-//   * the other two.  Whereas docs with very large terms will
-//   * use alot of byte blocks RAM.  This method just frees
-//   * allocations from the pools once we are over-budget,
-//   * which balances the pools to match the current docs. */
-//  void balanceRAM() {
-//
-//    final boolean doBalance;
-//    final long deletesRAMUsed;
-//
-//    deletesRAMUsed = bufferedDeletes.bytesUsed();
-//
-//    synchronized(this) {
-//      if (ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH || bufferIsFull) {
-//        return;
-//      }
-//
-//      doBalance = bytesUsed() + deletesRAMUsed >= ramBufferSize;
-//    }
-//
-//    if (doBalance) {
-//
-//      if (infoStream != null)
-//        message("  RAM: balance allocations: usedMB=" + toMB(bytesUsed()) +
-//                " vs trigger=" + toMB(ramBufferSize) +
-//                " deletesMB=" + toMB(deletesRAMUsed) +
-//                " byteBlockFree=" + toMB(byteBlockAllocator.bytesUsed()) +
-//                " perDocFree=" + toMB(perDocAllocator.bytesUsed()));
-//
-//      final long startBytesUsed = bytesUsed() + deletesRAMUsed;
-//
-//      int iter = 0;
-//
-//      // We free equally from each pool in 32 KB
-//      // chunks until we are below our threshold
-//      // (freeLevel)
-//
-//      boolean any = true;
-//
-//      while(bytesUsed()+deletesRAMUsed > freeLevel) {
-//
-//        synchronized(this) {
-//          if (0 == perDocAllocator.numBufferedBlocks() &&
-//              0 == byteBlockAllocator.numBufferedBlocks() &&
-//              0 == freeIntBlocks.size() && !any) {
-//            // Nothing else to free -- must flush now.
-//            bufferIsFull = bytesUsed()+deletesRAMUsed > ramBufferSize;
-//            if (infoStream != null) {
-//              if (bytesUsed()+deletesRAMUsed > ramBufferSize)
-//                message("    nothing to free; set bufferIsFull");
-//              else
-//                message("    nothing to free");
-//            }
-//            break;
-//          }
-//
-//          if ((0 == iter % 4) && byteBlockAllocator.numBufferedBlocks() > 0) {
-//            byteBlockAllocator.freeBlocks(1);
-//          }
-//          if ((1 == iter % 4) && freeIntBlocks.size() > 0) {
-//            freeIntBlocks.remove(freeIntBlocks.size()-1);
-//            bytesUsed.addAndGet(-INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT);
-//          }
-//          if ((2 == iter % 4) && perDocAllocator.numBufferedBlocks() > 0) {
-//            perDocAllocator.freeBlocks(32); // Remove upwards of 32 blocks (each block is 1K)
-//          }
-//        }
-//
-//        if ((3 == iter % 4) && any)
-//          // Ask consumer to free any recycled state
-//          any = consumer.freeRAM();
-//
-//        iter++;
-//      }
-//
-//      if (infoStream != null)
-//        message("    after free: freedMB=" + nf.format((startBytesUsed-bytesUsed()-deletesRAMUsed)/1024./1024.) + " usedMB=" + nf.format((bytesUsed()+deletesRAMUsed)/1024./1024.));
-//    }
-//  }
+  static final class FlushTicket {
+    final FrozenBufferedDeletes frozenDeletes;
+    FlushedSegment segment;
+    boolean isSegmentFlush;
+    
+    FlushTicket(FrozenBufferedDeletes frozenDeletes, boolean isSegmentFlush) {
+      this.frozenDeletes = frozenDeletes;
+      this.isSegmentFlush = isSegmentFlush;
+    }
+    
+    boolean canPublish() {
+      return (!isSegmentFlush || segment != null);  
+    }
+  }
 }
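
The ticket queue protocol described in the comments above, reduced to a standalone
model (hypothetical TicketQueue/Ticket names; the patch's FlushTicket additionally
carries the frozen global deletes): tickets are reserved in flush-start order,
flushes may complete in any order, and publishing only ever happens from the head.

    final class TicketQueue {
      static final class Ticket {
        volatile Object segment; // set once the concurrent flush completes
        boolean canPublish() { return segment != null; }
      }

      private final java.util.Queue<Ticket> queue =
          new java.util.LinkedList<Ticket>();

      synchronized Ticket reserve() { // called in flush-start order
        final Ticket ticket = new Ticket();
        queue.add(ticket);
        return ticket;
      }

      synchronized void finish(Ticket ticket, Object segment) {
        ticket.segment = segment;
        // publish strictly from the head: a flush that finished early is
        // effectively deferred until all older tickets can publish too
        while (!queue.isEmpty() && queue.peek().canPublish()) {
          final Object publishable = queue.poll().segment;
          // publishFlushedSegment(publishable) would run here, in order
        }
      }
    }
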
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java	(revision 0)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java	(revision 0)
@@ -0,0 +1,377 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.search.Query;
+
+/**
+ * {@link DocumentsWriterDeleteQueue} is a non-blocking linked pending deletes
+ * queue. In contrast to other queue implementations we only maintain the
+ * tail of the queue. A delete queue is always used in the context of a set of
+ * DWPTs and a global delete pool. Each of the DWPTs and the global pool need to
+ * maintain their own head of the queue. The difference between the DWPTs and
+ * the global pool is that a DWPT starts maintaining a head once it has added
+ * its first document, since for its segment's private deletes only the deletes
+ * after that document are relevant. The global pool instead starts maintaining
+ * the head once this instance is created by taking the sentinel instance as its
+ * initial head.
+ * <p>
+ * Since each {@link DeleteSlice} maintains its own head and the list is only
+ * single linked the garbage collector takes care of pruning the list for us.
+ * All nodes in the list that are still relevant should be either directly or
+ * indirectly referenced by one of the DWPT's private {@link DeleteSlice} or by
+ * the global {@link BufferedDeletes} slice.
+ * <p>
+ * Each DWPT as well as the global delete pool maintains its own private
+ * DeleteSlice instance. In the DWPT case updating a slice is equivalent to
+ * atomically finishing the document. The slice update guarantees a
+ * happens-before relationship to all other updates in the same indexing
+ * session. When a DWPT updates a document it
+ * 
+ * <ol>
+ * <li>consumes a document and finishes its processing</li>
+ * <li>updates its private {@link DeleteSlice} either by calling
+ * {@link #updateSlice(DeleteSlice)} or {@link #add(Term, DeleteSlice)} (if the
+ * document has a delTerm)</li>
+ * <li>applies all deletes in the slice to its private {@link BufferedDeletes}
+ * and resets it</li>
+ * <li>increments its internal document id</li>
+ * </ol>
+ * 
+ * The DWPT also doesn't apply its current document's delete term until it has
+ * updated its delete slice, which ensures the consistency of the update. If the
+ * update fails before the DeleteSlice has been updated, the deleteTerm will not
+ * be added to its private deletes nor to the global deletes.
+ * 
+ */
+final class DocumentsWriterDeleteQueue {
+
+  private volatile Node tail;
+
+  private static final AtomicReferenceFieldUpdater<DocumentsWriterDeleteQueue, Node> tailUpdater = AtomicReferenceFieldUpdater
+      .newUpdater(DocumentsWriterDeleteQueue.class, Node.class, "tail");
+
+  private final DeleteSlice globalSlice;
+  private final BufferedDeletes globalBufferedDeletes;
+  /* only acquired to update the global deletes */
+  private final ReentrantLock globalBufferLock = new ReentrantLock();
+
+  DocumentsWriterDeleteQueue(BufferedDeletes globalBufferedDeletes) {
+    this.globalBufferedDeletes = globalBufferedDeletes;
+    /*
+     * we use a sentinel instance as our initial tail. No slice will ever try to
+     * apply this tail since the head is always omitted.
+     */
+    tail = new Node(null); // sentinel
+    globalSlice = new DeleteSlice(tail);
+  }
+
+  void addDelete(Query... queries) {
+    add(new QueryArrayNode(queries));
+    tryApplyGlobalSlice();
+  }
+
+  void addDelete(Term... terms) {
+    add(new TermArrayNode(terms));
+    tryApplyGlobalSlice();
+  }
+
+  /**
+   * Adds the updated document's delTerm and atomically updates the given delete slice.
+   */
+  void add(Term term, DeleteSlice slice) {
+    final TermNode termNode = new TermNode(term);
+    add(termNode);
+    /*
+     * this is an update request where the term is the updated document's
+     * delTerm. In that case we need to guarantee that this insert is atomic
+     * with regards to the given delete slice. This means that if two threads
+     * try to update the same document with the same delTerm, one of them must
+     * win. By taking the node we created for our delTerm as the new tail it is
+     * guaranteed that, if another thread adds the same term right after us, we
+     * will apply this delete the next time we update our slice and one of the
+     * two competing updates wins!
+     */
+    slice.sliceTail = termNode;
+    assert slice.sliceHead != slice.sliceTail : "slice head and tail must differ after add";
+    tryApplyGlobalSlice(); // TODO: doing this each time is not necessary; maybe
+    // we could do it just every n times or so?
+  }
+
+  void add(Node item) {
+    /*
+     * this non-blocking / 'wait-free' linked list add was inspired by Apache
+     * Harmony's ConcurrentLinkedQueue Implementation.
+     */
+    while (true) {
+      final Node currentTail = this.tail;
+      final Node tailNext = currentTail.next;
+      if (tail == currentTail) {
+        if (tailNext != null) {
+          /*
+           * we are in an intermediate state here: the tail's next pointer has
+           * been advanced but the tail itself might not have been updated yet.
+           * Help to advance the tail and try updating it again.
+           */
+          tailUpdater.compareAndSet(this, currentTail, tailNext); // can fail
+        } else {
+          /*
+           * we are in a quiescent state and can try to insert the item at the
+           * current tail. If we fail to insert we just retry the operation since
+           * somebody else has already added their item
+           */
+          if (currentTail.casNext(null, item)) {
+            /*
+             * now that we are done we need to advance the tail. Another thread
+             * could have advanced it already, so we can ignore the return
+             * value of this CAS call
+             */
+            tailUpdater.compareAndSet(this, currentTail, item);
+            return;
+          }
+        }
+      }
+    }
+  }
+
+  boolean anyChanges() {
+    globalBufferLock.lock();
+    try {
+      return !globalSlice.isEmpty() || globalBufferedDeletes.any();
+    } finally {
+      globalBufferLock.unlock();
+    }
+  }
+
+  void tryApplyGlobalSlice() {
+    if (globalBufferLock.tryLock()) {
+      /*
+       * the global buffer must be locked, but we don't need to update it if
+       * there is an update going on right now. It is sufficient to apply the
+       * deletes that have been added after the current in-flight global slice's
+       * tail the next time we can get the lock!
+       */
+      try {
+        if (updateSlice(globalSlice)) {
+          globalSlice.apply(globalBufferedDeletes, BufferedDeletes.MAX_INT);
+        }
+      } finally {
+        globalBufferLock.unlock();
+      }
+    }
+  }
+
+  FrozenBufferedDeletes freezeGlobalBuffer(DeleteSlice callerSlice) {
+    globalBufferLock.lock();
+    /*
+     * here we are freezing the global buffer so we need to lock it, apply all
+     * deletes in the queue and reset the global slice to let the GC prune the
+     * queue.
+     */
+    // take the current tail and make it local: any changes after this
+    // call are applied later and are not relevant here
+    final Node currentTail = tail;
+    if (callerSlice != null) {
+      // update the caller's slice so we are on the same page
+      callerSlice.sliceTail = currentTail;
+    }
+    try {
+      if (globalSlice.sliceTail != currentTail) {
+        globalSlice.sliceTail = currentTail;
+        globalSlice.apply(globalBufferedDeletes, BufferedDeletes.MAX_INT);
+      }
+
+      final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(
+          globalBufferedDeletes, false);
+      globalBufferedDeletes.clear();
+      return packet;
+    } finally {
+      globalBufferLock.unlock();
+    }
+  }
+
+  DeleteSlice newSlice() {
+    return new DeleteSlice(tail);
+  }
+
+  boolean updateSlice(DeleteSlice slice) {
+    if (slice.sliceTail != tail) { // otherwise the tails are the same and there is nothing to apply
+      slice.sliceTail = tail;
+      return true;
+    }
+    return false;
+  }
+
+  static class DeleteSlice {
+    // no need to be volatile, slices are only accessed by one thread!
+    Node sliceHead; // we don't apply this one
+    Node sliceTail;
+
+    DeleteSlice(Node currentTail) {
+      assert currentTail != null;
+      /*
+       * Initially this is a 0 length slice pointing to the 'current' tail of
+       * the queue. Once we update the slice we only need to assign the tail and
+       * have a new slice
+       */
+      sliceHead = sliceTail = currentTail;
+    }
+
+    void apply(BufferedDeletes del, int docIDUpto) {
+      if (sliceHead == sliceTail) {
+        // 0 length slice
+        return;
+      }
+      /*
+       * when we apply a slice we take the head and get its next as our first
+       * item to apply and continue until we applied the tail. If the head and
+       * tail in this slice are not equal then there will be at least one more
+       * non-null node in the slice!
+       */
+      Node current = sliceHead;
+      do {
+        current = current.next;
+        assert current != null : "slice property violated: nodes between head and tail must not be null";
+        current.apply(del, docIDUpto);
+      } while (current != sliceTail);
+      reset();
+    }
+
+    void reset() {
+      // resetting to a 0 length slice
+      sliceHead = sliceTail;
+    }
+
+    /**
+     * Returns <code>true</code> iff the given item is identical to the item
+     * held by the slice's tail, otherwise <code>false</code>.
+     */
+    boolean isTailItem(Object item) {
+      return sliceTail.item == item;
+    }
+
+    boolean isEmpty() {
+      return sliceHead == sliceTail;
+    }
+  }
+
+  public int numGlobalTermDeletes() {
+    return globalBufferedDeletes.numTermDeletes.get();
+  }
+
+  void clear() {
+    globalBufferLock.lock();
+    try {
+      final Node currentTail = tail;
+      globalSlice.sliceHead = globalSlice.sliceTail = currentTail;
+      globalBufferedDeletes.clear();
+    } finally {
+      globalBufferLock.unlock();
+    }
+  }
+
+  private static class Node {
+    volatile Node next;
+    final Object item;
+
+    private Node(Object item) {
+      this.item = item;
+    }
+
+    static final AtomicReferenceFieldUpdater<Node, Node> nextUpdater = AtomicReferenceFieldUpdater
+        .newUpdater(Node.class, Node.class, "next");
+
+    void apply(BufferedDeletes bufferedDeletes, int docIDUpto) {
+      assert false : "sentinel item must never be applied";
+    }
+
+    boolean casNext(Node cmp, Node val) {
+      return nextUpdater.compareAndSet(this, cmp, val);
+    }
+  }
+
+  private static final class TermNode extends Node {
+
+    TermNode(Term term) {
+      super(term);
+    }
+
+    @Override
+    void apply(BufferedDeletes bufferedDeletes, int docIDUpto) {
+      bufferedDeletes.addTerm((Term) item, docIDUpto);
+    }
+
+  }
+
+  private static final class QueryArrayNode extends Node {
+    QueryArrayNode(Query[] query) {
+      super(query);
+    }
+
+    @Override
+    void apply(BufferedDeletes bufferedDeletes, int docIDUpto) {
+      final Query[] queries = (Query[]) item;
+      for (Query query : queries) {
+        bufferedDeletes.addQuery(query, docIDUpto);  
+      }
+    }
+  }
+  
+  private static final class TermArrayNode extends Node {
+    TermArrayNode(Term[] term) {
+      super(term);
+    }
+
+    @Override
+    void apply(BufferedDeletes bufferedDeletes, int docIDUpto) {
+      final Term[] terms = (Term[]) item;
+      for (Term term : terms) {
+        bufferedDeletes.addTerm(term, docIDUpto);  
+      }
+    }
+  }
+
+  private boolean forceApplyGlobalSlice() {
+    globalBufferLock.lock();
+    final Node currentTail = tail;
+    try {
+      if (globalSlice.sliceTail != currentTail) {
+        globalSlice.sliceTail = currentTail;
+        globalSlice.apply(globalBufferedDeletes, BufferedDeletes.MAX_INT);
+      }
+      return globalBufferedDeletes.any();
+    } finally {
+      globalBufferLock.unlock();
+    }
+  }
+
+  public int getBufferedDeleteTermsSize() {
+    globalBufferLock.lock();
+    try {
+      forceApplyGlobalSlice();
+      return globalBufferedDeletes.terms.size();
+    } finally {
+      globalBufferLock.unlock();
+    }
+
+  }
+}
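
A minimal usage sketch of the global path (illustrative only; assumes
package-private access from org.apache.lucene.index and the classes this patch
introduces): deletes are appended lock-free, drained into the global buffer, and
handed off as an immutable packet by the freeze.

    DocumentsWriterDeleteQueue queue =
        new DocumentsWriterDeleteQueue(new BufferedDeletes(false));
    queue.addDelete(new Term("id", "1"));                // term delete
    queue.addDelete(new TermQuery(new Term("id", "2"))); // query delete
    assert queue.anyChanges();           // the global slice saw both adds

    FrozenBufferedDeletes frozen = queue.freezeGlobalBuffer(null);
    assert frozen.any();                 // the packet now owns the buffered deletes
    assert !queue.anyChanges();          // the queue is drained after the freeze
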
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java	(revision 1088598)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java	(working copy)
@@ -16,11 +16,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+import org.apache.lucene.util.ThreadInterruptedException;
 
 /**
  * This class controls {@link DocumentsWriterPerThread} flushing during
@@ -42,6 +46,11 @@
   private volatile int numPending = 0;
   private volatile int numFlushing = 0;
   final AtomicBoolean flushDeletes = new AtomicBoolean(false);
+  private boolean fullFlush = false;
+  private Queue<DocumentsWriterPerThread> flushQueue = new LinkedList<DocumentsWriterPerThread>();
+  // safety net: DWPTs checked out here when they approach the RAM hard limit during a full flush
+  private Queue<DocumentsWriterPerThread> blockedFlushes = new LinkedList<DocumentsWriterPerThread>();
+
 
   long peakActiveBytes = 0;// only with assert
   long peakFlushBytes = 0;// only with assert
@@ -51,16 +60,15 @@
   private final FlushPolicy flushPolicy;
   private boolean closed = false;
   private final HashMap<DocumentsWriterPerThread, Long> flushingWriters = new HashMap<DocumentsWriterPerThread, Long>();
-  private final BufferedDeletes pendingDeletes;
+  private final DocumentsWriter documentsWriter;
 
-  DocumentsWriterFlushControl(FlushPolicy flushPolicy,
-      DocumentsWriterPerThreadPool threadPool, Healthiness healthiness,
-      BufferedDeletes pendingDeletes, long maxBytesPerDWPT) {
+  DocumentsWriterFlushControl(DocumentsWriter documentsWriter,
+      Healthiness healthiness, long maxBytesPerDWPT) {
     this.healthiness = healthiness;
-    this.perThreadPool = threadPool;
-    this.flushPolicy = flushPolicy;
+    this.perThreadPool = documentsWriter.perThreadPool;
+    this.flushPolicy = documentsWriter.flushPolicy;
     this.maxBytesPerDWPT = maxBytesPerDWPT;
-    this.pendingDeletes = pendingDeletes;
+    this.documentsWriter = documentsWriter;
   }
 
   public synchronized long activeBytes() {
@@ -113,6 +121,11 @@
         // is super
         // important since we can not address more than 2048 MB per DWPT
         setFlushPending(perThread);
+        if (fullFlush) {
+          DocumentsWriterPerThread toBlock = internalTryCheckOutForFlush(perThread, false);
+          assert toBlock != null;
+          blockedFlushes.add(toBlock);
+        }
       }
     }
     final DocumentsWriterPerThread flushingDWPT = getFlushIfPending(perThread);
@@ -122,11 +135,29 @@
 
   synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) {
     assert flushingWriters.containsKey(dwpt);
-    numFlushing--;
-    Long bytes = flushingWriters.remove(dwpt);
-    flushBytes -= bytes.longValue();
-    perThreadPool.recycle(dwpt);
-    healthiness.updateStalled(this);
+    try {
+      numFlushing--;
+      Long bytes = flushingWriters.remove(dwpt);
+      flushBytes -= bytes.longValue();
+      perThreadPool.recycle(dwpt);
+      healthiness.updateStalled(this);
+    } finally {
+      notifyAll();
+    }
+  }
+  
+  public synchronized boolean allFlushesDue() {
+    return numFlushing == 0;
+  }
+  
+  public synchronized void waitForFlush() {
+    if (numFlushing != 0) {
+      try {
+        this.wait();
+      } catch (InterruptedException e) {
+        throw new ThreadInterruptedException(e);
+      }
+    }
   }
 
   /**
@@ -157,6 +188,13 @@
 
   synchronized DocumentsWriterPerThread tryCheckoutForFlush(
       ThreadState perThread, boolean setPending) {
+    if (fullFlush)
+      return null;
+    return internalTryCheckOutForFlush(perThread, setPending);
+  }
+
+  private DocumentsWriterPerThread internalTryCheckOutForFlush(
+      ThreadState perThread, boolean setPending) {
     if (setPending && !perThread.flushPending) {
       setFlushPending(perThread);
     }
@@ -185,7 +223,7 @@
     return null;
   }
 
-  DocumentsWriterPerThread getFlushIfPending(ThreadState perThread) {
+  private DocumentsWriterPerThread getFlushIfPending(ThreadState perThread) {
     if (numPending > 0) {
       final DocumentsWriterPerThread dwpt = perThread == null ? null
           : tryCheckoutForFlush(perThread, false);
@@ -204,6 +242,12 @@
   }
 
   DocumentsWriterPerThread nextPendingFlush() {
+    synchronized (this) {
+      DocumentsWriterPerThread poll = flushQueue.poll();
+      if (poll != null) {
+        return poll;
+      }  
+    }
     if (numPending > 0) {
       final Iterator<ThreadState> allActiveThreads = perThreadPool
           .getActivePerThreadsIterator();
@@ -236,17 +280,16 @@
     return flushPolicy.getMaxNetBytes();
   }
 
-  synchronized void doOnDelete(ThreadState state) {
-    if (!state.flushPending) {
-      flushPolicy.onDelete(this, state);
-    }
+  synchronized void doOnDelete() {
+    // pass null: this is a global delete, not an update
+    flushPolicy.onDelete(this, null);
   }
 
   /**
    * Returns the number of delete terms in the global pool
    */
   public int getNumGlobalTermDeletes() {
-    return pendingDeletes.numTermDeletes.get();
+    return documentsWriter.deleteQueue.numGlobalTermDeletes();
   }
 
   int numFlushingDWPT() {
@@ -260,4 +303,66 @@
   int numActiveDWPT() {
     return this.perThreadPool.getMaxThreadStates();
   }
+  
+  void markForFullFlush() {
+    synchronized (this) {
+      assert !fullFlush;
+      fullFlush = true;
+    }
+    final Iterator<ThreadState> allActiveThreads = perThreadPool
+    .getActivePerThreadsIterator();
+    final ArrayList<DocumentsWriterPerThread> toFlush = new ArrayList<DocumentsWriterPerThread>();
+    while (allActiveThreads.hasNext()) {
+      final ThreadState next = allActiveThreads.next();
+      next.lock();
+      try {
+        if (!next.isActive()) {
+          continue; 
+        }
+        if (next.perThread.getNumDocsInRAM() > 0) {
+          final DocumentsWriterPerThread dwpt = next.perThread; // just for assert
+          final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(next, true);
+          assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
+          assert dwpt == flushingDWPT : "flushControl returned different DWPT";
+          toFlush.add(flushingDWPT);
+        } else {
+          next.perThread.initialize();
+        }
+      } finally {
+        next.unlock();
+      }
+    }
+    synchronized (this) {
+      flushQueue.addAll(blockedFlushes);
+      blockedFlushes.clear();
+      flushQueue.addAll(toFlush);
+    }
+    
+  }
+  
+  synchronized void finishFullFlush() {
+    assert fullFlush;
+    assert flushQueue.isEmpty();
+    try {
+      if (!blockedFlushes.isEmpty()) {
+        flushQueue.addAll(blockedFlushes);
+        blockedFlushes.clear();
+      }
+    } finally {
+      fullFlush = false;
+    }
+  }
+
+  synchronized void abortFullFlushes() {
+    try {
+      for (DocumentsWriterPerThread dwpt : flushQueue) {
+        doAfterFlush(dwpt);
+      }
+      for (DocumentsWriterPerThread dwpt : blockedFlushes) {
+        doAfterFlush(dwpt);
+      }
+    } finally {
+      fullFlush = false;
+    }
+  }
 }
\ No newline at end of file
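
waitForFlush() and doAfterFlush() above form a plain monitor handshake. The same
pattern as a standalone sketch (hypothetical FlushTracker name, with the
re-check folded into a loop rather than left to the caller):

    final class FlushTracker {
      private int numFlushing;

      synchronized void flushStarted() { numFlushing++; }

      synchronized void flushFinished() {
        numFlushing--;
        notifyAll(); // mirrors the finally block in doAfterFlush()
      }

      synchronized void awaitAllFlushes() throws InterruptedException {
        while (numFlushing != 0) { // loop guards against spurious wakeups
          wait();
        }
      }
    }
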
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java	(revision 1088598)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java	(working copy)
@@ -27,7 +27,7 @@
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Document;
-import org.apache.lucene.search.Query;
+import org.apache.lucene.index.DocumentsWriterDeleteQueue.DeleteSlice;
 import org.apache.lucene.search.SimilarityProvider;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BitVector;
@@ -136,6 +136,7 @@
       }
 
       pendingDeletes.clear();
+      deleteSlice = deleteQueue.newSlice();
       // Reset all postings data
       doAfterFlush();
 
@@ -165,6 +166,8 @@
   private final PrintStream infoStream;
   private int numDocsInRAM;
   private int flushedDocCount;
+  DocumentsWriterDeleteQueue deleteQueue;
+  DeleteSlice deleteSlice;
   
   public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent,
       FieldInfos fieldInfos, IndexingChain indexingChain) {
@@ -180,11 +183,19 @@
     consumer = indexingChain.getChain(this);
     bytesUsed = new AtomicLong(0);
     pendingDeletes = new BufferedDeletes(false);
+    initialize();
   }
   
   public DocumentsWriterPerThread(DocumentsWriterPerThread other, FieldInfos fieldInfos) {
     this(other.directory, other.parent, fieldInfos, other.parent.chain);
-    
+  }
+  
+  void initialize() {
+    deleteQueue = parent.deleteQueue;
+    assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
+    pendingDeletes.clear();
+    deleteSlice = null;
   }
 
   void setAborting() {
@@ -199,13 +210,10 @@
 
   public void updateDocument(Document doc, Analyzer analyzer, Term delTerm) throws IOException {
     assert writer.testPoint("DocumentsWriterPerThread addDocument start");
+    assert deleteQueue != null;
     docState.doc = doc;
     docState.analyzer = analyzer;
     docState.docID = numDocsInRAM;
-    if (delTerm != null) {
-      pendingDeletes.addTerm(delTerm, numDocsInRAM);
-    }
-
     if (segment == null) {
       // this call is synchronized on IndexWriter.segmentInfos
       segment = writer.newSegmentName();
@@ -219,7 +227,6 @@
       } finally {
         docState.clear();
       }
-
       success = true;
     } finally {
       if (!success) {
@@ -232,18 +239,44 @@
         }
       }
     }
-
     success = false;
     try {
-      numDocsInRAM++;
       consumer.finishDocument();
-
       success = true;
     } finally {
       if (!success) {
         abort();
       }
     }
+    finishDocument(delTerm);
+  }
+  
+  private void finishDocument(Term delTerm) throws IOException {
+    /*
+     * here we actually finish the document in two steps: 1. push the delete
+     * into the queue and update our slice, 2. increment the DWPT private
+     * document id.
+     * 
+     * the updated slice we get from 1. holds all the deletes that have occurred
+     * since we last updated the slice.
+     */
+    if (deleteSlice == null) {
+      deleteSlice = deleteQueue.newSlice();
+      if (delTerm != null) {
+        deleteQueue.add(delTerm, deleteSlice);
+        deleteSlice.reset();
+      }
+    } else {
+      if (delTerm != null) {
+        deleteQueue.add(delTerm, deleteSlice);
+        assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
+        deleteSlice.apply(pendingDeletes, numDocsInRAM);
+      } else if (deleteQueue.updateSlice(deleteSlice)) {
+        deleteSlice.apply(pendingDeletes, numDocsInRAM);
+      }
+    }
+    ++numDocsInRAM;
   }
 
   // Buffer a specific docID for deletion.  Currently only
@@ -261,22 +294,6 @@
     // confounding exception).
   }
 
-  void deleteQueries(Query... queries) {
-    if (numDocsInRAM > 0) {
-      for (Query query : queries) {
-        pendingDeletes.addQuery(query, numDocsInRAM);
-      }
-    }
-  }
-
-  void deleteTerms(Term... terms) {
-    if (numDocsInRAM > 0) {
-      for (Term term : terms) {
-        pendingDeletes.addTerm(term, numDocsInRAM);
-      }
-    }
-  }
-  
   /**
    * Returns the number of delete terms in this {@link DocumentsWriterPerThread}
    */
@@ -305,6 +322,15 @@
     parent.subtractFlushedNumDocs(numDocsInRAM);
     numDocsInRAM = 0;
   }
+  
+  FrozenBufferedDeletes prepareFlush() {
+    assert numDocsInRAM > 0;
+    final FrozenBufferedDeletes globalDeletes = deleteQueue.freezeGlobalBuffer(deleteSlice);
+    // apply all deletes before we flush and release the delete slice
+    deleteSlice.apply(pendingDeletes, numDocsInRAM);
+    deleteSlice = null;
+    return globalDeletes;
+  }
 
   /** Flush all pending docs to a new segment */
   FlushedSegment flush() throws IOException {
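
Condensed, the per-document update protocol that finishDocument() implements
looks like the fragment below (illustrative; names are from this patch and the
slice is assumed to exist already):

    // 1. publish the delTerm: the new node becomes this slice's tail
    deleteQueue.add(delTerm, deleteSlice);
    // 2. buffer everything in the slice privately; docIDUpto is the id of the
    //    updated document, so the delTerm cannot delete the new document itself
    deleteSlice.apply(pendingDeletes, numDocsInRAM);
    // 3. only now is the document counted as finished
    ++numDocsInRAM;
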
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java	(revision 1088598)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java	(working copy)
@@ -118,7 +118,10 @@
 
   public synchronized ThreadState newThreadState() {
     if (numThreadStatesActive < perThreads.length) {
-      return perThreads[numThreadStatesActive++];
+      final ThreadState threadState = perThreads[numThreadStatesActive];
+      threadState.perThread.initialize();
+      numThreadStatesActive++;
+      return threadState;
     }
     return null;
   }
@@ -128,7 +131,9 @@
     final DocumentsWriterPerThread dwpt = threadState.perThread;
     if (!closed) {
       final FieldInfos infos = globalFieldMap.newFieldInfos(SegmentCodecsBuilder.create(codecProvider));
-      threadState.resetWriter(new DocumentsWriterPerThread(dwpt, infos));
+      final DocumentsWriterPerThread newDwpt = new DocumentsWriterPerThread(dwpt, infos);
+      newDwpt.initialize();
+      threadState.resetWriter(newDwpt);
     } else {
       threadState.resetWriter(null);
     }
Index: lucene/src/java/org/apache/lucene/index/FlushPolicy.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/FlushPolicy.java	(revision 1088598)
+++ lucene/src/java/org/apache/lucene/index/FlushPolicy.java	(working copy)
@@ -55,8 +55,8 @@
   protected IndexWriterConfig indexWriterConfig;
 
   /**
-   * Called for each delete term applied to the given {@link ThreadState}s
-   * {@link DocumentsWriterPerThread}.
+   * Called for each delete term. If this is a delete triggered by an update,
+   * the given {@link ThreadState} is non-null.
    * <p>
    * Note: This method is synchronized by the given
    * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
Index: lucene/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java	(revision 1088598)
+++ lucene/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java	(working copy)
@@ -52,9 +52,15 @@
   final int[] queryLimits;
   final int bytesUsed;
   final int numTermDeletes;
-  final long gen;
+  private long gen = -1; // assigned by BufferedDeletesStream once pushed
+  final boolean isSegmentPrivate;  // set to true iff this frozen packet represents
+                                   // segment private deletes. In that case it should
+                                   // only have Queries
+
 
-  public FrozenBufferedDeletes(BufferedDeletes deletes, long gen) {
+  public FrozenBufferedDeletes(BufferedDeletes deletes, boolean isSegmentPrivate) {
+    this.isSegmentPrivate = isSegmentPrivate;
+    assert !isSegmentPrivate || deletes.terms.size() == 0 : "segment private packet should only have del queries";
     terms = deletes.terms.keySet().toArray(new Term[deletes.terms.size()]);
     queries = new Query[deletes.queries.size()];
     queryLimits = new int[deletes.queries.size()];
@@ -66,8 +72,17 @@
     }
     bytesUsed = terms.length * BYTES_PER_DEL_TERM + queries.length * BYTES_PER_DEL_QUERY;
     numTermDeletes = deletes.numTermDeletes.get();
+  }
+  
+  public void setDelGen(long gen) {
+    assert this.gen == -1;
     this.gen = gen;
   }
+  
+  public long delGen() {
+    assert gen != -1;
+    return gen;
+  }
 
   public Iterable<Term> termsIterable() {
     return new Iterable<Term>() {
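
For illustration only (not part of the patch): gen is now write-once, unset (-1) until the stream assigns it. A stand-in model of that contract and its asserts (run with -ea):

    class PacketModel {
      private long gen = -1; // unset until assigned exactly once

      void setDelGen(long gen) {
        assert this.gen == -1 : "delGen may only be assigned once";
        this.gen = gen;
      }

      long delGen() {
        assert gen != -1 : "delGen read before assignment";
        return gen;
      }

      public static void main(String[] args) {
        PacketModel packet = new PacketModel();
        packet.setDelGen(42); // ok: first assignment
        System.out.println("gen=" + packet.delGen());
        // packet.setDelGen(43); // would trip the first assert
      }
    }
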
Index: lucene/src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/IndexWriter.java	(revision 1088606)
+++ lucene/src/java/org/apache/lucene/index/IndexWriter.java	(working copy)
@@ -47,6 +47,7 @@
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.Lock;
 import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.util.BitVector;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.Constants;
 import org.apache.lucene.util.ThreadInterruptedException;
@@ -356,8 +357,8 @@
     // reader; in theory we could do similar retry logic,
     // just like we do when loading segments_N
     IndexReader r;
+    flush(false, applyAllDeletes); // don't sync on IW here; DWPT flushing would deadlock
     synchronized(this) {
-      flush(false, applyAllDeletes);
       r = new DirectoryReader(this, segmentInfos, config.getReaderTermsIndexDivisor(), codecs, applyAllDeletes);
       if (infoStream != null) {
         message("return reader version=" + r.getVersion() + " reader=" + r);
@@ -2026,7 +2027,16 @@
     deleter.checkpoint(segmentInfos, false);
   }
 
-  void addFlushedSegment(FlushedSegment flushedSegment) throws IOException {
+  /**
+   * Prepares the {@link SegmentInfo} for the new flushed segment and persists
+   * the deleted-documents {@link BitVector}. Use
+   * {@link #publishFlushedSegment(SegmentInfo, FrozenBufferedDeletes)} to
+   * publish the returned {@link SegmentInfo} together with its segment-private
+   * delete packet.
+   * 
+   * @see #publishFlushedSegment(SegmentInfo, FrozenBufferedDeletes)
+   */
+  SegmentInfo prepareFlushedSegment(FlushedSegment flushedSegment) throws IOException {
     assert flushedSegment != null;
 
     SegmentInfo newSegment = flushedSegment.segmentInfo;
@@ -2098,9 +2108,32 @@
         }
       }
     }
-
-
-    synchronized(this) {
+    return newSegment;
+  }
+  
+  /**
+   * Atomically adds the segment-private delete packet and publishes the flushed
+   * segment's {@link SegmentInfo} to the index writer. NOTE: use
+   * {@link #prepareFlushedSegment(FlushedSegment)} to obtain the
+   * {@link SegmentInfo} for the flushed segment.
+   * 
+   * @see #prepareFlushedSegment(FlushedSegment)
+   */
+  synchronized void publishFlushedSegment(SegmentInfo newSegment,
+      FrozenBufferedDeletes packet) throws IOException {
+    // lock order IW -> BDS
+    synchronized (bufferedDeletesStream) {
+      // publishing the segment must be sync'd on IW -> BDS to make sure
+      // that no merge prunes away the segment-private delete packet
+      final long nextGen;
+      if (packet != null && packet.any()) {
+        nextGen = bufferedDeletesStream.push(packet);
+      } else {
+        // since we don't have a delete packet to apply we can get a new
+        // generation right away
+        nextGen = bufferedDeletesStream.getNextGen();
+      }
+      newSegment.setBufferedDeletesGen(nextGen);
       segmentInfos.add(newSegment);
       checkpoint();
     }
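
For illustration only (not part of the patch): a self-contained sketch of the two-phase publication described above, keeping the Writer -> DeleteStream lock order so a concurrent merge cannot prune the segment-private packet between the push and the segmentInfos update. All types are simplified stand-ins:

    import java.util.ArrayList;
    import java.util.List;

    class Writer {
      private final DeleteStream deleteStream = new DeleteStream();
      private final List<Segment> segmentInfos = new ArrayList<Segment>();

      // Phase 2: push the segment-private packet (if any) and make the new
      // segment visible in one atomic step; lock order Writer -> DeleteStream.
      synchronized void publishFlushedSegment(Segment seg, Packet packet) {
        synchronized (deleteStream) {
          final long gen = packet != null ? deleteStream.push(packet)
                                          : deleteStream.getNextGen();
          seg.bufferedDeletesGen = gen; // older delete packets skip this segment
          segmentInfos.add(seg);
        }
      }
    }

    class DeleteStream {
      private long nextGen;
      synchronized long push(Packet p) { p.gen = nextGen; return nextGen++; }
      synchronized long getNextGen() { return nextGen++; }
    }

    class Segment { long bufferedDeletesGen; }
    class Packet { long gen = -1; }
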
Index: lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java	(revision 1088606)
+++ lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java	(working copy)
@@ -19,6 +19,7 @@
 
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.*;
+import org.apache.lucene.document.Field.Index;
 import org.apache.lucene.store.*;
 import org.apache.lucene.util.*;
 import org.junit.Test;
@@ -72,4 +73,61 @@
     
     dir.close();
   }
+
+  public void testUpdateSameDoc() throws Exception {
+    final Directory dir = newDirectory();
+
+    final LineFileDocs docs = new LineFileDocs(random);
+    for (int r = 0; r < 3; r++) {
+      final IndexWriter w = new IndexWriter(dir, newIndexWriterConfig(
+          TEST_VERSION_CURRENT, new MockAnalyzer()).setMergePolicy(
+          newLogMergePolicy()).setMaxBufferedDocs(2));
+
+      final int SIZE = 200 * RANDOM_MULTIPLIER;
+      final int numUpdates = (int) (SIZE * (2 + random.nextDouble()));
+      int numThreads = 3 + random.nextInt(Runtime.getRuntime().availableProcessors());
+      IndexingThread[] threads = new IndexingThread[numThreads];
+      for (int i = 0; i < numThreads; i++) {
+        threads[i] = new IndexingThread(docs, w, numUpdates);
+        threads[i].start();
+      }
+
+      for (int i = 0; i < numThreads; i++) {
+        threads[i].join();
+      }
+
+      w.close();
+    }
+    IndexReader open = IndexReader.open(dir);
+    assertEquals(1, open.numDocs());
+    open.close();
+    docs.close();
+    dir.close();
+  }
+  
+  static class IndexingThread extends Thread {
+    final LineFileDocs docs;
+    final IndexWriter writer;
+    final int num;
+    
+    public IndexingThread(LineFileDocs docs, IndexWriter writer, int num) {
+      super();
+      this.docs = docs;
+      this.writer = writer;
+      this.num = num;
+    }
+
+    public void run() {
+      try {
+        for (int i = 0; i < num; i++) {
+          Document doc = new Document(); // intentionally a fixed doc; LineFileDocs is unused here
+          doc.add(newField("id", "test", Index.NOT_ANALYZED));
+          writer.updateDocument(new Term("id", "test"), doc);
+        }
+      } catch (Exception e) {
+        e.printStackTrace();
+        fail("indexing thread failed: " + e);
+      }
+    }
+  }
 }
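
For illustration only (not part of the patch): fail() inside a worker thread only surfaces through the test framework's uncaught-exception tracking; a more direct pattern is to record the failure and rethrow it after join(). A hedged sketch:

    // Remember the Throwable in the worker and rethrow it on the test thread.
    class CheckedThread extends Thread {
      volatile Throwable failure;
      private final Runnable work;

      CheckedThread(Runnable work) { this.work = work; }

      @Override
      public void run() {
        try {
          work.run();
        } catch (Throwable t) {
          failure = t; // recorded instead of failing in-thread
        }
      }

      void joinAndCheck() throws InterruptedException {
        join();
        if (failure != null) {
          throw new RuntimeException("worker thread failed", failure);
        }
      }
    }
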
Index: lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java	(revision 1088598)
+++ lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java	(working copy)
@@ -199,44 +199,39 @@
   public Map<String,Document> indexRandom(int nThreads, int iterations, int range, Directory dir, int maxThreadStates,
                                           boolean doReaderPooling) throws IOException, InterruptedException {
     Map<String,Document> docs = new HashMap<String,Document>();
-    for(int iter=0;iter<3;iter++) {
-      if (VERBOSE) {
-        System.out.println("TEST: iter=" + iter);
-      }
-      IndexWriter w = new MockIndexWriter(dir, newIndexWriterConfig(
-          TEST_VERSION_CURRENT, new MockAnalyzer()).setOpenMode(OpenMode.CREATE)
-               .setRAMBufferSizeMB(0.1).setMaxBufferedDocs(maxBufferedDocs).setIndexerThreadPool(new ThreadAffinityDocumentsWriterThreadPool(maxThreadStates))
-               .setReaderPooling(doReaderPooling).setMergePolicy(newLogMergePolicy()));
-      w.setInfoStream(VERBOSE ? System.out : null);
-      LogMergePolicy lmp = (LogMergePolicy) w.getConfig().getMergePolicy();
-      lmp.setUseCompoundFile(false);
-      lmp.setMergeFactor(mergeFactor);
+    IndexWriter w = new MockIndexWriter(dir, newIndexWriterConfig(
+        TEST_VERSION_CURRENT, new MockAnalyzer()).setOpenMode(OpenMode.CREATE)
+             .setRAMBufferSizeMB(0.1).setMaxBufferedDocs(maxBufferedDocs).setIndexerThreadPool(new ThreadAffinityDocumentsWriterThreadPool(maxThreadStates))
+             .setReaderPooling(doReaderPooling).setMergePolicy(newLogMergePolicy()));
+    w.setInfoStream(VERBOSE ? System.out : null);
+    LogMergePolicy lmp = (LogMergePolicy) w.getConfig().getMergePolicy();
+    lmp.setUseCompoundFile(false);
+    lmp.setMergeFactor(mergeFactor);
 
-      threads = new IndexingThread[nThreads];
-      for (int i=0; i<threads.length; i++) {
-        IndexingThread th = new IndexingThread();
-        th.w = w;
-        th.base = 1000000*i;
-        th.range = range;
-        th.iterations = iterations;
-        threads[i] = th;
-      }
+    threads = new IndexingThread[nThreads];
+    for (int i=0; i<threads.length; i++) {
+      IndexingThread th = new IndexingThread();
+      th.w = w;
+      th.base = 1000000*i;
+      th.range = range;
+      th.iterations = iterations;
+      threads[i] = th;
+    }
 
-      for (int i=0; i<threads.length; i++) {
-        threads[i].start();
-      }
-      for (int i=0; i<threads.length; i++) {
-        threads[i].join();
-      }
+    for (int i=0; i<threads.length; i++) {
+      threads[i].start();
+    }
+    for (int i=0; i<threads.length; i++) {
+      threads[i].join();
+    }
 
-      //w.optimize();
-      w.close();    
+    //w.optimize();
+    w.close();    
 
-      for (int i=0; i<threads.length; i++) {
-        IndexingThread th = threads[i];
-        synchronized(th) {
-          docs.putAll(th.docs);
-        }
+    for (int i=0; i<threads.length; i++) {
+      IndexingThread th = threads[i];
+      synchronized(th) {
+        docs.putAll(th.docs);
       }
     }
 
