Index: lucene/src/test/org/apache/lucene/index/TestDWPTFlushByRAM.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestDWPTFlushByRAM.java	(revision 0)
+++ lucene/src/test/org/apache/lucene/index/TestDWPTFlushByRAM.java	(revision 0)
@@ -0,0 +1,92 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.store.MockRAMDirectory;
+import org.apache.lucene.util.LuceneTestCase;
+
+// TODO: implement a simulated disk-like delay for the flushes
+public class TestDWPTFlushByRAM extends LuceneTestCase {
+  public TestDWPTFlushByRAM(String name) {
+    super(name);
+  }
+
+  public void test() throws Exception {
+    int numThreads = 2;
+    int numDocs = 250;
+    MockRAMDirectory dir = new MockRAMDirectory();
+    IndexWriterConfig iwc = new IndexWriterConfig(TEST_VERSION_CURRENT,
+        new MockAnalyzer());
+    DocumentsWriterThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
+        numThreads);
+    iwc.setIndexerThreadPool(threadPool);
+    iwc.setRAMBufferSizeMB(2.0);
+    // this is added as otherwise errors are being reported
+    iwc.setMaxBufferedDocs(Integer.MAX_VALUE);
+    IndexWriter writer = new IndexWriter(dir, iwc);
+    // compound files consume extra RAM and we could more easily run
+    // out of heap
+    ((LogMergePolicy)iwc.getMergePolicy()).setUseCompoundFile(false);
+    
+    AddDocsThread[] threads = new AddDocsThread[numThreads];
+    for (int x=0; x < threads.length; x++) {
+      threads[x] = new AddDocsThread(numDocs / numThreads, numThreads, writer);
+      threads[x].start();
+    }
+    for (int x=0; x < threads.length; x++) {
+      threads[x].join();
+      assertNull("indexing thread failed: " + threads[x].failure, threads[x].failure);
+    }
+    writer.close();
+    dir.close();
+  }
+
+  public class AddDocsThread extends Thread {
+    int max;
+    int numThreads;
+    IndexWriter writer;
+    // set if this thread hits an exception, so the main test thread can fail
+    volatile Throwable failure;
+
+    public AddDocsThread(int max, int numThreads, IndexWriter writer) {
+      this.max = max;
+      this.numThreads = numThreads;
+      this.writer = writer;
+    }
+
+    public void run() {
+      try {
+        for (int x = 0; x < max; x++) {
+          Document doc = new Document();
+          DocHelper.setupDoc(doc);
+          writer.addDocument(doc);
+          //System.out.println("addDoc: "+x);
+          if (x % 5 == 0) {
+            long ramSize = writer.ramSizeInBytes();
+            System.out.println(Thread.currentThread().getName()+" ramsize "+x+": "+ramSize);
+          }
+        }
+      } catch (Exception ex) {
+        ex.printStackTrace();
+        failure = ex; // surface the error instead of silently swallowing it
+      }
+    }
+  }
+}
Index: lucene/src/test/org/apache/lucene/index/TestSegmentFlushPolicy.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestSegmentFlushPolicy.java	(revision 0)
+++ lucene/src/test/org/apache/lucene/index/TestSegmentFlushPolicy.java	(revision 0)
@@ -0,0 +1,39 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.LuceneTestCase;
+
+public class TestSegmentFlushPolicy extends LuceneTestCase {
+  
+  public void testTiers5() throws Exception {
+    SegmentFlushPolicy p = new SegmentFlushPolicy(10.0);
+    long[] tiers = p.getTiers(5);
+    assertEquals(9437184, tiers[0]);
+    assertEquals(10485760, tiers[2]);
+    //System.out.println("tiers 5 threads 10.0 MB: "+Arrays.toString(tiers));
+  }
+  
+  public void testTiers2() throws Exception {
+    SegmentFlushPolicy p = new SegmentFlushPolicy(10.0);
+    long[] tiers = p.getTiers(2);
+    assertEquals(9437184, tiers[0]);
+    assertEquals(11534336, tiers[1]);
+    //System.out.println("tiers 2 threads 10.0 MB: "+Arrays.toString(tiers));
+  }
+}
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java	(revision 993075)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java	(working copy)
@@ -73,6 +73,12 @@
     this.maxNumThreadStates = (maxNumThreadStates < 1) ? IndexWriterConfig.DEFAULT_MAX_THREAD_STATES : maxNumThreadStates;
   }
   
