Index: src/test/org/apache/lucene/index/TestNRT.java
===================================================================
--- src/test/org/apache/lucene/index/TestNRT.java	(revision 0)
+++ src/test/org/apache/lucene/index/TestNRT.java	(revision 0)
@@ -0,0 +1,231 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.analysis.WhitespaceAnalyzer;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MockRAMDirectory;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.util.LuceneTestCase;
+
+public class TestNRT extends LuceneTestCase {
+  Random random = new Random();
+  
+  public void testSimple() throws Exception {
+    Directory primaryDir = new MockRAMDirectory();
+    RAMDirectory ramDir = new MockRAMDirectory();
+    
+    IndexWriter ramWriter = new IndexWriter(ramDir, new WhitespaceAnalyzer(), true,
+        IndexWriter.MaxFieldLength.LIMITED);
+    ramWriter.name = "ram";
+    IndexWriter primaryWriter = new IndexWriter(primaryDir, new WhitespaceAnalyzer(), true,
+        IndexWriter.MaxFieldLength.LIMITED);
+    primaryWriter.name = "primary";
+    //primaryWriter.setInfoStream(System.out);
+    NRT nrt = new NRT(ramWriter, primaryWriter, 1024*600);
+    
+    for (int i = 0; i < 100; i++) {
+      primaryWriter.addDocument(TestIndexWriterReader.createDocument(i,
+          "primary", 4));
+    }
+    for (int i = 0; i < 100; i++) {
+      nrt.addDocument(TestIndexWriterReader.createDocument(i, "ram", 4));
+    }
+
+    IndexReader reader = nrt.getReader();
+    assertEquals(200, reader.maxDoc());
+    IndexReader ramReader = ramWriter.getReader();
+    assertEquals(100, ramReader.maxDoc());
+    IndexReader primaryReader = primaryWriter.getReader();
+    assertEquals(100, primaryReader.maxDoc());
+    
+    ramReader.close();
+    primaryReader.close();
+    reader.close();
+
+    nrt.flush();
+    primaryReader = primaryWriter.getReader();
+    assertEquals(200, primaryReader.maxDoc());
+    ramReader = ramWriter.getReader();
+    assertEquals(0, ramReader.maxDoc());
+
+    ramReader.close();
+    primaryReader.close();
+    
+    primaryWriter.close();
+    ramWriter.close();
+    
+    ramDir.close();
+    primaryDir.close();
+  }
+  
+  /**
+   * This tests synchronization of the primary and ram writers.
+   * 
+   * @throws Exception
+   */
+  public void testRandomThreads() throws Exception {
+    long duration = 1000 * 15;
+    AtomicInteger totalAdded = new AtomicInteger(0);
+
+    Directory primaryDir = new MockRAMDirectory();
+    RAMDirectory ramDir = new MockRAMDirectory();
+    ramDir.listAll();
+    
+    IndexWriter ramWriter = new IndexWriter(ramDir, new WhitespaceAnalyzer(), true,
+        IndexWriter.MaxFieldLength.LIMITED);
+    IndexWriter primaryWriter = new IndexWriter(primaryDir, new WhitespaceAnalyzer(), true,
+        IndexWriter.MaxFieldLength.LIMITED);
+    //primaryWriter.setInfoStream(System.out);
+    NRT nrt = new NRT(ramWriter, primaryWriter, 1024*600);
+
+    GetReadersThread[] grts = new GetReadersThread[3];
+    for (int x = 0; x < grts.length; x++) {
+      grts[x] = new GetReadersThread("getreadersthread" + x, nrt);
+    }
+    AddDocsThread[] adts = new AddDocsThread[3];
+    for (int x = 0; x < adts.length; x++) {
+      adts[x] = new AddDocsThread("adddocsthread " + x, nrt, totalAdded);
+    }
+    for (int x = 0; x < adts.length; x++) {
+      adts[x].start();
+    }
+    for (int x = 0; x < grts.length; x++) {
+      grts[x].start();
+    }
+    long startTime = System.currentTimeMillis();
+    while (true) {
+      if ((System.currentTimeMillis() - startTime) > duration) {
+        break;
+      }
+    }
+    System.out.println(duration+" elapsed");
+    for (int x = 0; x < adts.length; x++) {
+      adts[x].dorun = false;
+    }
+    for (int x = 0; x < grts.length; x++) {
+      grts[x].dorun = false;
+    }
+    for (int x = 0; x < adts.length; x++) {
+      adts[x].waitForThread();
+      //System.out.println("add docs errors "+x);
+      //for (Throwable th : adts[x].errors) { 
+      //  th.printStackTrace();
+      //}
+    }
+    for (int x = 0; x < grts.length; x++) {
+      grts[x].waitForThread();
+    }
+    System.out.println("Completed...");
+    IndexReader reader = nrt.getReader();
+    assertEquals(totalAdded.get(), reader.maxDoc());
+    reader.close();
+    
+    ramWriter.close();
+    primaryWriter.close();
+    
+    ramDir.close();
+    primaryDir.close();
+    
+    System.out.println("Closed...");
+  }
+
+  public class GenericThread extends Thread {
+    boolean dorun = true;
+
+    List<Throwable> errors = new ArrayList<Throwable>();
+
+    NRT ramnrt;
+
+    public GenericThread(String threadName, NRT ramnrt) {
+      this.ramnrt = ramnrt;
+      setName(threadName);
+    }
+    
+    public void waitForThread() throws InterruptedException {
+      System.out.println("waiting for "+getName());
+      join();
+    }
+  }
+
+  /**
+   * Check to insure the readers are not identical
+   */
+  public class GetReadersThread extends GenericThread {
+    List<String> failures = new ArrayList<String>();
+
+    public GetReadersThread(String threadName, NRT ramnrt) {
+      super(threadName, ramnrt);
+    }
+
+    public void run() {
+      try {
+        while (dorun) {
+          int num = random.nextInt(100);
+          for (int x=0; x < num; x++) {
+            if (!dorun) return;
+            IndexReader reader = ramnrt.getReader();
+            Thread.sleep(random.nextInt(1000));
+            reader.close();
+          }
+        }
+      } catch (Throwable ex) {
+        ex.printStackTrace();
+        errors.add(ex);
+        if (errors.size() > 5) {
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+  }
+
+  public class AddDocsThread extends GenericThread {
+    AtomicInteger totalAdded;
+
+    public AddDocsThread(String threadName, NRT ramnrt,
+        AtomicInteger totalAdded) {
+      super(threadName, ramnrt);
+      this.totalAdded = totalAdded;
+    }
+
+    public void run() {
+      try {
+        while (dorun) {
+          int numdocs = random.nextInt(1000);
+          for (int x = 0; x < numdocs; x++) {
+            if (!dorun)
+              return;
+            ramnrt.addDocument(TestIndexWriterReader.createDocument(x, "", 3));
+            totalAdded.incrementAndGet();
+          }
+          System.out.println("added " + numdocs + " docs");
+          ramnrt.flush();
+          System.out.println("added " + numdocs + " docs after flush");
+        }
+      } catch (Throwable th) {
+        th.printStackTrace();
+        errors.add(th);
+      }
+    }
+  }
+}
Index: src/java/org/apache/lucene/index/MergePolicy.java
===================================================================
--- src/java/org/apache/lucene/index/MergePolicy.java	(revision 832846)
+++ src/java/org/apache/lucene/index/MergePolicy.java	(working copy)
@@ -85,10 +85,12 @@
     final boolean useCompoundFile;
     boolean aborted;
     Throwable error;
+    IndexWriter writer;
 
-    public OneMerge(SegmentInfos segments, boolean useCompoundFile) {
+    public OneMerge(IndexWriter writer, SegmentInfos segments, boolean useCompoundFile) {
       if (0 == segments.size())
         throw new RuntimeException("segments must include at least one segment");
+      this.writer = writer;
       this.segments = segments;
       this.useCompoundFile = useCompoundFile;
     }
Index: src/java/org/apache/lucene/index/LogMergePolicy.java
===================================================================
--- src/java/org/apache/lucene/index/LogMergePolicy.java	(revision 832846)
+++ src/java/org/apache/lucene/index/LogMergePolicy.java	(working copy)
@@ -242,7 +242,7 @@
         // First, enroll all "full" merges (size
         // mergeFactor) to potentially be run concurrently:
         while (last - maxNumSegments + 1 >= mergeFactor) {
-          spec.add(new OneMerge(infos.range(last-mergeFactor, last), useCompoundFile));
+          spec.add(new OneMerge(writer, infos.range(last-mergeFactor, last), useCompoundFile));
           last -= mergeFactor;
         }
 
@@ -254,7 +254,7 @@
             // Since we must optimize down to 1 segment, the
             // choice is simple:
             if (last > 1 || !isOptimized(infos.info(0)))
-              spec.add(new OneMerge(infos.range(0, last), useCompoundFile));
+              spec.add(new OneMerge(writer, infos.range(0, last), useCompoundFile));
           } else if (last > maxNumSegments) {
 
             // Take care to pick a partial merge that is
@@ -282,7 +282,7 @@
               }
             }
 
-            spec.add(new OneMerge(infos.range(bestStart, bestStart+finalMergeSize), useCompoundFile));
+            spec.add(new OneMerge(writer, infos.range(bestStart, bestStart+finalMergeSize), useCompoundFile));
           }
         }
         
