Index: lucene/src/java/org/apache/lucene/index/ByRAMFlushPolicyBase.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/ByRAMFlushPolicyBase.java	(revision 0)
+++ lucene/src/java/org/apache/lucene/index/ByRAMFlushPolicyBase.java	(revision 0)
@@ -0,0 +1,110 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Iterator;
+
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+
+/**
+ * Base {@link FlushPolicy} that marks {@link DocumentsWriterPerThread}s
+ * as flush pending based on their RAM consumption.
+ */
+//nocommit - document this
+public abstract class ByRAMFlushPolicyBase extends FlushPolicy {
+  protected long maxRamBytes;
+  protected long maxFlushBytes;
+
+  public ByRAMFlushPolicyBase() {
+    super();
+  }
+
+  @Override
+  protected void init(DocumentsWriter docsWriter) {
+    super.init(docsWriter);
+    assert docsWriter.ramBufferSize != IndexWriterConfig.DISABLE_AUTO_FLUSH;
+    maxFlushBytes = maxRamBytes = docsWriter.ramBufferSize;
+    maxDocsPerWriter = maxDocsPerWriter == IndexWriterConfig.DISABLE_AUTO_FLUSH ? Integer.MAX_VALUE
+        : maxDocsPerWriter;
+  }
+
+  /**
+   * Marks the most RAM-consuming active {@link DocumentsWriterPerThread}
+   * as flush pending.
+   */
+  protected void markLargesWriterPending(DocWriterSession session,
+      ThreadState perThreadState, final long currentBytesPerThread) {
+    long maxRamSoFar = currentBytesPerThread;
+    // the dwpt which needs to be flushed eventually
+    ThreadState maxRamUsingThreadState = perThreadState;
+    assert !session.isFlushPending(perThreadState) : "DWPT should have flushed";
+    Iterator<ThreadState> activePerThreadsIterator = session.allActiveThreads();
+    while (activePerThreadsIterator.hasNext()) {
+      ThreadState next = activePerThreadsIterator.next();
+      if (!session.isFlushPending(next)) {
+        final long nextRam = session.getPerThreadBytes(next);
+        if (nextRam > maxRamSoFar && next.perThread.getNumDocsInRAM() > 0) {
+          maxRamSoFar = nextRam;
+          maxRamUsingThreadState = next;
+        }
+      }
+    }
+    assert maxRamUsingThreadState.perThread.getNumDocsInRAM() > 0;
+    assert message("set " + maxRamUsingThreadState.ord
+        + " pending on lower watermark");
+    session.setFlushPending(maxRamUsingThreadState);
+  }
+
+  /**
+   * Marks all active {@link DocumentsWriterPerThread}s with at least one
+   * buffered document as flush pending.
+   */
+  protected void markAllWritersPending(DocWriterSession session,
+      ThreadState perThreadState) {
+    // upper watermark exceeded
+    // mark all threads flushing
+    Iterator<ThreadState> activePerThreadsIterator = session.allActiveThreads();
+    while (activePerThreadsIterator.hasNext()) {
+      /* flush all active threads */
+      final ThreadState next = activePerThreadsIterator.next();
+      if (!session.isFlushPending(next) && next.perThread.getNumDocsInRAM() > 0) {
+        session.setFlushPending(next);
+      }
+    }
+  }
+
+  /**
+   * @return the maxRamBytes
+   */
+  public long getMaxRamBytes() {
+    return maxRamBytes;
+  }
+
+  /**
+   * @return the maxFlushBytes
+   */
+  public long getMaxFlushBytes() {
+    return maxFlushBytes;
+  }
+
+  public abstract long getUpperWatermark();
+
+  public abstract long getWatermark();
+
+  public abstract long getMaxNetBytes();
+
+}
\ No newline at end of file

Property changes on: lucene/src/java/org/apache/lucene/index/ByRAMFlushPolicyBase.java
___________________________________________________________________
Added: svn:keywords
   + Date Author Id Revision HeadURL
Added: svn:eol-style
   + native

Index: lucene/src/java/org/apache/lucene/index/DocCountFlushPolicy.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocCountFlushPolicy.java	(revision 0)
+++ lucene/src/java/org/apache/lucene/index/DocCountFlushPolicy.java	(revision 0)
@@ -0,0 +1,44 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+
+/**
+ * A flush policy that flushes by doc count per {@link DocumentsWriterPerThread}
+ */
+public class DocCountFlushPolicy extends FlushPolicy {
+
+  @Override
+  public void visit(DocWriterSession session, ThreadState state) {
+    if (state.perThread.getNumDocsInRAM() >= maxDocsPerWriter) {
+      session.setFlushPending(state);
+    }
+  }
+
+  @Override
+  protected void init(DocumentsWriter docsWriter) {
+    super.init(docsWriter);
+    assert maxDocsPerWriter != IndexWriterConfig.DISABLE_AUTO_FLUSH;
+  }
+
+  @Override
+  public boolean isStalled(DocWriterSession session, boolean isHealty) {
+    return false;
+  }
+
+}

Property changes on: lucene/src/java/org/apache/lucene/index/DocCountFlushPolicy.java
___________________________________________________________________
Added: svn:keywords
   + Date Author Id Revision HeadURL
Added: svn:eol-style
   + native