+  /** Returns the number of allocated thread states (length of allThreadStates).
+   *  NOTE(review): assumed to equal the number of indexer threads -- confirm. */
+  public int getThreadCount() {
+    return allThreadStates.length;
+  }
+  
   public final int getMaxThreadStates() {
     return this.maxNumThreadStates;
   }
Index: lucene/src/java/org/apache/lucene/index/SegmentFlushPolicy.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/SegmentFlushPolicy.java	(revision 0)
+++ lucene/src/java/org/apache/lucene/index/SegmentFlushPolicy.java	(revision 0)
@@ -0,0 +1,68 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Flush DocumentsWriterPerThread in tiers.  When the total
+ * RAM used reaches the min tier, flush it.  When the total reaches
+ * the next tier, flush the next highest DWPT, and so on until they're
+ * all in the process of being flushed.  Eventually the RAM usage 
+ * will lower to a reasonable level.  
+ */
+public class SegmentFlushPolicy {
+  /** Fraction of the configured RAM budget where the highest tier sits. */
+  public static final double HIGH_WATER_PERCENT = 1.10;
+  /** Fraction of the configured RAM budget where the lowest tier sits. */
+  public static final double LOW_WATER_PERCENT = 0.90;
+  final long ramMax;
+  
+  /**
+   * @param ramBufferSize the configured RAM buffer size in MB
+   */
+  public SegmentFlushPolicy(double ramBufferSize) {
+    ramMax = (long)(ramBufferSize * 1024 * 1024);
+  }
+  
+  /**
+   * Return the number of bytes for each tier to flush at, evenly
+   * spaced between the low and the high water marks.
+   */
+  public long[] getTiers(int numThreads) {
+    assert numThreads > 0;
+    long lowWater = getWaterMarkBytes(LOW_WATER_PERCENT);
+    long highWater = getWaterMarkBytes(HIGH_WATER_PERCENT);
+    long[] tiers = new long[numThreads];
+    tiers[0] = lowWater;
+    // guard the division: with a single thread there is only one tier
+    long inc = numThreads > 1 ? (highWater - lowWater) / (numThreads - 1) : 0;
+    long cur = lowWater;
+    for (int x=1; x < tiers.length; x++) {
+      cur += inc;
+      tiers[x] = cur;
+    }
+    return tiers;
+  }
+  
+  /**
+   * Returns the water mark for the given percentage of the RAM budget.
+   */
+  public long getWaterMarkBytes(double percent) {
+    return (long)(ramMax * percent);
+  }
+}
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java	(revision 993075)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java	(working copy)
@@ -19,14 +19,13 @@
 
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.codecs.Codec;
 import org.apache.lucene.search.Similarity;
 import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FilterDirectory;
-import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.ArrayUtil;
 
 public class DocumentsWriterPerThread {
@@ -142,7 +141,7 @@
   SegmentWriteState flushState;
 
   long[] sequenceIDs = new long[8];
-  
+  AtomicBoolean willFlush = new AtomicBoolean(false);
   long numBytesUsed;
   
   public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, IndexingChain indexingChain) {
@@ -165,6 +164,18 @@
     aborting = true;
   }
   
+  public void setWillFlush(boolean will) {
+    willFlush.set(will);
+  }
+  
+  public boolean willFlush() {
+    return willFlush.get();
+  }
+  
+  public boolean isFlushing() {
+    return flushState != null;
+  }
+  
   public void addDocument(Document doc, Analyzer analyzer) throws IOException {
     docState.doc = doc;
     docState.analyzer = analyzer;
@@ -234,9 +245,12 @@
   }
   
   /** Reset after a flush */
-  private void doAfterFlush() throws IOException {
+  void doAfterFlush() throws IOException {
     segment = null;
     numDocsInRAM = 0;
+    willFlush.set(false);
+    ramAllocator.reset();
+    flushState = null;
   }
     
   /** Flush all pending docs to a new segment */
@@ -274,7 +288,6 @@
       flushedDocCount += flushState.numDocs;
 
       long maxSequenceID = sequenceIDs[numDocsInRAM-1];
-      doAfterFlush();
       
       // Create new SegmentInfo, but do not add to our
       // segmentInfos until deletes are flushed
@@ -284,7 +297,6 @@
                                    directory, false,
                                    hasProx(),
                                    getCodec());
-
       
       newSegment.setMinSequenceID(sequenceIDs[0]);
       newSegment.setMaxSequenceID(maxSequenceID);