@@ -322,7 +322,7 @@
           // deletions, so force a merge now:
           if (verbose())
             message("  add merge " + firstSegmentWithDeletions + " to " + (i-1) + " inclusive");
-          spec.add(new OneMerge(segmentInfos.range(firstSegmentWithDeletions, i), useCompoundFile));
+          spec.add(new OneMerge(writer, segmentInfos.range(firstSegmentWithDeletions, i), useCompoundFile));
           firstSegmentWithDeletions = i;
         }
       } else if (firstSegmentWithDeletions != -1) {
@@ -331,7 +331,7 @@
         // mergeFactor segments
         if (verbose())
           message("  add merge " + firstSegmentWithDeletions + " to " + (i-1) + " inclusive");
-        spec.add(new OneMerge(segmentInfos.range(firstSegmentWithDeletions, i), useCompoundFile));
+        spec.add(new OneMerge(writer, segmentInfos.range(firstSegmentWithDeletions, i), useCompoundFile));
         firstSegmentWithDeletions = -1;
       }
     }
@@ -339,7 +339,7 @@
     if (firstSegmentWithDeletions != -1) {
       if (verbose())
         message("  add merge " + firstSegmentWithDeletions + " to " + (numSegments-1) + " inclusive");
-      spec.add(new OneMerge(segmentInfos.range(firstSegmentWithDeletions, numSegments), useCompoundFile));
+      spec.add(new OneMerge(writer, segmentInfos.range(firstSegmentWithDeletions, numSegments), useCompoundFile));
     }
 
     return spec;
