Index: lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriter.java	(revision 1091720)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriter.java	(working copy)
@@ -118,9 +118,8 @@
 
   private AtomicInteger numDocsInRAM = new AtomicInteger(0);
 
-  final BufferedDeletesStream bufferedDeletesStream;
   // TODO: cut over to BytesRefHash in BufferedDeletes
-  volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue(new BufferedDeletes(false));
+  volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue();
   private final Queue<FlushTicket> ticketQueue = new LinkedList<DocumentsWriter.FlushTicket>();
 
   private Collection<String> abortedFiles;               // List of files that were written before last abort()
@@ -136,7 +135,6 @@
     this.directory = directory;
     this.indexWriter = writer;
     this.similarityProvider = config.getSimilarityProvider();
-    this.bufferedDeletesStream = bufferedDeletesStream;
     this.perThreadPool = config.getIndexerThreadPool();
     this.chain = config.getIndexingChain();
     this.perThreadPool.initialize(this, globalFieldNumbers, config);
@@ -179,8 +177,6 @@
   boolean deleteTerm(final Term term) throws IOException {
     return deleteTerms(term);
   }
-
-
   
   DocumentsWriterDeleteQueue currentDeleteSession() {
     return deleteQueue;
@@ -357,7 +353,12 @@
       maybeMerge = true;
       boolean success = false;
       FlushTicket ticket = null;
+      
       try {
+        assert currentFullFlushDelQueue == null
+            || flushingDWPT.deleteQueue == currentFullFlushDelQueue : "expected: "
+            + currentFullFlushDelQueue + "but was: " + flushingDWPT.deleteQueue
+            + " " + flushControl.isFullFlush();
         /*
          * Since with DWPT the flush process is concurrent and several DWPT
          * could flush at the same time we must maintain the order of the
@@ -391,8 +392,8 @@
           indexWriter.flushCount.incrementAndGet();
           if (!success && ticket != null) {
             synchronized (ticketQueue) {
-            // in the case of a failure make sure we are making progress and
-            // apply all the deletes since the segment flush failed
+              // in the case of a failure make sure we are making progress and
+              // apply all the deletes since the segment flush failed
               ticket.isSegmentFlush = false;
              
             }
@@ -416,7 +417,7 @@
         final FlushTicket head = ticketQueue.peek();
         if (head != null && head.canPublish()) {
           ticketQueue.poll();
-          finishFlushedSegment(head.segment, head.frozenDeletes);
+          finishFlush(head.segment, head.frozenDeletes);
         } else {
           break;
         }
@@ -425,16 +426,22 @@
   }
   
 
-  private void finishFlushedSegment(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
+  private void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
       throws IOException {
     // this is eventually finishing the flushed segment and publishing it to the IndexWriter
-    if (bufferedDeletes != null && bufferedDeletes.any()) {
-      bufferedDeletesStream.push(bufferedDeletes);
-      if (infoStream != null) {
-        message("flush: push buffered deletes: " + bufferedDeletes);
+    if (newSegment == null) {
+      assert bufferedDeletes != null;
+      if (bufferedDeletes != null && bufferedDeletes.any()) {
+        indexWriter.bufferedDeletesStream.push(bufferedDeletes);
+        if (infoStream != null) {
+          message("flush: push buffered deletes: " + bufferedDeletes);
+        }
       }
+    } else {
+      publishFlushedSegment(newSegment, bufferedDeletes);  
     }
-    publishFlushedSegment(newSegment);
+    
+    
 
   }
 
@@ -445,74 +452,89 @@
     }
   }
   
-  private void publishFlushedSegment(FlushedSegment newSegment)
+  /**
+   * publishes the flushed segment, segment private deletes if any and its
+   * associated global delete if present to the index writer. the actual
+   * publishing operation is synced on IW -> BDS so that the {@link SegmentInfo}
+   * 's delete generation is always GlobalPacket_deleteGeneration + 1
+   */
+  private void publishFlushedSegment(FlushedSegment newSegment, FrozenBufferedDeletes globalPacket)
       throws IOException {
-    if (newSegment != null) {
-      final SegmentInfo segInfo = indexWriter.prepareFlushedSegment(newSegment);
-      final BufferedDeletes deletes = newSegment.segmentDeletes;
-      FrozenBufferedDeletes packet = null;
-      if (deletes != null && deletes.any()) {
-        // segment private delete
-        packet = new FrozenBufferedDeletes(deletes, true);
-        if (infoStream != null) {
-          message("flush: push buffered seg private deletes: " + packet);
-        }
+    assert newSegment != null;
+    final SegmentInfo segInfo = indexWriter.prepareFlushedSegment(newSegment);
+    final BufferedDeletes deletes = newSegment.segmentDeletes;
+    FrozenBufferedDeletes packet = null;
+    if (deletes != null && deletes.any()) {
+      // segment private delete
+      packet = new FrozenBufferedDeletes(deletes, true);
+      if (infoStream != null) {
+        message("flush: push buffered seg private deletes: " + packet);
       }
-      indexWriter.publishFlushedSegment(segInfo, packet);
     }
+    // now publish!
+    indexWriter.publishFlushedSegment(segInfo, packet, globalPacket);
   }
   
-  private final Object flushAllLock = new Object();
   // for asserts
-  private volatile DocumentsWriterDeleteQueue currentFlusingSession = null;
-  private boolean setFlushingDeleteQueue(DocumentsWriterDeleteQueue session) {
-    currentFlusingSession = session;
+  private volatile DocumentsWriterDeleteQueue currentFullFlushDelQueue = null;
+  // for asserts
+  private synchronized boolean setFlushingDeleteQueue(DocumentsWriterDeleteQueue session) {
+    currentFullFlushDelQueue = session;
     return true;
   }
   
+  /*
+   * flushAllThreads is synced by IW fullFlushLock. Flushing all threads is a
+   * two stage operations, the caller must ensure that #finishFlush is called
+   * after this method to release the flush lock in DWFlushControl - use try /
+   * finally!
+   */
   final boolean flushAllThreads(final boolean flushDeletes)
     throws IOException {
-    synchronized (flushAllLock) {
-      final DocumentsWriterDeleteQueue flushingDeleteQueue;
-      synchronized (this) {
-        flushingDeleteQueue = deleteQueue;
-        deleteQueue = new DocumentsWriterDeleteQueue(new BufferedDeletes(false));
-        assert setFlushingDeleteQueue(flushingDeleteQueue);
+    
+    synchronized (this) {
+      final DocumentsWriterDeleteQueue flushingDeleteQueue = deleteQueue;
+      /* sets a new delete queue - this must be synced on the flush control
+       * otherwise a new DWPT could sneak into the loop with an already flushing
+       * delete queue */
+      flushControl.markForFullFlush();
+      assert setFlushingDeleteQueue(flushingDeleteQueue);
+    }
+    assert currentFullFlushDelQueue != null;
+    assert currentFullFlushDelQueue != deleteQueue;
+    
+    final DocumentsWriterDeleteQueue flushingDeleteQueue = currentFullFlushDelQueue;
+    boolean anythingFlushed = false;
+    try {
+      DocumentsWriterPerThread flushingDWPT;
+      // now try help out with flushing
+      while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
+        anythingFlushed |= doFlush(flushingDWPT);
       }
-      assert flushingDeleteQueue == currentFlusingSession;
-      boolean anythingFlushed = false;
-      boolean success = false;
-      try {
-        flushControl.markForFullFlush();
-        DocumentsWriterPerThread flushingDWPT;
-        // now try help out with flushing
-        while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
-          anythingFlushed |= doFlush(flushingDWPT);
-        }
-        // if a concurrent flush is still in flight wait for it
-        while (!flushControl.allFlushesDue()) {
-          flushControl.waitForFlush();  
-        }
-        if (!anythingFlushed && flushDeletes) {
-          synchronized (ticketQueue) {
-            ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
-           }
-          applyFlushTickets(null, null);
-        }
-        success = true;
-        
-      } finally {
-        assert flushingDeleteQueue == currentFlusingSession;
-        assert setFlushingDeleteQueue(null);
-        if (!success) {
-          flushControl.abortFullFlushes();
-        } else {
-          // release the flush lock
-          flushControl.finishFullFlush();
-        }
+      // if a concurrent flush is still in flight wait for it
+      while (!flushControl.allFlushesDue()) {
+        flushControl.waitForFlush();  
       }
-      return anythingFlushed;
+      if (!anythingFlushed && flushDeletes) {
+        synchronized (ticketQueue) {
+          ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
+         }
+        applyFlushTickets(null, null);
+      }
+    } finally {
+      assert flushingDeleteQueue == currentFullFlushDelQueue;
+    }
+    return anythingFlushed;
+  }
+  
+  final void finishFullFlush(boolean success) {
+    if (success) {
+      // release the flush lock
+      flushControl.finishFullFlush();
+    } else {
+      flushControl.abortFullFlushes();
     }
+    assert setFlushingDeleteQueue(null);
   }
   
   static final class FlushTicket {
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java	(revision 1091720)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java	(working copy)
@@ -72,6 +72,10 @@
   private final BufferedDeletes globalBufferedDeletes;
   /* only acquired to update the global deletes */
   private final ReentrantLock globalBufferLock = new ReentrantLock();
+  
+  DocumentsWriterDeleteQueue() {
+    this(new BufferedDeletes(false));
+  }
 
   DocumentsWriterDeleteQueue(BufferedDeletes globalBufferedDeletes) {
     this.globalBufferedDeletes = globalBufferedDeletes;
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java	(revision 1091720)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java	(working copy)
@@ -305,9 +305,14 @@
   }
   
   void markForFullFlush() {
+    final DocumentsWriterDeleteQueue flushingQueue;
     synchronized (this) {
       assert !fullFlush;
       fullFlush = true;
+      flushingQueue = documentsWriter.deleteQueue;
+      // set a new delete queue - all subsequent DWPT will use this queue until
+      // we do another full flush
+      documentsWriter.deleteQueue = new DocumentsWriterDeleteQueue(new BufferedDeletes(false));
     }
     final Iterator<ThreadState> allActiveThreads = perThreadPool
     .getActivePerThreadsIterator();
@@ -319,13 +324,18 @@
         if (!next.isActive()) {
           continue; 
         }
-        if (next.perThread.getNumDocsInRAM() > 0) {
+        if (next.perThread.deleteQueue != flushingQueue) {
+          // this one is already a new DWPT
+          continue;
+        }
+        if (next.perThread.getNumDocsInRAM() > 0 ) {
           final DocumentsWriterPerThread dwpt = next.perThread; // just for assert
           final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(next, true);
           assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
           assert dwpt == flushingDWPT : "flushControl returned different DWPT";
           toFlush.add(flushingDWPT);
         } else {
+          // get the new delete queue from DW
           next.perThread.initialize();
         }
       } finally {
@@ -337,7 +347,6 @@
       blockedFlushes.clear();
       flushQueue.addAll(toFlush);
     }
-    
   }
   
   synchronized void finishFullFlush() {
@@ -361,8 +370,15 @@
       for (DocumentsWriterPerThread dwpt : blockedFlushes) {
         doAfterFlush(dwpt);
       }
+      
     } finally {
+      flushQueue.clear();
+      blockedFlushes.clear();
       fullFlush = false;
     }
   }
+  
+  synchronized boolean isFullFlush() {
+    return fullFlush;
+  }
 }
\ No newline at end of file
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java	(revision 1091720)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java	(working copy)
@@ -195,7 +195,6 @@
     assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
     pendingDeletes.clear();
     deleteSlice = null;
-      
   }
 
   void setAborting() {
Index: lucene/src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/IndexWriter.java	(revision 1091720)
+++ lucene/src/java/org/apache/lucene/index/IndexWriter.java	(working copy)
@@ -347,25 +347,50 @@
     if (infoStream != null) {
       message("flush at getReader");
     }
-
     // Do this up front before flushing so that the readers
     // obtained during this flush are pooled, the first time
     // this method is called:
     poolReaders = true;
-
-    // Prevent segmentInfos from changing while opening the
-    // reader; in theory we could do similar retry logic,
-    // just like we do when loading segments_N
-    IndexReader r;
-    flush(false, applyAllDeletes); // don't sync on IW here DWPT will deadlock
-    synchronized(this) {
-      r = new DirectoryReader(this, segmentInfos, config.getReaderTermsIndexDivisor(), codecs, applyAllDeletes);
-      if (infoStream != null) {
-        message("return reader version=" + r.getVersion() + " reader=" + r);
+    final IndexReader r;
+    doBeforeFlush();
+    final boolean maybeMerge;
+    /*
+     * for releasing a NRT reader we must ensure that 
+     * DW doesn't add any segments or deletes until we are
+     * done with creating the NRT DirectoryReader. 
+     * We release the two stage full flush after we are done opening the
+     * directory reader!
+     */
+    synchronized (fullFlushLock) {
+      boolean success = false;
+      try {
+        maybeMerge = docWriter.flushAllThreads(applyAllDeletes);
+        if (!maybeMerge) {
+          flushCount.incrementAndGet();
+        }
+        success = true;
+        // Prevent segmentInfos from changing while opening the
+        // reader; in theory we could do similar retry logic,
+        // just like we do when loading segments_N
+        synchronized(this) {
+          maybeApplyDeletes(applyAllDeletes);
+          r = new DirectoryReader(this, segmentInfos, config.getReaderTermsIndexDivisor(), codecs, applyAllDeletes);
+          if (infoStream != null) {
+            message("return reader version=" + r.getVersion() + " reader=" + r);
+          }
+        }
+      } finally {
+        if (!success && infoStream != null) {
+          message("hit exception during while NRT reader");
+        }
+        // now we are done - finish the full flush!
+        docWriter.finishFullFlush(success);
+        doAfterFlush();
       }
     }
-    maybeMerge();
-
+    if(maybeMerge) {
+      maybeMerge();
+    }
     if (infoStream != null) {
       message("getReader took " + (System.currentTimeMillis() - tStart) + " msec");
     }
@@ -2120,9 +2145,12 @@
    * @see #prepareFlushedSegment(FlushedSegment)
    */
   synchronized void publishFlushedSegment(SegmentInfo newSegment,
-      FrozenBufferedDeletes packet) throws IOException {
+      FrozenBufferedDeletes packet, FrozenBufferedDeletes globalPacket) throws IOException {
     // lock order IW -> BDS
     synchronized (bufferedDeletesStream) {
+      if (globalPacket != null && globalPacket.any()) {
+        bufferedDeletesStream.push(globalPacket);
+      } 
       // publishing the segment must be synched on IW -> BDS to make the sure
       // that no merge prunes away the seg. private delete packet
       final long nextGen;
@@ -2544,7 +2572,7 @@
       message("commit: done");
     }
   }
-
+  private final Object fullFlushLock = new Object();
   /**
    * Flush all in-memory buffered updates (adds and deletes)
    * to the Directory.
@@ -2576,7 +2604,6 @@
     }
 
     doBeforeFlush();
-
     assert testPoint("startDoFlush");
     boolean success = false;
     try {
@@ -2585,43 +2612,26 @@
         message("  start flush: applyAllDeletes=" + applyAllDeletes);
         message("  index before flush " + segString());
       }
-
-      boolean maybeMerge = docWriter.flushAllThreads(applyAllDeletes);
-
-      synchronized(this) {
-        if (!applyAllDeletes) {
-          // If deletes alone are consuming > 1/2 our RAM
-          // buffer, force them all to apply now. This is to
-          // prevent too-frequent flushing of a long tail of
-          // tiny segments:
-          if ((config.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
-               bufferedDeletesStream.bytesUsed() > (1024*1024*config.getRAMBufferSizeMB()/2))) {
-            applyAllDeletes = true;
-            if (infoStream != null) {
-              message("force apply deletes bytesUsed=" + bufferedDeletesStream.bytesUsed() + " vs ramBuffer=" + (1024*1024*config.getRAMBufferSizeMB()));
-            }
-          }
-        }
-
-        if (applyAllDeletes) {
-          if (infoStream != null) {
-            message("apply all deletes during flush");
-          }
-          applyAllDeletes();
-        } else if (infoStream != null) {
-          message("don't apply deletes now delTermCount=" + bufferedDeletesStream.numTerms() + " bytesUsed=" + bufferedDeletesStream.bytesUsed());
+      final boolean maybeMerge;
+      
+      synchronized (fullFlushLock) {
+        try {
+          maybeMerge = docWriter.flushAllThreads(applyAllDeletes);
+          success = true;
+        } finally {
+          docWriter.finishFullFlush(success);
         }
-
+      }
+      success = false;
+      synchronized(this) {
+        maybeApplyDeletes(applyAllDeletes);
         doAfterFlush();
         if (!maybeMerge) {
           // flushCount is incremented in flushAllThreads
           flushCount.incrementAndGet();
         }
-
         success = true;
-
         return maybeMerge;
-
       }
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "doFlush");
@@ -2633,6 +2643,32 @@
     }
   }
   
+  final synchronized void maybeApplyDeletes(boolean applyAllDeletes) throws IOException {
+    if (!applyAllDeletes) {
+      // If deletes alone are consuming > 1/2 our RAM
+      // buffer, force them all to apply now. This is to
+      // prevent too-frequent flushing of a long tail of
+      // tiny segments:
+      if ((config.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
+           bufferedDeletesStream.bytesUsed() > (1024*1024*config.getRAMBufferSizeMB()/2))) {
+        applyAllDeletes = true;
+        if (infoStream != null) {
+          message("force apply deletes bytesUsed=" + bufferedDeletesStream.bytesUsed() + " vs ramBuffer=" + (1024*1024*config.getRAMBufferSizeMB()));
+        }
+      }
+    }
+
+    if (applyAllDeletes) {
+      if (infoStream != null) {
+        message("apply all deletes during flush");
+      }
+      applyAllDeletes();
+    } else if (infoStream != null) {
+      message("don't apply deletes now delTermCount=" + bufferedDeletesStream.numTerms() + " bytesUsed=" + bufferedDeletesStream.bytesUsed());
+    }
+
+  }
+  
   final synchronized void applyAllDeletes() throws IOException {
       flushDeletesCount.incrementAndGet();
       final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream
Index: lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java	(revision 1091748)
+++ lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java	(working copy)
@@ -81,9 +81,7 @@
     final LineFileDocs docs = new LineFileDocs(random);
     for (int r = 0; r < 3; r++) {
       final IndexWriter w = new IndexWriter(dir, newIndexWriterConfig(
-          TEST_VERSION_CURRENT, new MockAnalyzer(random)).setMergePolicy(
-          newLogMergePolicy()).setMaxBufferedDocs(2));
-
+          TEST_VERSION_CURRENT, new MockAnalyzer(random)).setMaxBufferedDocs(2));
       final int SIZE = 200 * RANDOM_MULTIPLIER;
       final int numUpdates = (int) (SIZE * (2 + random.nextDouble()));
       int numThreads = 3 + random.nextInt(Runtime.getRuntime().availableProcessors());
@@ -120,22 +118,23 @@
 
     public void run() {
       try {
-//        IndexReader open = IndexReader.open(writer, true);
+        IndexReader open = null;
         for (int i = 0; i < num; i++) {
           Document doc = new Document();// docs.nextDoc();
           doc.add(newField("id", "test", Index.NOT_ANALYZED));
           writer.updateDocument(new Term("id", "test"), doc);
-//          if (random.nextInt(10) == 0) {
-//            IndexReader reader = open.reopen();
-//            if (reader != open) {
-//              open.close();
-//              open = reader;
-//            }
-//            assertEquals("iter: " + i + " numDocs: "+ open.numDocs() + " del: " + open.numDeletedDocs() + " max: " + open.maxDoc(), 1, open.numDocs());
-//            
-//          }
+          if (random.nextInt(10) == 0) {
+            if (open == null)
+              open = IndexReader.open(writer, true);
+            IndexReader reader = open.reopen();
+            if (reader != open) {
+              open.close();
+              open = reader;
+            }
+            assertEquals("iter: " + i + " numDocs: "+ open.numDocs() + " del: " + open.numDeletedDocs() + " max: " + open.maxDoc(), 1, open.numDocs());
+          }
         }
-//        open.close();
+        open.close();
       } catch (Exception e) {
         fail(e.getMessage());
       }