@@ -305,6 +317,10 @@
     return segment;
   }
   
+  long getRAMUsed() {
+    return ramAllocator.getRAMUsed();
+  }
+  
   void bytesUsed(long numBytes) {
     ramAllocator.bytesUsed(numBytes);
   }
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java	(revision 993075)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java	(working copy)
@@ -8,8 +8,6 @@
 
 class DocumentsWriterRAMAllocator {
   final ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator(BYTE_BLOCK_SIZE);
-  final ByteBlockAllocator perDocAllocator = new ByteBlockAllocator(PER_DOC_BLOCK_SIZE);
-
   
   class ByteBlockAllocator extends ByteBlockPool.Allocator {
     final int blockSize;
@@ -27,15 +25,15 @@
       final byte[] b;
       if (0 == size) {
         b = new byte[blockSize];
-        // Always record a block allocated, even if
-        // trackAllocations is false.  This is necessary
-        // because this block will be shared between
-        // things that don't track allocations (term
-        // vectors) and things that do (freq/prox
-        // postings).
-        numBytesUsed += blockSize;
       } else
         b = freeByteBlocks.remove(size-1);
+      // Always record a block allocated, even if
+      // trackAllocations is false.  This is necessary
+      // because this block will be shared between
+      // things that don't track allocations (term
+      // vectors) and things that do (freq/prox
+      // postings).
+      numBytesUsed += blockSize;
       return b;
     }
 
@@ -57,22 +55,28 @@
   }
 
   private ArrayList<int[]> freeIntBlocks = new ArrayList<int[]>();
-
+  
+  public void reset() {
+    numBytesUsed = 0;
+  }
+  
   /* Allocate another int[] from the shared pool */
   int[] getIntBlock() {
     final int size = freeIntBlocks.size();
     final int[] b;
     if (0 == size) {
       b = new int[INT_BLOCK_SIZE];
-      // Always record a block allocated, even if
-      // trackAllocations is false.  This is necessary
-      // because this block will be shared between
-      // things that don't track allocations (term
-      // vectors) and things that do (freq/prox
-      // postings).
-      numBytesUsed += INT_BLOCK_SIZE*INT_NUM_BYTE;
-    } else
+    } else 
       b = freeIntBlocks.remove(size-1);
+    // Always record a block allocated, even if
+    // trackAllocations is false.  This is necessary
+    // because this block will be shared between
+    // things that don't track allocations (term
+    // vectors) and things that do (freq/prox
+    // postings).
+    
+    // we track a block as consuming RAM if it's in use
+    numBytesUsed += INT_BLOCK_SIZE*INT_NUM_BYTE;
     return b;
   }
 
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriter.java	(revision 993075)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriter.java	(working copy)
@@ -2,10 +2,13 @@
 
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -69,7 +72,11 @@
   private final PrintStream infoStream;
 
   private Map<DocumentsWriterPerThread, Long> minSequenceIDsPerThread = new HashMap<DocumentsWriterPerThread, Long>();
-
+  
+  long[] flushTiers;
+  int tier = 0;
+  double workingRamBufferSize = Double.MAX_VALUE;
+  
   public DocumentsWriter(Directory directory, IndexWriter indexWriter, IndexWriterConfig config) {
     this.openFilesTrackingDirectory = new FilterDirectory(directory) {
       @Override public IndexOutput createOutput(final String name) throws IOException {
@@ -85,7 +92,7 @@
     this.threadPool = config.getIndexerThreadPool();
     this.infoStream = indexWriter.getInfoStream();
   }
-
+  
   public int getMaxBufferedDocs() {
     return maxBufferedDocs;
   }
@@ -211,12 +218,18 @@
       }
     }
 
-    long deltaRAM = perThread.numBytesUsed - perThreadRAMUsedBeforeAdd;
-    long oldValue = ramUsed.get();
-    while (!ramUsed.compareAndSet(oldValue, oldValue + deltaRAM)) {
-      oldValue = ramUsed.get();
+    //long deltaRAM = perThread.numBytesUsed - perThreadRAMUsedBeforeAdd;
+    //long oldValue = ramUsed.get();
+    //while (!ramUsed.compareAndSet(oldValue, oldValue + deltaRAM)) {
+    //  oldValue = ramUsed.get();
+    //}
+    
+    long cumulative = getCumulativeRAMUsed();
+    if (cumulative == 0 && infoStream != null) {
+      message("cumulative RAM is zero; dwpt:"+perThread.getRAMUsed()+" flushed:"+flushed);
     }
-
+    ramUsed.set(cumulative);
+    
     return flushed;
   }
 