Index: lucene/src/java/org/apache/lucene/index/DocWriterSession.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocWriterSession.java	(revision 0)
+++ lucene/src/java/org/apache/lucene/index/DocWriterSession.java	(revision 0)
@@ -0,0 +1,210 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+import org.apache.lucene.index.FlushPolicy.FlushSpecification;
+
+public class DocWriterSession {
+  private long activeBytes = 0;
+  private long flushPendingBytes = 0;
+  private long flushBytes = 0;
+  int numPending = 0;
+  long peakActiveBytes = 0;// only with assert
+  long peakFlushPendingBytes = 0;// only with assert
+  long peakFlushBytes = 0;// only with assert
+  long peakDelta = 0;// only with assert
+  long peakNetBytes = 0;// only with assert
+  private final DocWriterSession.PerThreadFlushState[] states;
+  private final DocumentsWriterPerThreadPool threadPool;
+
+  public DocWriterSession(DocumentsWriterPerThreadPool threadPool) {
+    states = new DocWriterSession.PerThreadFlushState[threadPool
+        .getMaxThreadStates()];
+    for (int i = 0; i < states.length; i++) {
+      states[i] = new PerThreadFlushState();
+    }
+    this.threadPool = threadPool;
+  }
+
+  public long activeBytes() {
+    return activeBytes;
+  }
+
+  public long flushBytes() {
+    return flushBytes;
+  }
+
+  public long flushPendingBytes() {
+    return flushPendingBytes;
+  }
+
+  public long netBytes() {
+    return flushBytes + activeBytes + flushPendingBytes;
+  }
+
+  public long getPerThreadBytes(ThreadState perThread) {
+    return states[perThread.ord].perThreadBytes;
+  }
+
+  public Iterator<ThreadState> allActiveThreads() {
+    return threadPool.getActivePerThreadsIterator();
+  }
+
+  void commitPerThreadBytes(ThreadState perThread) {
+    DocWriterSession.PerThreadFlushState state = states[perThread.ord];
+    long delta = perThread.perThread.bytesUsed() - state.perThreadBytes;
+    states[perThread.ord].perThreadBytes += delta;
+    switch (state.state) {
+    case Active:
+      activeBytes += delta;
+      break;
+    case Pending:
+      flushPendingBytes += delta;
+      break;
+    default:
+      throw new IllegalStateException("unknown state: " + state.state);
+    }
+    assert updatePeaks(delta);
+  }
+
+  private boolean updatePeaks(long delta) {
+    peakActiveBytes = Math.max(peakActiveBytes, activeBytes);
+    peakFlushBytes = Math.max(peakFlushBytes, flushBytes);
+    peakFlushPendingBytes = Math.max(peakFlushPendingBytes, flushPendingBytes);
+    peakDelta = Math.max(peakDelta, delta);
+    peakNetBytes = Math.max(peakNetBytes, netBytes());
+    return true;
+  }
+
+  void doAfterFlush(FlushSpecification spec) {
+    flushBytes -= spec.ramBytesUsed;
+    threadPool.recycle(spec.writer);
+  }
+
+  public void setFlushPending(ThreadState... perThread) {
+    for (ThreadState threadState : perThread) {
+      final int ord = threadState.ord;
+      assert states[ord].state == FlushState.Active;
+      assert threadState.perThread.getNumDocsInRAM() > 0;
+      states[ord].state = FlushState.Pending;
+      final long bytes = states[ord].perThreadBytes;
+      flushPendingBytes += bytes;
+      activeBytes -= bytes;
+      numPending++;
+    }
+  }
+  
+  public void doOnAbort(ThreadState state) {
+    PerThreadFlushState flushState = states[state.ord];
+    switch (flushState.state) {
+    case Active:
+      activeBytes -= flushState.perThreadBytes;
+      break;
+    case Pending:
+      flushPendingBytes -= flushState.perThreadBytes;
+      break;
+    default:
+      throw new IllegalStateException("unknown state: " + flushState.state);
+    }
+    flushState.state = FlushState.Aborted;
+    flushState.reset();
+    threadPool.replaceForFlush(state);
+  }
+
+  public boolean hasPendingFlushes() {
+    return numPending > 0;
+  }
+
+  public boolean isFlushPending(ThreadState perThread) {
+    return states[perThread.ord].state == FlushState.Pending;
+  }
+
+  // assert only
+  private void printMemory(String info) {
+    long[] mem = new long[states.length];
+    for (int i = 0; i < states.length; i++) {
+      mem[i] = states[i].perThreadBytes;
+    }
+    System.out.println(info + " " + Arrays.toString(mem));
+  }
+
+  public FlushSpecification tryCheckoutForFlush(ThreadState perThread,
+      boolean doReplace) {
+    if (states[perThread.ord].state == FlushState.Pending) {
+      if (perThread.tryLock()) {
+        // printMemory("before");
+        assert perThread.isHeldByCurrentThread();
+        try {
+          final DocumentsWriterPerThread dwpt;
+          if (doReplace) {
+            dwpt = threadPool.replaceForFlush(perThread);
+          } else { // don't replace if we closed for instance...
+            dwpt = perThread.perThread;
+          }
+          final long bytes = states[perThread.ord].perThreadBytes;
+          flushPendingBytes -= bytes;
+          flushBytes += bytes;
+          states[perThread.ord].reset();
+          // printMemory("after");
+          numPending--;
+          return new FlushSpecification(dwpt, bytes);
+        } finally {
+          perThread.unlock();
+        }
+      }
+    }
+    return null;
+  }
+
+  /*
+   * Debug-friendly summary of the tracked byte counters and the
+   * per-thread flush states.
+   * @see java.lang.Object#toString()
+   */
+  @Override
+  public String toString() {
+    return "DocWriterSession [activeBytes=" + activeBytes + ", flushBytes="
+        + flushBytes + ", pendingBytes=" + flushPendingBytes + ", states="
+        + Arrays.toString(states) + "]";
+  }
+
+  private static enum FlushState {
+    Active, Pending, Aborted
+  }
+
+  private static class PerThreadFlushState {
+    DocWriterSession.FlushState state = FlushState.Active;
+    long perThreadBytes;
+
+    void reset() {
+      assert state == FlushState.Pending || state == FlushState.Aborted;
+      state = FlushState.Active;
+      perThreadBytes = 0;
+    }
+
+    @Override
+    public String toString() {
+      return "PerThreadFlushState [perThreadBytes=" + perThreadBytes
+          + ", state=" + state + "]";
+    }
+  }
+
+}
\ No newline at end of file

Property changes on: lucene/src/java/org/apache/lucene/index/DocWriterSession.java
___________________________________________________________________
Added: svn:keywords
   + Date Author Id Revision HeadURL
Added: svn:eol-style
   + native

Index: lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriter.java	(revision 1078697)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriter.java	(working copy)
@@ -30,6 +30,7 @@
 import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
 import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
 import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+import org.apache.lucene.index.FlushPolicy.FlushSpecification;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.SimilarityProvider;
 import org.apache.lucene.store.AlreadyClosedException;
@@ -117,15 +118,14 @@
   final IndexWriter indexWriter;
 
   private AtomicInteger numDocsInRAM = new AtomicInteger(0);
-  private AtomicLong ramUsed = new AtomicLong(0);
 
   // How much RAM we can use before flushing.  This is 0 if
   // we are flushing by doc count instead.
-  private long ramBufferSize = (long) (IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024);
+  final long ramBufferSize;
 
   // Flush @ this number of docs.  If ramBufferSize is
   // non-zero we will flush by RAM usage instead.
-  private int maxBufferedDocs = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS;
+  final int maxBufferedDocs;
 
   final BufferedDeletesStream bufferedDeletesStream;
   // TODO: cutover to BytesRefHash
@@ -133,15 +133,32 @@
   final IndexingChain chain;
 
   final DocumentsWriterPerThreadPool perThreadPool;
+  final FlushPolicy flushPolicy;
+  final DocWriterSession docWriterSession;
+  final Healthiness healtiness;
+  final boolean hijackThreadsForFlush;
 
   DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain chain, DocumentsWriterPerThreadPool indexerThreadPool, FieldInfos fieldInfos, BufferedDeletesStream bufferedDeletesStream) throws IOException {
     this.directory = directory;
     this.indexWriter = writer;
-    this.similarityProvider = writer.getConfig().getSimilarityProvider();
+    final IndexWriterConfig config = writer.getConfig();
+    this.similarityProvider = config.getSimilarityProvider();
     this.bufferedDeletesStream = bufferedDeletesStream;
     this.perThreadPool = indexerThreadPool;
     this.chain = chain;
     this.perThreadPool.initialize(this, fieldInfos);
+    ramBufferSize = getRAMBufferSizeMB(config);
+    maxBufferedDocs = config.getMaxBufferedDocs();
+    final FlushPolicy configuredPolicy = config.getFlushPolicy();
+    if (configuredPolicy == null) {
+      flushPolicy = ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH ? new DocCountFlushPolicy() : new SingleTierFlushPolicy();
+    } else {
+      flushPolicy = configuredPolicy;
+    }
+    flushPolicy.init(this);
+    hijackThreadsForFlush = flushPolicy.hijackThreadsForFlush();
+    docWriterSession = new DocWriterSession(indexerThreadPool);
+    healtiness = new Healthiness(flushPolicy);
   }
 
   boolean deleteQueries(final Query... queries) throws IOException {
@@ -151,7 +168,7 @@
       }
     }
 
-    Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
+    final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
 
     while (threadsIterator.hasNext()) {
       ThreadState state = threadsIterator.next();
@@ -237,32 +254,21 @@
       DocumentsWriterPerThread perThread = it.next().perThread;
       perThread.docState.infoStream = this.infoStream;
       perThread.docState.similarityProvider = this.similarityProvider;
+      flushPolicy.setInfoStream(this.infoStream);
     }
   }
+  
 
   /** Set how much RAM we can use before flushing. */
-  synchronized void setRAMBufferSizeMB(double mb) {
+  private long getRAMBufferSizeMB(IndexWriterConfig config) {
+    final double mb = config.getRAMBufferSizeMB();
     if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
-      ramBufferSize = IndexWriterConfig.DISABLE_AUTO_FLUSH;
+      return IndexWriterConfig.DISABLE_AUTO_FLUSH;
     } else {
-      ramBufferSize = (long) (mb*1024*1024);
+      return (long) (mb*1024.0d*1024.0d);
     }
   }
 
-  synchronized double getRAMBufferSizeMB() {
-    if (ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
-      return ramBufferSize;
-    } else {
-      return ramBufferSize/1024./1024.;
-    }
-  }
-
-  /** Set max buffered docs, which means we will flush by
-   *  doc count instead of by RAM usage. */
-  void setMaxBufferedDocs(int count) {
-    maxBufferedDocs = count;
-  }
-
   int getMaxBufferedDocs() {
     return maxBufferedDocs;
   }
@@ -359,64 +365,105 @@
     closed = true;
   }
 