@@ -439,7 +439,7 @@
             spec = new MergeSpecification();
           if (verbose())
             message("    " + start + " to " + end + ": add this merge");
-          spec.add(new OneMerge(infos.range(start, end), useCompoundFile));
+          spec.add(new OneMerge(writer, infos.range(start, end), useCompoundFile));
         } else if (verbose())
           message("    " + start + " to " + end + ": contains segment over maxMergeSize or maxMergeDocs; skipping");
 
Index: src/java/org/apache/lucene/index/NRT.java
===================================================================
--- src/java/org/apache/lucene/index/NRT.java	(revision 0)
+++ src/java/org/apache/lucene/index/NRT.java	(revision 0)
@@ -0,0 +1,156 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.store.RAMDirectory;
+
+public class NRT {
+  private IndexWriter ramWriter;
+
+  private IndexWriter primaryWriter;
+
+  RAMDirectory ramDirectory;
+
+  long maxRam;
+
+  public NRT(IndexWriter ramWriter, IndexWriter primaryWriter, long maxRam) {
+    this.ramWriter = ramWriter;
+    ramDirectory = (RAMDirectory) ramWriter.getDirectory();
+    this.primaryWriter = primaryWriter;
+    this.maxRam = maxRam;
+  }
+
+  /**
+   * Flush the RAM segments and buffer to the primary writer.
+   * 
+   * @throws IOException
+   */
+  public synchronized void flush() throws IOException {
+    ramWriter.flush(false, true, true);
+    SegmentInfos ramInfos = null;
+    synchronized (ramWriter) {
+      ramInfos = ramWriter.getSegmentInfosCopy();
+      // adding ram infos to mergingSegments
+      // keeps them from being merged in ram writer
+      for (SegmentInfo ramInfo : ramInfos) {
+        ramWriter.mergingSegments.add(ramInfo);
+      }
+    }
+    if (ramInfos.size() > 0) {
+      MergePolicy.OneMerge merge = new MergePolicy.OneMerge(ramWriter,
+          ramInfos, primaryWriter.getUseCompoundFile());
+      primaryWriter.mergeIn(merge);
+      // remove the merged infos from the ram writer
+      synchronized (ramWriter) {
+        for (SegmentInfo info : ramInfos) {
+          ramWriter.segmentInfos.remove(info);
+          ramWriter.mergeFinish(merge);
+        }
+        ramWriter.deleter.checkpoint(ramWriter.segmentInfos, false);
+      }
+    }
+  }
+
+  public IndexReader getReader() throws IOException {
+    IndexReader[] ramReaders = ramWriter.getReader().getSequentialSubReaders();
+    IndexReader[] primaryReaders = primaryWriter.getReader()
+        .getSequentialSubReaders();
+    HashSet<IndexReader> readers = new HashSet<IndexReader>(ramReaders.length
+        + primaryReaders.length);
+    int i = 0;
+    Set<SegmentInfo> infos = new HashSet<SegmentInfo>();
+    for (int x = 0; x < ramReaders.length; x++) {
+      SegmentInfo info = ((SegmentReader) ramReaders[x]).getSegmentInfo();
+      if (!infos.contains(info)) {
+        infos.add(info);
+        readers.add(ramReaders[x]);
+      }
+    }
+    for (int x = 0; x < primaryReaders.length; x++) {
+      SegmentInfo info = ((SegmentReader) primaryReaders[x]).getSegmentInfo();
+      infos.add(info);
+      readers.add(primaryReaders[x]);
+    }
+    assert infos.size() == ramReaders.length + primaryReaders.length;
+    MultiReader reader = new MultiReader((IndexReader[]) readers
+        .toArray(new IndexReader[0]));
+    return reader;
+  }
+
+  public long getRAMSize() {
+    return ramWriter.ramSizeInBytes() + ramDirectory.sizeInBytes();
+  }
+
+  public void addDocument(Document doc) throws IOException,
+      InterruptedException {
+    addDocument(doc, primaryWriter.getAnalyzer());
+  }
+
+  public void addDocument(Document doc, Analyzer analyzer) throws IOException,
+      InterruptedException {
+    ramWriter.addDocument(doc, analyzer);
+    ifFlush();
+  }
+
+  public void deleteDocuments(Query query) throws CorruptIndexException,
+      IOException, InterruptedException {
+    ramWriter.deleteDocuments(query);
+    primaryWriter.deleteDocuments(query);
+    ifFlush();
+  }
+
+  public void deleteDocuments(Query... queries) throws CorruptIndexException,
+      IOException, InterruptedException {
+    ramWriter.deleteDocuments(queries);
+    primaryWriter.deleteDocuments(queries);
+    ifFlush();
+  }
+
+  public void updateDocument(Term term, Document doc, Analyzer analyzer)
+      throws CorruptIndexException, IOException, InterruptedException {
+    ramWriter.updateDocument(term, doc, analyzer);
+    primaryWriter.deleteDocuments(term);
+    ifFlush();
+  }
+
+  public void updateDocument(Term term, Document doc)
+      throws CorruptIndexException, IOException, InterruptedException {
+    ramWriter.updateDocument(term, doc);
+    primaryWriter.deleteDocuments(term);
+    ifFlush();
+  }
+
+  private void ifFlush() throws IOException, InterruptedException {
+    if (getRAMSize() > maxRam) {
+      long ramSizeBytes = ramWriter.ramSizeInBytes();
+      long ramDirBytes = ramDirectory.sizeInBytes();
+      // if (infoStream != null)
+      // infoStream.println("ramSize:"+ramSizeBytes+" ramDir:"+ramDirBytes);
+      flush();
+    }
+  }
+}
Index: src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- src/java/org/apache/lucene/index/IndexWriter.java	(revision 832846)
+++ src/java/org/apache/lucene/index/IndexWriter.java	(working copy)
@@ -253,10 +253,10 @@
   private SegmentInfos localRollbackSegmentInfos;      // segmentInfos we will fallback to if the commit fails
   private int localFlushedDocCount;               // saved docWriter.getFlushedDocCount during local transaction
 