@@ -367,7 +380,7 @@
         if (!success) {
           if (infoStream != null) {
             message("hit exception " +
-            		"reating compound file for newly flushed segment " + newSegment.name);
+            		"creating compound file for newly flushed segment " + newSegment.name);
           }
           indexWriter.getIndexFileDeleter().deleteFile(IndexFileNames.segmentFileName(newSegment.name, "", 
               IndexFileNames.COMPOUND_FILE_EXTENSION));
@@ -394,8 +407,21 @@
     }
     
     indexWriter.addNewSegment(newSegment);
+    
+    perThread.doAfterFlush();
   }
   
+  /**
+   * Return the sum of the RAM used by each DWPT
+   */
+  private long getCumulativeRAMUsed() {
+    long total = 0;
+    for (DocumentsWriterPerThread dwpt : getDocumentsWriterPerThreads()) {
+      total += dwpt.getRAMUsed();
+    }
+    return total;
+  }
+  
   // Returns true if an abort is in progress
   void pauseAllThreads() {
     threadPool.pauseAllThreads();
@@ -427,10 +453,77 @@
       
       return true;
     }
+    // flush by RAM
+    return maybeFlushByRAM(perThread);
+  }
 
+  /**
+   * Return true if the given DWPT is consuming the most RAM
+   */
+  private boolean perThreadHasMaxRAM(DocumentsWriterPerThread perThread) {
+    long ram = perThread.getRAMUsed();
+    for (DocumentsWriterPerThread dwpt : getDocumentsWriterPerThreads()) {
+      if (dwpt != perThread && ram < dwpt.getRAMUsed()) {
+        return false;
+      }
+    }
+    return true;
+  }
+  
+  private boolean maybeFlushByRAM(DocumentsWriterPerThread perThread) throws IOException {
+    boolean doFlush = false;
+    synchronized (this) {
+      // if there are no flush tiers 
+      // or the RAM buffer size has changed 
+      if (flushTiers == null 
+          || workingRamBufferSize != indexWriter.getConfig().getRAMBufferSizeMB()) {
+        int numThreads = threadPool.getThreadCount();
+        SegmentFlushPolicy p = new SegmentFlushPolicy(getRAMBufferSizeMB());
+        flushTiers = p.getTiers(numThreads);
+        workingRamBufferSize = getRAMBufferSizeMB();
+        tier = 0;
+      }
+      long ramUsed = getRAMUsed();
+      // if the total RAM usage exceeds the current tier's RAM maximum
+      if (tier < flushTiers.length 
+          && ramUsed > flushTiers[tier]) {
+        // Only flush this DWPT if it's not 
+        // already flushing and it's using the
+        // highest amount of RAM 
+        if (!perThread.isFlushing()
+            && perThreadHasMaxRAM(perThread)) {
+          tier++;
+          doFlush = true;
+          perThread.setWillFlush(true);
+        }
+      } else if (!dwptsWillFlush()) {
+        // if no DWPTs are flushing, reset the tier to 0
+        tier = 0;
+      }
+    }
+    // flush the segment outside of the synchronized block
+    if (!doFlush) return false;
+    boolean flushed = flushSegment(perThread);
+    // nothing was flushed (DWPT had no buffered docs): doAfterFlush never
+    // runs in that case, so clear the flag here or the tier never resets
+    if (!flushed) perThread.setWillFlush(false);
+    return flushed;
+  }
+  
+  private boolean dwptsWillFlush() {
+    DocumentsWriterPerThread[] dwpts = getDocumentsWriterPerThreads();
+    for (DocumentsWriterPerThread dwpt : dwpts) {
+      if (dwpt.willFlush()) return true;
+    }
     return false;
   }
-
+  
+  /** Returns a snapshot of the DWPTs currently registered with this writer. */
+  public DocumentsWriterPerThread[] getDocumentsWriterPerThreads() {
+    List<DocumentsWriterPerThread> list = new ArrayList<DocumentsWriterPerThread>(
+        minSequenceIDsPerThread.keySet());
+    return list.toArray(new DocumentsWriterPerThread[list.size()]);
+  }
+  
   private boolean flushSegment(DocumentsWriterPerThread perThread)
       throws IOException {
     if (perThread.getNumDocsInRAM() == 0) {