-  boolean updateDocument(final Document doc, final Analyzer analyzer, final Term delTerm)
-      throws CorruptIndexException, IOException {
+  boolean updateDocument(final Document doc, final Analyzer analyzer,
+      final Term delTerm) throws CorruptIndexException, IOException {
     ensureOpen();
-
-    FlushedSegment newSegment = null;
-
-    ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this, doc);
+    boolean maybeMerge = false;
+    if (hijackThreadsForFlush && healtiness.isStalled() ) {
+      /*
+       * if we are allowed to hijack threads for flushing, flush as many
+       * pending DWPTs as possible to release memory and regain healthy status.
+       */
+      if (infoStream != null) {
+        message("WARNING DocumentsWriter is stalled try to hijack thread to flush pending segment");
+      }
+      final FlushSpecification spec;
+      synchronized (docWriterSession) {
+        spec = getFlushIfPending(null);
+      } // don't push the delete here since the update could fail!
+      maybeMerge = doFlush(spec);
+    } else {
+      if (infoStream != null && healtiness.isStalled()) {
+        message("WARNING DocumentsWriter is stalled might block thread until DocumentsWriter is not stalled anymore");
+      }
+      healtiness.waitIfStalled(); // block if stalled
+    }
+    ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(),
+        this, doc);
+    FlushSpecification flushSpec = null;
     try {
-      DocumentsWriterPerThread dwpt = perThread.perThread;
-      long perThreadRAMUsedBeforeAdd = dwpt.bytesUsed();
-      dwpt.updateDocument(doc, analyzer, delTerm);
+      final DocumentsWriterPerThread dwpt = perThread.perThread;
+      try {
+        dwpt.updateDocument(doc, analyzer, delTerm);
+      } finally {
+        if(dwpt.checkAndResetHasAborted()) {
+          synchronized (docWriterSession) {
+            docWriterSession.doOnAbort(perThread);
+            healtiness.tryUpdateStalled(docWriterSession); 
+          }
+        }
+      }
       numDocsInRAM.incrementAndGet();
-
-      newSegment = finishAddDocument(dwpt, perThreadRAMUsedBeforeAdd);
+      synchronized (docWriterSession) {
+        flushPolicy.visit(docWriterSession, perThread);
+        docWriterSession.commitPerThreadBytes(perThread);
+        flushSpec = getFlushIfPending(perThread);
+        healtiness.tryUpdateStalled(docWriterSession); 
+        // TODO: can we check this without lock?
+      }
     } finally {
       perThread.unlock();
     }
-
     // delete term from other DWPTs later, so that this thread
     // doesn't have to lock multiple DWPTs at the same time
     if (delTerm != null) {
       deleteTerm(delTerm, perThread);
     }
-
-    if (newSegment != null) {
-      finishFlushedSegment(newSegment);
+    maybeMerge |= doFlush(flushSpec);
+    return maybeMerge;
+  }
+  
+  private FlushSpecification getFlushIfPending(ThreadState perThread) {
+    if (docWriterSession.hasPendingFlushes()) {
+      if (perThread != null && docWriterSession.isFlushPending(perThread)) {
+        return docWriterSession.tryCheckoutForFlush(perThread, !closed);
+      }
+      return flushPolicy.nextPendingFlush(docWriterSession, !closed);
     }
+    return null;
+  }
 
-    if (newSegment != null) {
-      perThreadPool.clearThreadBindings(perThread);
-      return true;
+  private boolean doFlush(FlushSpecification flushSpec) throws IOException {
+    boolean maybeMerge = false;
+    while (flushSpec != null) {
+      maybeMerge = true;
+      try {
+        // flush concurrently without locking
+        final FlushedSegment newSegment = flushSpec.writer.flush();
+        finishFlushedSegment(newSegment);
+      } finally {
+        synchronized (docWriterSession) {
+          docWriterSession.doAfterFlush(flushSpec);
+          healtiness.tryUpdateStalled(docWriterSession);
+        }
+      }
+      synchronized (docWriterSession) {
+        // make sure we reset to null otherwise we try to flush the same seg
+        // twice
+        flushSpec = docWriterSession.hasPendingFlushes() ? flushPolicy
+            .nextPendingFlush(docWriterSession, !closed) : null;
+      }
     }
+    return maybeMerge;
+  }
+  
 
-    return false;
-    }
-
-  private void finishFlushedSegment(FlushedSegment newSegment) throws IOException {
+  private void finishFlushedSegment(FlushedSegment newSegment)
+      throws IOException {
     pushDeletes(newSegment);
     if (newSegment != null) {
       indexWriter.addFlushedSegment(newSegment);
-  }
-  }
-
-  private final FlushedSegment finishAddDocument(DocumentsWriterPerThread perThread,
-      long perThreadRAMUsedBeforeAdd) throws IOException {
-    FlushedSegment newSegment = null;
-
-    if (perThread.getNumDocsInRAM() == maxBufferedDocs) {
-      newSegment = perThread.flush();
     }
-
-    long deltaRAM = perThread.bytesUsed() - perThreadRAMUsedBeforeAdd;
-    long oldValue = ramUsed.get();
-    while (!ramUsed.compareAndSet(oldValue, oldValue + deltaRAM)) {
-      oldValue = ramUsed.get();
-    }
-
-    return newSegment;
   }
 
   final void subtractFlushedNumDocs(int numFlushed) {
@@ -442,7 +489,7 @@
             message("flush: delGen=" + packet.gen);
           }
           }
-      flushedSegment.segmentInfo.setBufferedDeletesGen(delGen);
+            flushedSegment.segmentInfo.setBufferedDeletesGen(delGen);
           }
         }
 
@@ -464,35 +511,39 @@
       FlushedSegment newSegment = null;
 
       ThreadState perThread = threadsIterator.next();