-  private SegmentInfos segmentInfos = new SegmentInfos();       // the segments
+  SegmentInfos segmentInfos = new SegmentInfos();       // the segments
 
   private DocumentsWriter docWriter;
-  private IndexFileDeleter deleter;
+  IndexFileDeleter deleter;
 
   private Set<SegmentInfo> segmentsToOptimize = new HashSet<SegmentInfo>();           // used by optimize to note those needing optimization
 
@@ -266,10 +266,11 @@
 
   private boolean closed;
   private boolean closing;
+  String name;
 
   // Holds all SegmentInfo instances currently involved in
   // merges
-  private HashSet<SegmentInfo> mergingSegments = new HashSet<SegmentInfo>();
+  HashSet<SegmentInfo> mergingSegments = new HashSet<SegmentInfo>();
 
   private MergePolicy mergePolicy = new LogByteSizeMergePolicy(this);
   private MergeScheduler mergeScheduler = new ConcurrentMergeScheduler();
@@ -520,6 +521,9 @@
         // near real-time reader is kept open after the
         // IndexWriter instance is closed
         sr.decRef();
+        if (sr.getRefCount() > 0) {
+          System.out.println(name+" "+sr.getSegmentInfo()+" close ref count:"+sr.getRefCount());
+        }
       }
     }
     
@@ -2877,7 +2881,7 @@
     }
 
     // sanity check
-    assert 0 == mergingSegments.size();
+    //assert 0 == mergingSegments.size();
   }
 
   /*
@@ -3060,7 +3064,28 @@
   private boolean hasExternalSegments() {
     return segmentInfos.hasExternalSegments(directory);
   }
-
+  
+  final void mergeIn(MergePolicy.OneMerge merge) throws CorruptIndexException, IOException {
+    synchronized (this) {
+      for (SegmentInfo info : merge.segments) {
+        segmentInfos.add(info);
+      }
+      if (registerMerge(merge)) {
+        pendingMerges.remove(merge);
+        runningMerges.add(merge);
+      }
+    }
+    merge(merge);
+  }
+  
+  synchronized SegmentInfos getSegmentInfosCopy() {
+    SegmentInfos copy = new SegmentInfos();
+    for (SegmentInfo info : segmentInfos) {
+      copy.add(info);
+    }
+    return copy;
+  }
+  
   /* If any of our segments are using a directory != ours
    * then we have to either copy them over one by one, merge
    * them (if merge policy has chosen to) or wait until
@@ -3089,7 +3114,7 @@
           info = segmentInfos.info(i);
           if (info.dir != directory) {
             done = false;
-            final MergePolicy.OneMerge newMerge = new MergePolicy.OneMerge(segmentInfos.range(i, 1+i), mergePolicy instanceof LogMergePolicy && getUseCompoundFile());
+            final MergePolicy.OneMerge newMerge = new MergePolicy.OneMerge(this, segmentInfos.range(i, 1+i), mergePolicy instanceof LogMergePolicy && getUseCompoundFile());
 
             // Returns true if no running merge conflicts
             // with this one (and, records this merge as
@@ -3827,7 +3852,7 @@
     // If the merged segments had pending changes, clear
     // them so that they don't bother writing them to
     // disk, updating SegmentInfo, etc.:
-    readerPool.clear(merge.segments);
+    merge.writer.readerPool.clear(merge.segments);
 
     if (merge.optimize)
       segmentsToOptimize.add(merge.info);
@@ -4211,7 +4236,7 @@
 
         // Hold onto the "live" reader; we will use this to
         // commit merged deletes
-        SegmentReader reader = merge.readers[i] = readerPool.get(info, merge.mergeDocStores,
+        SegmentReader reader = merge.readers[i] = merge.writer.readerPool.get(info, merge.mergeDocStores,
                                                                  MERGE_READ_BUFFER_SIZE,
                                                                  -1);
 
@@ -4292,7 +4317,7 @@
           for (int i=0;i<numSegments;i++) {
             if (merge.readers[i] != null) {
               try {
-                readerPool.release(merge.readers[i], true);
+                merge.writer.readerPool.release(merge.readers[i], true);
               } catch (Throwable t) {
               }
             }
@@ -4309,7 +4334,7 @@
         } else {
           for (int i=0;i<numSegments;i++) {
             if (merge.readers[i] != null) {
-              readerPool.release(merge.readers[i], true);
+              merge.writer.readerPool.release(merge.readers[i], true);
             }
 
             if (merge.readersClone[i] != null) {
@@ -4516,7 +4541,7 @@
       return true;
     }
   }
-
+  
   private synchronized void doWait() {
     // NOTE: the callers of this method should in theory
     // be able to do simply wait(), but, as a defense