+      final FlushSpecification flushSpec;
+      final boolean replaceDwpt = !closed;
       perThread.lock();
       try {
-
-        DocumentsWriterPerThread dwpt = perThread.perThread;
+        final DocumentsWriterPerThread dwpt = perThread.perThread; 
         final int numDocs = dwpt.getNumDocsInRAM();
-
         // Always flush docs if there are any
         boolean flushDocs = numDocs > 0;
-
         String segment = dwpt.getSegment();
-
         // If we are flushing docs, segment must not be null:
         assert segment != null || !flushDocs;
 
         if (flushDocs) {
-          newSegment = dwpt.flush();
-
-          if (newSegment != null) {
-            perThreadPool.clearThreadBindings(perThread);
+          synchronized (docWriterSession) {
+            docWriterSession.setFlushPending(perThread);
+            flushSpec = docWriterSession.tryCheckoutForFlush(perThread, replaceDwpt);
+            assert flushSpec != null;
+          } // TODO do concurrently?
+          assert dwpt == flushSpec.writer;
+          try {
+            newSegment = dwpt.flush();
+            anythingFlushed = true;
+            finishFlushedSegment(newSegment);
+          } finally {
+            synchronized (docWriterSession) {
+              docWriterSession.doAfterFlush(flushSpec);
+              healtiness.tryUpdateStalled(docWriterSession);
             }
           }
+        }
       } finally {
         perThread.unlock();
       }
-
-      if (newSegment != null) {
-        anythingFlushed = true;
-        finishFlushedSegment(newSegment);
-      }
     }
 
     if (!anythingFlushed && flushDeletes) {
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java	(revision 1078697)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java	(working copy)
@@ -31,6 +31,7 @@
 import org.apache.lucene.search.SimilarityProvider;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BitVector;
+import org.apache.lucene.util.ByteBlockPool.Allocator;
 import org.apache.lucene.util.ByteBlockPool.DirectAllocator;
 import org.apache.lucene.util.RamUsageEstimator;
 
@@ -128,7 +129,7 @@
    *  currently buffered docs.  This resets our state,
    *  discarding any docs added since last flush. */
   void abort() throws IOException {
-    aborting = true;
+    hasAborted = aborting = true;
     try {
       if (infoStream != null) {
         message("docWriter: now abort");
@@ -180,10 +181,21 @@
 
     consumer = indexingChain.getChain(this);
     }
+  
+  public DocumentsWriterPerThread(DocumentsWriterPerThread other) {
+    this(other.directory, other.parent, other.fieldInfos, other.parent.chain);
+  }
 
   void setAborting() {
     aborting = true;
   }
+  boolean hasAborted;
+  
+  boolean checkAndResetHasAborted() {
+    boolean retval = hasAborted;
+    hasAborted = false;
+    return retval;
+  }
 
   public void updateDocument(Document doc, Analyzer analyzer, Term delTerm) throws IOException {
     assert writer.testPoint("DocumentsWriterPerThread addDocument start");
@@ -203,7 +215,7 @@
     boolean success = false;
     try {
       try {
-      consumer.processDocument(fieldInfos);
+        consumer.processDocument(fieldInfos);
       } finally {
         docState.clear();
       }
@@ -285,7 +297,6 @@
   /** Flush all pending docs to a new segment */
   FlushedSegment flush() throws IOException {
     assert numDocsInRAM > 0;
-
     flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
         numDocsInRAM, writer.getConfig().getTermIndexInterval(),
         SegmentCodecs.build(fieldInfos, writer.codecs), pendingDeletes);
@@ -323,7 +334,7 @@
       newSegment.clearFilesCache();
 
       if (infoStream != null) {
-        message("new segment has " + flushState.deletedDocs.count() + " deleted docs");
+        message("new segment has " + (flushState.deletedDocs == null ? 0 : flushState.deletedDocs.count()) + " deleted docs");
         message("new segment has " + (flushState.hasVectors ? "vectors" : "no vectors"));
         message("flushedFiles=" + newSegment.files());
         message("flushed codecs=" + newSegment.getSegmentCodecs());
@@ -395,11 +406,38 @@
     bytesUsed.addAndGet(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT);
     return b;
   }
+  
+  void recycleIntBlocks(int[][] blocks, int offset, int length) {
+    bytesUsed.addAndGet(-(length *(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT)));
+  }
 
-  final DirectAllocator byteBlockAllocator = new DirectAllocator();
+  final Allocator byteBlockAllocator = new DirectTrackingAllocator();
+    
+    
+ private class DirectTrackingAllocator extends Allocator {
+    public DirectTrackingAllocator() {
+      this(BYTE_BLOCK_SIZE);
+    }
 
+    public DirectTrackingAllocator(int blockSize) {
+      super(blockSize);
+    }
+
+    public byte[] getByteBlock() {
+      bytesUsed.addAndGet(blockSize);
+      return new byte[blockSize];
+    }
+    @Override
+    public void recycleByteBlocks(byte[][] blocks, int start, int end) {
+      bytesUsed.addAndGet(-((end-start)* blockSize));
+      for (int i = start; i < end; i++) {
+        blocks[i] = null;
+      }
+    }
+    
+  };
+
   String toMB(long v) {
     return nf.format(v/1024./1024.);
   }
-
 }
Index: lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java	(revision 1078697)
+++ lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java	(working copy)
@@ -7,10 +7,12 @@
 
 public abstract class DocumentsWriterPerThreadPool {
   final static class ThreadState extends ReentrantLock {
-    final DocumentsWriterPerThread perThread;
+    DocumentsWriterPerThread perThread;
+    final int ord;
 
-    ThreadState(DocumentsWriterPerThread perThread) {
+    ThreadState(DocumentsWriterPerThread perThread, int ord) {
       this.perThread = perThread;
+      this.ord = ord;
     }
   }
 
@@ -26,7 +28,7 @@
 
   public void initialize(DocumentsWriter documentsWriter, FieldInfos fieldInfos) {
     for (int i = 0; i < perThreads.length; i++) {
-      perThreads[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, fieldInfos, documentsWriter.chain));
+      perThreads[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, fieldInfos, documentsWriter.chain), i);
     }
   }
 
@@ -43,7 +45,18 @@
 
     return null;
   }
-
+  
+  protected DocumentsWriterPerThread replaceForFlush(ThreadState threadState) {
+    assert threadState.isHeldByCurrentThread();
+    final DocumentsWriterPerThread dwpt = threadState.perThread;
+    perThreads[threadState.ord].perThread = new DocumentsWriterPerThread(threadState.perThread);
+    clearThreadBindings(threadState);
+    return dwpt;
+  }
+  
+  public void recycle(DocumentsWriterPerThread dwpt) {
+  }
+  
   public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc);
 
   public abstract void clearThreadBindings(ThreadState perThread);
Index: lucene/src/java/org/apache/lucene/index/FlushPolicy.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/FlushPolicy.java	(revision 0)
+++ lucene/src/java/org/apache/lucene/index/FlushPolicy.java	(revision 0)
@@ -0,0 +1,87 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.PrintStream;
+import java.util.Iterator;
+
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+import org.apache.lucene.util.SetOnce;
+
+public abstract class FlushPolicy {
+
+  protected final SetOnce<DocumentsWriter> docsWriter = new SetOnce<DocumentsWriter>();
+  protected PrintStream infoStream;
+  protected int maxDocsPerWriter;
+
+  public abstract void visit(DocWriterSession session,
+      ThreadState state);
+
+  public FlushSpecification nextPendingFlush(DocWriterSession session, boolean doReplace) {
+    Iterator<ThreadState> allActiveThreads = session.allActiveThreads();
+    while (allActiveThreads.hasNext()) {
+      ThreadState next = allActiveThreads.next();
+      if (session.isFlushPending(next)) {
+        assert message(Thread.currentThread().getName() + ": checkout for flush " + next.ord + " numDocs: " + next.perThread.getNumDocsInRAM() + " dwpt: "+ next.perThread + " doReplace: "+ doReplace);
+        FlushSpecification spec = session.tryCheckoutForFlush(next, doReplace);
+        if (spec != null) {
+          return spec;
+        }
+      }
+    }
+    return null;
+  }
+  public abstract boolean isStalled(DocWriterSession session, boolean isStalled);
+  
+  public boolean hijackThreadsForFlush() {
+    return true;
+  }
+
+  protected boolean message(String msg) {
+    if (infoStream != null) {
+      infoStream.println(msg);
+    }
+    return true;
+  }
+
+  protected void init(DocumentsWriter docsWriter) {
+    this.docsWriter.set(docsWriter);
+    this.infoStream = docsWriter.infoStream;
+    // set int max to disable flush by # docs - TODO improve this
+    maxDocsPerWriter = docsWriter.maxBufferedDocs;
+  }
+
+  public static class FlushSpecification {
+    public final DocumentsWriterPerThread writer;
+    public final long ramBytesUsed;
+
+    @Override
+    public String toString() {
+      return "FlushSpecification [writer=" + writer + ", ramBytesUsed="
+          + ramBytesUsed + "]";
+    }
+
+    public FlushSpecification(DocumentsWriterPerThread writer, long flushBytes) {
+      this.writer = writer;
+      ramBytesUsed = flushBytes;
+    }
+  }
+
+  final void setInfoStream(PrintStream infoStream) {
+    this.infoStream = infoStream;
+  }
+}

Property changes on: lucene/src/java/org/apache/lucene/index/FlushPolicy.java
___________________________________________________________________
Added: svn:keywords
   + Date Author Id Revision HeadURL
Added: svn:eol-style
   + native

Index: lucene/src/java/org/apache/lucene/index/Healthiness.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/Healthiness.java	(revision 0)
+++ lucene/src/java/org/apache/lucene/index/Healthiness.java	(revision 0)
@@ -0,0 +1,94 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+class Healthiness {
+
+  private static final class Sync extends AbstractQueuedSynchronizer {
+    volatile boolean hasBlockedThreads = false; // only with assert
+
+    Sync() {
+      setState(0);
+    }
+
+    public boolean isZero() {
+      return getState() == 0;
+    }
+
+    public boolean enqueueUnhealthiness() {
+      int state = getState();
+      return compareAndSetState(state, state + 1);
+    }
+
+    public boolean tryHealthyReset() {
+      final int oldState = getState();
+      if (oldState == 0)
+        return true;
+      if (compareAndSetState(oldState, 0)) {
+        releaseShared(0);
+        return true;
+      }
+      return false;
+    }
+
+    public int tryAcquireShared(int acquires) {
+      assert (hasBlockedThreads = getState() != 0) || true;
+      return getState() == 0 ? 1 : -1;
+    }
+
+    public boolean tryReleaseShared(int newState) {
+      return (getState() == 0);
+    }
+  }
+
+  private final Healthiness.Sync sync = new Sync();
+  private final FlushPolicy policy;
+  volatile boolean wasStalled = false; // only with asserts
+
+  public Healthiness(FlushPolicy policy) {
+    this.policy = policy;
+  }
+
+  public boolean isStalled() {
+    return !sync.isZero();
+  }
+
+  public void tryUpdateStalled(DocWriterSession session) {
+    do {
+      /*
+       * TODO maybe we need to add some safety check here if pendingMem > 0 and
+       * flushingMem == 0 to prevent thread starvation
+       */
+      while (policy.isStalled(session, !sync.isZero())) {
+        if (sync.enqueueUnhealthiness()) {
+          assert wasStalled = true;
+          return;
+        }
+      }
+    } while (!sync.tryHealthyReset());
+  }
+
+  public void waitIfStalled() {
+    sync.acquireShared(0);
+  }
+
+  boolean hasBlocked() {
+    return sync.hasBlockedThreads;
+  }
+}
\ No newline at end of file

Property changes on: lucene/src/java/org/apache/lucene/index/Healthiness.java
___________________________________________________________________
Added: svn:keywords
   + Date Author Id Revision HeadURL
Added: svn:eol-style
   + native

Index: lucene/src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/IndexWriter.java	(revision 1078697)
+++ lucene/src/java/org/apache/lucene/index/IndexWriter.java	(working copy)
@@ -817,8 +817,6 @@
         segmentInfos.changed();
       }
 
-      docWriter.setRAMBufferSizeMB(conf.getRAMBufferSizeMB());
-      docWriter.setMaxBufferedDocs(conf.getMaxBufferedDocs());
       pushMaxBufferedDocs();
 
       if (infoStream != null) {
@@ -2700,9 +2698,18 @@
   public final long ramSizeInBytes() {
     ensureOpen();
     // nocommit
-    //return docWriter.bytesUsed() + bufferedDeletesStream.bytesUsed();
-    return 0;
+    final long netBytes;
+    synchronized (docWriter.docWriterSession) {
+      netBytes = docWriter.docWriterSession.netBytes();
+    }
+    return netBytes + bufferedDeletesStream.bytesUsed();
   }
+  
+  DocumentsWriter getDocsWriter() {
+    boolean test = false;
+    assert test = true;
+    return test ? docWriter : null;
+  }
 
   /** Expert:  Return the number of documents currently
    *  buffered in RAM. */
Index: lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java	(revision 1078697)
+++ lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java	(working copy)
@@ -126,6 +126,7 @@
   private boolean readerPooling;
   private DocumentsWriterPerThreadPool indexerThreadPool;
   private int readerTermsIndexDivisor;
+  private FlushPolicy flushPolicy;
 
   // required for clone
   private Version matchVersion;
@@ -572,6 +573,15 @@
   public int getReaderTermsIndexDivisor() {
     return readerTermsIndexDivisor;
   }
+  
+  public IndexWriterConfig setFlushPolicy(FlushPolicy flushPolicy) {
+    this.flushPolicy = flushPolicy;
+    return this;
+  }
+  
+  public FlushPolicy getFlushPolicy() {
+    return flushPolicy;
+  }
 
   @Override
   public String toString() {
@@ -596,6 +606,9 @@
     sb.append("maxThreadStates=").append(indexerThreadPool.getMaxThreadStates()).append("\n");
     sb.append("readerPooling=").append(readerPooling).append("\n");
     sb.append("readerTermsIndexDivisor=").append(readerTermsIndexDivisor).append("\n");
+    sb.append("flushPolicy=").append(flushPolicy).append("\n");
+
     return sb.toString();
   }
+
 }
Index: lucene/src/java/org/apache/lucene/index/IntBlockPool.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/IntBlockPool.java	(revision 1078697)
+++ lucene/src/java/org/apache/lucene/index/IntBlockPool.java	(working copy)
@@ -1,5 +1,7 @@
 package org.apache.lucene.index;
 
+import java.util.Arrays;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -36,6 +38,10 @@
   public void reset() {
     if (bufferUpto != -1) {
       // Reuse first buffer
+      if (bufferUpto > 0) {
+        docWriter.recycleIntBlocks(buffers, 1, bufferUpto-1);
+        Arrays.fill(buffers, 1, bufferUpto, null);
+      }
       bufferUpto = 0;
       intUpto = 0;
       intOffset = 0;
Index: lucene/src/java/org/apache/lucene/index/MultiTierFlushPolicy.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/MultiTierFlushPolicy.java	(revision 0)
+++ lucene/src/java/org/apache/lucene/index/MultiTierFlushPolicy.java	(revision 0)
@@ -0,0 +1,137 @@
+package org.apache.lucene.index;
+
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class MultiTierFlushPolicy extends ByRAMFlushPolicyBase {
+
+  static double HIGH_WATER_PERCENT = 1.10;
+  static double LOW_WATER_PERCENT = 0.90;
+  long[] tiers;
+  int tierPointer = 0;
+  long watermark;
+  long maxNetBytes;
+
+  @Override
+  protected void init(DocumentsWriter docsWriter) {
+    super.init(docsWriter);
+    assert docsWriter.ramBufferSize != IndexWriterConfig.DISABLE_AUTO_FLUSH;
+    final int numThreads = docsWriter.perThreadPool.getMaxThreadStates();
+    tiers = getTiers(maxRamBytes, numThreads);
+    watermark = tiers[tierPointer];
+    maxNetBytes = 2 * maxRamBytes;
+  }
+
+  @Override
+  public void visit(DocWriterSession session, ThreadState state) {
+    if (session.isFlushPending(state)) {
+      return;
+    }
+    if (state.perThread.getNumDocsInRAM() >= maxDocsPerWriter) {
+      session.setFlushPending(state);
+    } else {
+      final long lastBytesPerThread = session.getPerThreadBytes(state);
+      final long delta = state.perThread.bytesUsed() - lastBytesPerThread;
+      final long totalRam = session.activeBytes() + delta;
+      if (totalRam >= watermark) {
+        if (++tierPointer == tiers.length) {
+          assert message("Set flush pending on all writers at level: "
+              + (tierPointer - 1) + " tierMaxMemory: " + watermark
+              + " uncommitted active mem: " + totalRam);
+          /*
+           * Reset watermark to tier 0 and let all threads continue with
+           * flushing, let replaced DWPT continue indexing until we are not
+           * healthy anymore. Once the flushed bytes are clearing up the
+           * healthiness control will prevent threads from bloating the memory
+           * until enough ram is flushed.
+           */
+          watermark = tiers[tierPointer = 0];
+          markAllWritersPending(session, state);
+        } else {
+          assert message("Set flush pending on largest writer at level: "
+              + (tierPointer - 1) + " tierMaxMemory: " + watermark
+              + " uncommitted active mem: " + totalRam);
+          watermark = tiers[tierPointer];
+          markLargesWriterPending(session, state,
+              lastBytesPerThread + delta);
+        }
+      }
+    }
+  }
+
+  /**
+   * Return the number of bytes for each tier to flush at.
+   */
+  protected long[] getTiers(long maxRamBytes, int numThreads) {
+    assert numThreads > 0;
+    if (numThreads == 1) {
+      return new long[] { maxRamBytes };
+    }
+    final long lowWater = getWaterMarkBytes(maxRamBytes, LOW_WATER_PERCENT);
+    final long highWater = getWaterMarkBytes(maxRamBytes, HIGH_WATER_PERCENT);
+    long[] tiers = new long[numThreads];
+    tiers[0] = lowWater;
+    long inc = (highWater - lowWater) / (numThreads - 1);
+    long cur = lowWater;
+    for (int x = 1; x < tiers.length; x++) {
+      cur += inc;
+      tiers[x] = cur;
+    }
+    return tiers;
+  }
+
+  /**
+   * Returns the water mark for the given percentage.
+   */
+  private static long getWaterMarkBytes(long ramMax, double percent) {
+    return (long) ((double) ramMax * percent);
+  }
+
+  @Override
+  public boolean isStalled(DocWriterSession session, boolean isStalled) {
+    if (session.netBytes() >= maxNetBytes) {
+      return true;
+    } else if (isStalled && (maxFlushBytes < session.flushBytes())) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public long getUpperWatermark() {
+    return tiers[tiers.length - 1];
+  }
+
+  /**
+   * @return the watermark
+   */
+  @Override
+  public long getWatermark() {
+    return watermark;
+  }
+
+  /**
+   * @return the maxNetBytes
+   */
+  @Override
+  public long getMaxNetBytes() {
+    return maxNetBytes;
+  }
+
+}
\ No newline at end of file

Property changes on: lucene/src/java/org/apache/lucene/index/MultiTierFlushPolicy.java
___________________________________________________________________
Added: svn:keywords
   + Date Author Id Revision HeadURL
Added: svn:eol-style
   + native

Index: lucene/src/java/org/apache/lucene/index/SingleTierFlushPolicy.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/SingleTierFlushPolicy.java	(revision 0)
+++ lucene/src/java/org/apache/lucene/index/SingleTierFlushPolicy.java	(revision 0)
@@ -0,0 +1,82 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+
+//nocommit - document this
+public class SingleTierFlushPolicy extends ByRAMFlushPolicyBase {
+  static double HIGH_WATER_PERCENT = 1.10;
+  static double LOW_WATER_PERCENT = 0.90;
+  private long lowWater;
+  private long highWater;
+
+  @Override
+  protected void init(DocumentsWriter docsWriter) {
+    super.init(docsWriter);
+    lowWater = getWaterMarkBytes(maxRamBytes, LOW_WATER_PERCENT);
+    highWater = getWaterMarkBytes(maxRamBytes, HIGH_WATER_PERCENT);
+  }
+
+  /**
+   * Returns the water mark for the given percentage.
+   */
+  private static long getWaterMarkBytes(long ramMax, double percent) {
+    return (long) ((double) ramMax * percent);
+  }
+
+  @Override
+  public long getUpperWatermark() {
+    return highWater;
+  }
+
+  @Override
+  public long getWatermark() {
+    return lowWater;
+  }
+
+  @Override
+  public void visit(DocWriterSession session, ThreadState state) {
+    if (session.isFlushPending(state)) {
+      return;
+    }
+    if (state.perThread.getNumDocsInRAM() >= maxDocsPerWriter) {
+      session.setFlushPending(state); // flush by num docs
+    } else {
+      final long lastBytesPerThread = session.getPerThreadBytes(state);
+      final long delta = state.perThread.bytesUsed() - lastBytesPerThread;
+      final long totalRam = session.activeBytes() + delta;
+      if (totalRam >= highWater) {
+        assert message("Set flush pending on all writers uncommitted active mem: "
+            + totalRam);
+        markAllWritersPending(session, state);
+      } else if (totalRam >= lowWater) {
+        markLargesWriterPending(session, state, lastBytesPerThread + delta);
+      }
+    }
+  }
+
+  @Override
+  public boolean isStalled(DocWriterSession session, boolean isStalled) {
+    return session.activeBytes() + session.flushPendingBytes() + session.flushBytes() > highWater;
+  }
+
+  public long getMaxNetBytes() {
+    return highWater;
+  }
+
+}

Property changes on: lucene/src/java/org/apache/lucene/index/SingleTierFlushPolicy.java
___________________________________________________________________
Added: svn:keywords
   + Date Author Id Revision HeadURL
Added: svn:eol-style
   + native

Index: lucene/src/java/org/apache/lucene/index/TermsHash.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/TermsHash.java	(revision 1078697)
+++ lucene/src/java/org/apache/lucene/index/TermsHash.java	(working copy)
@@ -52,7 +52,8 @@
   // Used by perField to obtain terms from the analysis chain
   final BytesRef termBytesRef = new BytesRef(10);
 
-  boolean trackAllocations;
+  //nocommit - make configurable eg. if flush by # docs
+  boolean trackAllocations = true;
 
 
   public TermsHash(final DocumentsWriterPerThread docWriter, final TermsHashConsumer consumer, final TermsHash nextTermsHash) {
Index: lucene/src/java/org/apache/lucene/index/TermsHashPerField.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/TermsHashPerField.java	(revision 1078697)
+++ lucene/src/java/org/apache/lucene/index/TermsHashPerField.java	(working copy)
@@ -312,7 +312,7 @@
     @Override
     public int[] clear() {
       if(perField.postingsArray != null) {
-        bytesUsed.addAndGet(-perField.postingsArray.size * perField.postingsArray.bytesPerPosting());
+        bytesUsed.addAndGet(-(perField.postingsArray.size * perField.postingsArray.bytesPerPosting()));
         perField.postingsArray = null;
       }
       return null;
Index: lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java	(revision 1078697)
+++ lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java	(working copy)
@@ -11,6 +11,7 @@
 
   public ThreadAffinityDocumentsWriterThreadPool(int maxNumPerThreads) {
     super(maxNumPerThreads);
+    assert getMaxThreadStates() >= 1;
   }
 
   @Override
@@ -21,25 +22,26 @@
         return threadState;
       }
     }
-
-    // find the state that has minimum amount of threads waiting
-    Iterator<ThreadState> it = getActivePerThreadsIterator();
     ThreadState minThreadState = null;
-    while (it.hasNext()) {
-      ThreadState state = it.next();
-      if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) {
-        minThreadState = state;
+    do {
+      // find the state that has minimum amount of threads waiting
+      Iterator<ThreadState> it = getActivePerThreadsIterator();
+      while (it.hasNext()) {
+        ThreadState state = it.next();
+        if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) {
+          minThreadState = state;
+        }
       }
-    }
-
-    if (minThreadState == null || minThreadState.hasQueuedThreads()) {
-      ThreadState newState = newThreadState();
-      if (newState != null) {
-        minThreadState = newState;
-        threadBindings.put(requestingThread, newState);
+  
+      if (minThreadState == null || minThreadState.hasQueuedThreads()) {
+        ThreadState newState = newThreadState();
+        if (newState != null) {
+          minThreadState = newState;
+          threadBindings.put(requestingThread, newState);
+        }
       }
-    }
-
+    } while(minThreadState == null);
+    
     minThreadState.lock();
     return minThreadState;
   }
@@ -53,4 +55,5 @@
   public void clearAllThreadBindings() {
     threadBindings.clear();
   }
+  
 }
Index: lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java	(revision 1078697)
+++ lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java	(working copy)
@@ -70,7 +70,9 @@
     assertEquals(IndexWriterConfig.DEFAULT_READER_TERMS_INDEX_DIVISOR, conf.getReaderTermsIndexDivisor());
     assertEquals(LogByteSizeMergePolicy.class, conf.getMergePolicy().getClass());
     assertEquals(ThreadAffinityDocumentsWriterThreadPool.class, conf.getIndexerThreadPool().getClass());
+    assertNull(conf.getFlushPolicy());
 
+
     // Sanity check - validate that all getters are covered.
     Set<String> getters = new HashSet<String>();
     getters.add("getAnalyzer");
@@ -94,6 +96,7 @@
     getters.add("getReaderPooling");
     getters.add("getIndexerThreadPool");
     getters.add("getReaderTermsIndexDivisor");
+    getters.add("getFlushPolicy");
     for (Method m : IndexWriterConfig.class.getDeclaredMethods()) {
       if (m.getDeclaringClass() == IndexWriterConfig.class && m.getName().startsWith("get")) {
         assertTrue("method " + m.getName() + " is not tested for defaults", getters.contains(m.getName()));
Index: lucene/src/test/org/apache/lucene/index/TestTieredFlushPolicy.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestTieredFlushPolicy.java	(revision 0)
+++ lucene/src/test/org/apache/lucene/index/TestTieredFlushPolicy.java	(revision 0)
@@ -0,0 +1,408 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.util.LineFileDocs;
+import org.apache.lucene.util.LuceneTestCase;
+import org.junit.Before;
+
+
+
+// TODO: implement a simulated disk-like delay for the flushes
+public class TestTieredFlushPolicy extends LuceneTestCase {
+
+  private LineFileDocs lineDocFile;
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    lineDocFile = new LineFileDocs(random);
+  }
+  
+  public void runByRamFlushPolicy(ByRAMFlushPolicyBase policy, int numThreads) throws CorruptIndexException, LockObtainFailedException, IOException, InterruptedException {
+    final int numDocumentsToIndex = 100 + random.nextInt(300);
+      AtomicInteger numDocs = new AtomicInteger(numDocumentsToIndex);
+      Directory dir = newDirectory();
+      ByRAMFlushPolicyBase flushPolicy = policy;
+      IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT,
+          new MockAnalyzer()).setFlushPolicy(flushPolicy);
+
+      final int numDWPT = 1 + random.nextInt(8);
+      DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
+          numDWPT);
+      iwc.setIndexerThreadPool(threadPool);
+      iwc.setRAMBufferSizeMB(1.0 + Math.ceil(random.nextDouble()));
+      iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH);
+      IndexWriter writer = new IndexWriter(dir, iwc);
+      DocumentsWriter docsWriter = writer.getDocsWriter();
+      assertNotNull(docsWriter);
+      DocWriterSession session = docsWriter.docWriterSession;
+      assertEquals(" bytes must be 0 after init", 0, session.flushBytes());
+      assertEquals(" bytes must be 0 after init", 0, session.flushPendingBytes());
+
+      IndexThread[] threads = new IndexThread[numThreads];
+      for (int x = 0; x < threads.length; x++) {
+        threads[x] = new IndexThread(numDocs, numThreads, writer,
+            lineDocFile, false);
+        threads[x].start();
+      }
+
+      for (int x = 0; x < threads.length; x++) {
+        threads[x].join();
+      }
+
+      assertEquals(" all flushes must be due numThreads=" + numThreads, 0, session.flushBytes());
+      assertEquals(" all flushes must be due numThreads=" + numThreads, 0, session.flushPendingBytes());
+      assertEquals(numDocumentsToIndex, writer.numDocs());
+      assertEquals(numDocumentsToIndex, writer.maxDoc());
+      assertTrue("peak bytes exceeded upper watermark",
+          session.peakActiveBytes <= flushPolicy.getUpperWatermark());
+      assertActiveBytesAfter(session);
+      writer.close();
+      dir.close();
+  }
+  public void testMultiTieredFlush() throws IOException, InterruptedException {
+    int[] numThreads = new int[] { 3 + random.nextInt(8), 1 };
+    for (int i = 0; i < numThreads.length; i++) {
+      runByRamFlushPolicy(new MockMultiTieredFlushPolicy(), numThreads[i]);
+    }
+  }
+  
+  public void testSingleTieredFlush() throws IOException, InterruptedException {
+    int[] numThreads = new int[] { 3 + random.nextInt(8), 1 };
+    for (int i = 0; i < numThreads.length; i++) {
+      runByRamFlushPolicy(new MockSingleTieredFlushPolicy(), numThreads[i]);
+    }
+  }
+  
+  public void testMultTierFlushSetup() throws IOException {
+    Directory dir = newDirectory();
+    MultiTierFlushPolicy flushPolicy = new MultiTierFlushPolicy();
+    IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT,
+        new MockAnalyzer()).setFlushPolicy(flushPolicy);
+
+    final int numDWPT = 1+ random.nextInt(10);
+    DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
+        numDWPT);
+    iwc.setIndexerThreadPool(threadPool);
+    iwc.setRAMBufferSizeMB(1.0 + Math.ceil(random.nextDouble()));
+    iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH);
+    long[] tiers = flushPolicy.getTiers(
+        (long) (iwc.getRAMBufferSizeMB() * 1024.0d * 1024.0d), numDWPT);
+    IndexWriter writer = new IndexWriter(dir, iwc);
+    assertArrayEquals(tiers, flushPolicy.tiers);
+    assertEquals(0, flushPolicy.tierPointer);
+    assertEquals(tiers[0], flushPolicy.watermark);
+    
+    writer.close();
+    dir.close();
+  }
+
+  public static FlushPolicy newFlushPolicy(Random random, boolean flushDocCountOnly) {
+    if (flushDocCountOnly) {
+      return new DocCountFlushPolicy();
+    } else {
+      if (random.nextBoolean()) {
+        return new MockSingleTieredFlushPolicy();
+      } else {
+        return new MockMultiTieredFlushPolicy();
+      }
+    }
+  }
+
+  public void testRandom() throws IOException, InterruptedException {
+    final int numThreads = 1 + random.nextInt(8);
+    final int numDocumentsToIndex = 100 + random.nextInt(300);
+    AtomicInteger numDocs = new AtomicInteger(numDocumentsToIndex);
+    Directory dir = newDirectory();
+    IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT,
+        new MockAnalyzer());
+    FlushPolicy flushPolicy = newFlushPolicy(random, iwc.getRAMBufferSizeMB() == IndexWriterConfig.DISABLE_AUTO_FLUSH);
+    iwc.setFlushPolicy(flushPolicy);
+
+    final int numDWPT = 1 + random.nextInt(8);
+    DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
+        numDWPT);
+    iwc.setIndexerThreadPool(threadPool);
+
+    IndexWriter writer = new IndexWriter(dir, iwc);
+    DocumentsWriter docsWriter = writer.getDocsWriter();
+    assertNotNull(docsWriter);
+    DocWriterSession session = docsWriter.docWriterSession;
+   
+    assertEquals(" bytes must be 0 after init", 0, session.flushBytes());
+    assertEquals(" bytes must be 0 after init", 0, session.flushPendingBytes());
+
+    IndexThread[] threads = new IndexThread[numThreads];
+    for (int x = 0; x < threads.length; x++) {
+      threads[x] = new IndexThread(numDocs, numThreads, writer, lineDocFile, true);
+      threads[x].start();
+    }
+
+    for (int x = 0; x < threads.length; x++) {
+      threads[x].join();
+    }
+
+    assertEquals(" all flushes must be due", 0, session.flushBytes());
+    assertEquals(" all flushes must be due", 0, session.flushPendingBytes());
+    assertEquals(numDocumentsToIndex, writer.numDocs());
+    assertEquals(numDocumentsToIndex, writer.maxDoc());
+    if (flushPolicy instanceof ByRAMFlushPolicyBase) {
+    assertTrue("peak bytes exceeded upper watermark",
+        session.peakActiveBytes <= ((ByRAMFlushPolicyBase)flushPolicy).getUpperWatermark());
+    }
+    assertActiveBytesAfter(session);
+    writer.commit();
+    assertEquals(0, session.activeBytes());
+    IndexReader r = IndexReader.open(dir);
+    assertEquals(numDocumentsToIndex, r.numDocs());
+    assertEquals(numDocumentsToIndex, r.maxDoc());
+    r.close();
+    writer.close();
+    dir.close();
+  }
+
+  public void testHealthiness() throws InterruptedException,
+      CorruptIndexException, LockObtainFailedException, IOException {
+
+    int[] numThreads = new int[] { 3 + random.nextInt(8), 1 };
+    final int numDocumentsToIndex = 100 + random.nextInt(300);
+    for (int i = 0; i < numThreads.length; i++) {
+      AtomicInteger numDocs = new AtomicInteger(numDocumentsToIndex);
+      Directory dir = newDirectory();
+      IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT,
+          new MockAnalyzer());
+      iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH);
+      ByRAMFlushPolicyBase flushPolicy = (ByRAMFlushPolicyBase) newFlushPolicy(random, false);
+      iwc.setFlushPolicy(flushPolicy);
+
+      DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
+          numThreads[i]);
+      iwc.setIndexerThreadPool(threadPool);
+      iwc.setRAMBufferSizeMB(0.5);
+      IndexWriter writer = new IndexWriter(dir, iwc);
+      IndexThread[] threads = new IndexThread[numThreads[i]];
+      for (int x = 0; x < threads.length; x++) {
+        threads[x] = new IndexThread(numDocs, numThreads[i], writer,
+            lineDocFile, false);
+        threads[x].start();
+      }
+
+      for (int x = 0; x < threads.length; x++) {
+        threads[x].join();
+      }
+      DocumentsWriter docsWriter = writer.getDocsWriter();
+      assertNotNull(docsWriter);
+      DocWriterSession session = docsWriter.docWriterSession;
+      assertEquals(" all flushes must be due", 0, session.flushBytes());
+      assertEquals(" all flushes must be due", 0, session.flushPendingBytes());
+      assertEquals(numDocumentsToIndex, writer.numDocs());
+      assertEquals(numDocumentsToIndex, writer.maxDoc());
+
+      assertTrue("peak bytes exceeded upper watermark",
+          session.peakActiveBytes <= flushPolicy.getUpperWatermark());
+      if (numThreads[i] == 1) { // single thread could be unhealthy is a singel
+                                // doc is very large?!
+        long maxDelta = flushPolicy.getMaxNetBytes()
+            - flushPolicy.getUpperWatermark();
+        if (session.peakDelta < maxDelta) {
+          assertFalse("single thread must not be unhealthy",
+              docsWriter.healtiness.wasStalled);
+        }
+        assertFalse("single thread must not block",
+            docsWriter.healtiness.hasBlocked());
+      } else if (session.peakNetBytes >= flushPolicy.getMaxNetBytes()) {
+        assertTrue(" was not unhealthy but peak net bytes exceeded",
+            docsWriter.healtiness.wasStalled);
+      }
+      assertActiveBytesAfter(session);
+      writer.close();
+      dir.close();
+    }
+
+  }
+
+  /**
+   * Asserts that the session's active-bytes accounting is consistent: the sum
+   * of {@code bytesUsed()} over all active per-thread writers must equal
+   * {@code session.activeBytes()}.
+   */
+  protected void assertActiveBytesAfter(DocWriterSession session) {
+    Iterator<ThreadState> allActiveThreads = session.allActiveThreads();
+    long bytesUsed = 0;
+    while (allActiveThreads.hasNext()) {
+      bytesUsed += allActiveThreads.next().perThread.bytesUsed();
+    }
+    assertEquals(bytesUsed, session.activeBytes());
+  }
+
+  /**
+   * Worker thread that pulls documents from a shared {@link LineFileDocs}
+   * source and adds them to the given writer until the shared pending-docs
+   * counter is exhausted. Optionally issues a commit with ~1/20 probability
+   * after each added document.
+   */
+  public class IndexThread extends Thread {
+    int numThreads;          // number of sibling index threads (currently unused in run())
+    IndexWriter writer;
+    IndexWriterConfig iwc;   // config of the writer, captured at construction
+    LineFileDocs docs;
+    private AtomicInteger pendingDocs;      // shared countdown of docs left to index
+    private final boolean doRandomCommit;   // if true, commit randomly (~1 in 20 docs)
+
+    public IndexThread(AtomicInteger pendingDocs, int numThreads,
+        IndexWriter writer, LineFileDocs docs, boolean doRandomCommit) {
+      this.pendingDocs = pendingDocs;
+      this.numThreads = numThreads;
+      this.writer = writer;
+      iwc = writer.getConfig();
+      this.docs = docs;
+      this.doRandomCommit = doRandomCommit;
+    }
+
+    public void run() {
+      try {
+        // NOTE(review): ramSize is tracked but never read beyond the update
+        // below — looks like leftover debugging state; confirm before removing.
+        long ramSize = 0;
+        while (pendingDocs.decrementAndGet() > -1) {
+          Document doc = docs.nextDoc();
+          writer.addDocument(doc);
+          long newRamSize = writer.ramSizeInBytes();
+          if (newRamSize != ramSize) {
+            ramSize = newRamSize;
+          }
+          if (doRandomCommit) {
+            int commit;
+            // random is shared across threads; guard nextInt() with its monitor
+            synchronized (random) {
+              commit = random.nextInt(20);
+            }
+            if (commit == 0) {
+              writer.commit();
+            }
+          }
+        }
+      } catch (Exception ex) {
+        // NOTE(review): exceptions are only printed, not propagated — a failing
+        // worker will not fail the test directly; the post-join doc-count
+        // assertions are what catch lost documents.
+        ex.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Wraps {@link MultiTierFlushPolicy#visit} with assertions: before
+   * delegating, it predicts from the session state which tier the policy
+   * should move to and which thread states should become flush-pending, then
+   * verifies the prediction after the super call.
+   */
+  private static class MockMultiTieredFlushPolicy extends MultiTierFlushPolicy {
+   
+    @Override
+    public void visit(DocWriterSession session, ThreadState state) {
+      long currentBytes = state.perThread.bytesUsed();
+      long previousBytes = session.getPerThreadBytes(state);
+      // bytes this thread state added since the session last accounted for it
+      long delta = currentBytes - previousBytes;
+      int expectedTier = tierPointer;
+      boolean expectFlushCurrent = session.isFlushPending(state);
+      ArrayList<ThreadState> mustFlush = new ArrayList<ThreadState>();
+      ArrayList<ThreadState> mustNotFlush = new ArrayList<ThreadState>();
+      // only trigger watermark if we are not pending
+      if (!expectFlushCurrent && (session.activeBytes() + delta) >= watermark) {
+        // tiers advance cyclically; wrapping to tier 0 means "flush everything"
+        expectedTier = tierPointer == tiers.length - 1 ? 0 : expectedTier + 1;
+        if (expectedTier == 0) {
+          expectFlushCurrent = true; // flush all
+          findOnFlushAll(session, mustFlush, mustNotFlush);
+        } else { // flush biggest
+          // NOTE(review): in this branch mustFlush/mustNotFlush stay empty, so
+          // the loops below assert nothing; only expectFlushCurrent is checked.
+          ThreadState toFlush = findOnFlushSingle(session, state, currentBytes);
+          expectFlushCurrent |= toFlush == state;
+        }
+      }
+      super.visit(session, state);
+      for (ThreadState threadState : mustNotFlush) {
+        assertFalse("theadState: " + threadState.ord
+            + " must NOT have flush pending",
+            session.isFlushPending(threadState));
+      }
+      for (ThreadState threadState : mustFlush) {
+        assertTrue("theadState: " + threadState.ord
+            + " must have flush pending", session.isFlushPending(threadState));
+      }
+      assertEquals("expected tier doesn't match", expectedTier, tierPointer);
+    }
+
+   
+  }
+  
+  /**
+   * Wraps {@link SingleTierFlushPolicy#visit} with assertions: predicts from
+   * the session state which thread states should become flush-pending
+   * (flush-all above the upper watermark, flush-biggest above the lower
+   * watermark), delegates, then verifies the prediction.
+   */
+  private static class MockSingleTieredFlushPolicy extends SingleTierFlushPolicy {
+    
+    @Override
+    public void visit(DocWriterSession session, ThreadState state) {
+      long currentBytes = state.perThread.bytesUsed();
+      long previousBytes = session.getPerThreadBytes(state);
+      // bytes this thread state added since the session last accounted for it
+      long delta = currentBytes - previousBytes;
+      boolean expectFlushCurrent = session.isFlushPending(state);
+      ArrayList<ThreadState> mustFlush = new ArrayList<ThreadState>();
+      ArrayList<ThreadState> mustNotFlush = new ArrayList<ThreadState>();
+      // only trigger watermark if we are not pending
+      if (!expectFlushCurrent) {
+     
+        if ((session.activeBytes() + delta) >= getUpperWatermark()) {
+          expectFlushCurrent = true; // flush all
+          findOnFlushAll(session, mustFlush, mustNotFlush);
+        } else if ((session.activeBytes() + delta) >= getWatermark()){ // flush biggest
+          // NOTE(review): mustFlush/mustNotFlush stay empty here, so the loops
+          // below assert nothing in this branch; only expectFlushCurrent matters.
+          ThreadState toFlush = findOnFlushSingle(session, state, currentBytes);
+          expectFlushCurrent |= toFlush == state;
+        }
+      }
+      super.visit(session, state);
+      for (ThreadState threadState : mustNotFlush) {
+        assertFalse("theadState: " + threadState.ord
+            + " must NOT have flush pending",
+            session.isFlushPending(threadState));
+      }
+      for (ThreadState threadState : mustFlush) {
+        assertTrue("theadState: " + threadState.ord
+            + " must have flush pending", session.isFlushPending(threadState));
+      }
+    }
+  }
+  
+  /**
+   * Returns the active thread state holding the most bytes according to the
+   * session's per-thread accounting, seeded with the given {@code state} and
+   * its {@code currentBytes}. Candidates with zero docs in RAM are skipped
+   * (never flush empty) — though the seed state itself is returned unfiltered
+   * if nothing larger is found.
+   */
+   static ThreadState findOnFlushSingle(DocWriterSession session,
+      ThreadState state, long currentBytes) {
+    long maxMem = currentBytes;
+    ThreadState maxState = state;
+    Iterator<ThreadState> allActiveThreads = session.allActiveThreads();
+    while (allActiveThreads.hasNext()) {
+      ThreadState next = allActiveThreads.next();
+      long perThreadBytes = session.getPerThreadBytes(next);
+      if (next.perThread.getNumDocsInRAM() > 0) {
+        // never flush empty
+        if (perThreadBytes > maxMem) {
+          maxMem = perThreadBytes;
+          maxState = next;
+        }
+      }
+    }
+    return maxState;
+  }
+
+  /**
+   * Partitions all active thread states by whether they hold docs in RAM:
+   * non-empty writers go into {@code mustFlush}, empty ones into
+   * {@code mustNotFlush} (an empty writer must never be marked flush-pending).
+   */
+  static void findOnFlushAll(DocWriterSession session,
+      ArrayList<ThreadState> mustFlush, ArrayList<ThreadState> mustNotFlush) {
+    Iterator<ThreadState> allActiveThreads = session.allActiveThreads();
+    while (allActiveThreads.hasNext()) {
+      ThreadState next = allActiveThreads.next();
+      if (next.perThread.getNumDocsInRAM() > 0) {
+        mustFlush.add(next);
+      } else {
+        mustNotFlush.add(next);
+      }
+    }
+  }
+}

Property changes on: lucene/src/test/org/apache/lucene/index/TestTieredFlushPolicy.java
___________________________________________________________________
Added: svn:keywords
   + Date Author Id Revision HeadURL
Added: svn:eol-style
   + native

