diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 41a6378..c2ada21 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -85,6 +85,9 @@ API Changes
 
 * LUCENE-4520: ValueSource.getSortField no longer throws IOExceptions
   (Alan Woodward)
+  
+* LUCENE-4537: Directory now accepts a RateLimiter per IOContext to limit
+  IndexOutput I/O operations even beyond merge I/O. (Simon Willnauer) 
 
 Bug Fixes
 
diff --git a/lucene/core/src/java/org/apache/lucene/store/Directory.java b/lucene/core/src/java/org/apache/lucene/store/Directory.java
index 4be172e..24c835d 100644
--- a/lucene/core/src/java/org/apache/lucene/store/Directory.java
+++ b/lucene/core/src/java/org/apache/lucene/store/Directory.java
@@ -22,7 +22,9 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Closeable;
 import java.util.Collection; // for javadocs
+import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.lucene.store.IOContext.Context;
 import org.apache.lucene.util.IOUtils;
 
 /** A Directory is a flat list of files.  Files may be written once, when they
@@ -49,6 +51,9 @@ public abstract class Directory implements Closeable {
    * this Directory instance). */
   protected LockFactory lockFactory;
 
+  private final ConcurrentHashMap<IOContext.Context, RateLimiter> contextRateLimiter = 
+     new ConcurrentHashMap<IOContext.Context,RateLimiter>();
+
   /**
    * Returns an array of strings, one for each file in the directory.
    * 
@@ -251,6 +256,58 @@ public abstract class Directory implements Closeable {
       throw new AlreadyClosedException("this Directory is closed");
   }
   
+  protected RateLimiter getRateLimiter(IOContext.Context context) {
+    return contextRateLimiter.get(context);
+  }
+
+  /** Sets the maximum (approx) MB/sec allowed by all write
+   *  IO performed by {@link IndexOutput} created with the given {@link IOContext.Context}.
+   *  Pass <code>null</code> to have no limit.
+   *
+   *  <p><b>NOTE</b>: For already created {@link IndexOutput} instances there is
+   *  no guarantee this new rate will apply to them; it will
+   *  only be guaranteed to apply for new created {@link IndexOutput} instances.
+   *  <p><b>NOTE</b>: this is an optional operation and might not be respected by all
+   *  Directory implementations. Currently only {@link FSDirectory buffered} Directory 
+   *  implementations use rate-limiting.
+   *
+   * @lucene.experimental */
+  public void setMaxWriteMBPerSec(Double mbPerSec, IOContext.Context context) {
+    RateLimiter limiter = getRateLimiter(context);
+    if (mbPerSec == null) {
+      if (limiter != null) {
+        limiter.setMbPerSec(Double.MAX_VALUE);
+        contextRateLimiter.put(context, null);
+      }
+    } else if (limiter != null) {
+      limiter.setMbPerSec(mbPerSec);
+    } else {
+      contextRateLimiter.put(context, new RateLimiter.SimpleRateLimiter(mbPerSec));
+    }
+  }
+
+  /**
+   * Sets the rate limiter to be used to limit (approx) MB/sec allowed
+   * by all IO performed with the given {@link Context context}. 
+   * Pass <code>null</code> to have no limit.
+   *
+   * <p>Passing an instance of rate limiter compared to setting it using
+   * {@link #setMaxWriteMBPerSec(Double, org.apache.lucene.store.IOContext.Context)} allows to use the same limiter
+   * instance across several directories globally limiting IO across them.
+   *
+   * @lucene.experimental */
+  public void setMaxWriteLimiter(RateLimiter mergeWriteRateLimiter, Context context) {
+    contextRateLimiter.put(context,mergeWriteRateLimiter);
+  }
+
+  /** 
+   * See {@link #setMaxWriteMBPerSec}.
+   * @lucene.experimental */
+  public Double getMaxWriteMBPerSec(IOContext.Context context) {
+    RateLimiter limiter = getRateLimiter(context);
+    return limiter == null ? null : limiter.getMbPerSec();
+  }
+
   /**
    * Allows to create one or more sliced {@link IndexInput} instances from a single 
    * file handle. Some {@link Directory} implementations may be able to efficiently map slices of a file
@@ -326,4 +383,5 @@ public abstract class Directory implements Closeable {
       return length;
     }
   }
+  
 }
diff --git a/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java b/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java
index 770f403..6427583 100644
--- a/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java
+++ b/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java
@@ -123,9 +123,6 @@ public abstract class FSDirectory extends Directory {
   protected final Set<String> staleFiles = synchronizedSet(new HashSet<String>()); // Files written, but not yet sync'ed
   private int chunkSize = DEFAULT_READ_CHUNK_SIZE; // LUCENE-1566
 
-  // null means no limit
-  private volatile RateLimiter mergeWriteRateLimiter;
-
   // returns the canonical version of the directory, creating it if it doesn't exist.
   private static File getCanonicalPath(File file) throws IOException {
     return new File(file.getCanonicalPath());
@@ -286,51 +283,7 @@ public abstract class FSDirectory extends Directory {
     ensureOpen();
 
     ensureCanWrite(name);
-    return new FSIndexOutput(this, name, context.context == IOContext.Context.MERGE ? mergeWriteRateLimiter : null);
-  }
-
-  /** Sets the maximum (approx) MB/sec allowed by all write
-   *  IO performed by merging.  Pass null to have no limit.
-   *
-   *  <p><b>NOTE</b>: if merges are already running there is
-   *  no guarantee this new rate will apply to them; it will
-   *  only apply for certain to new merges.
-   *
-   * @lucene.experimental */
-  public void setMaxMergeWriteMBPerSec(Double mbPerSec) {
-    RateLimiter limiter = mergeWriteRateLimiter;
-    if (mbPerSec == null) {
-      if (limiter != null) {
-        limiter.setMbPerSec(Double.MAX_VALUE);
-        mergeWriteRateLimiter = null;
-      }
-    } else if (limiter != null) {
-      limiter.setMbPerSec(mbPerSec);
-    } else {
-      mergeWriteRateLimiter = new RateLimiter(mbPerSec);
-    }
-  }
-
-  /**
-   * Sets the rate limiter to be used to limit (approx) MB/sec allowed
-   * by all IO performed when merging. Pass null to have no limit.
-   *
-   * <p>Passing an instance of rate limiter compared to setting it using
-   * {@link #setMaxMergeWriteMBPerSec(Double)} allows to use the same limiter
-   * instance across several directories globally limiting IO when merging
-   * across them.
-   *
-   * @lucene.experimental */
-  public void setMaxMergeWriteLimiter(RateLimiter mergeWriteRateLimiter) {
-    this.mergeWriteRateLimiter = mergeWriteRateLimiter;
-  }
-
-  /** See {@link #setMaxMergeWriteMBPerSec}.
-   *
-   * @lucene.experimental */
-  public Double getMaxMergeWriteMBPerSec() {
-    RateLimiter limiter = mergeWriteRateLimiter;
-    return limiter == null ? null : limiter.getMbPerSec();
+    return new FSIndexOutput(this, name, getRateLimiter(context.context));
   }
 
   protected void ensureCanWrite(String name) throws IOException {
diff --git a/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java b/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java
index fc685f3..f44647a 100644
--- a/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java
+++ b/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java
@@ -19,75 +19,102 @@ package org.apache.lucene.store;
 
 import org.apache.lucene.util.ThreadInterruptedException;
 
-/** Simple class to rate limit IO.  Typically it's shared
- *  across multiple IndexInputs or IndexOutputs (for example
+/** Abstract base class to rate limit IO.  Typically implementations are
+ *  shared across multiple IndexInputs or IndexOutputs (for example
  *  those involved all merging).  Those IndexInputs and
  *  IndexOutputs would call {@link #pause} whenever they
  *  want to read bytes or write bytes. */
-
-public class RateLimiter {
-  private volatile double mbPerSec;
-  private volatile double nsPerByte;
-  private volatile long lastNS;
-
-  // TODO: we could also allow eg a sub class to dynamically
-  // determine the allowed rate, eg if an app wants to
-  // change the allowed rate over time or something
-
-  /** mbPerSec is the MB/sec max IO rate */
-  public RateLimiter(double mbPerSec) {
-    setMbPerSec(mbPerSec);
-  }
+public abstract class RateLimiter {
 
   /**
    * Sets an updated mb per second rate limit.
    */
-  public void setMbPerSec(double mbPerSec) {
-    this.mbPerSec = mbPerSec;
-    nsPerByte = 1000000000. / (1024*1024*mbPerSec);
-  }
-
+  public abstract void setMbPerSec(double mbPerSec);
   /**
    * The current mb per second rate limit.
    */
-  public double getMbPerSec() {
-    return this.mbPerSec;
-  }
-
+  public abstract double getMbPerSec();
+  
   /** Pauses, if necessary, to keep the instantaneous IO
-   *  rate at or below the target. NOTE: multiple threads
-   *  may safely use this, however the implementation is
-   *  not perfectly thread safe but likely in practice this
-   *  is harmless (just means in some rare cases the rate
-   *  might exceed the target).  It's best to call this
-   *  with a biggish count, not one byte at a time. */
-  public void pause(long bytes) {
-    if (bytes == 1) {
-      return;
+   *  rate at or below the target. 
+   *  <p>
+   *  Note: the implementation is thread-safe
+   *  </p>
+   *  @return the pause time in nano seconds 
+   * */
+  public abstract long pause(long bytes);
+  
+  /**
+   * Simple class to rate limit IO.
+   */
+  public static class SimpleRateLimiter extends RateLimiter {
+    private volatile double mbPerSec;
+    private volatile double nsPerByte;
+    private volatile long lastNS;
+
+    // TODO: we could also allow eg a sub class to dynamically
+    // determine the allowed rate, eg if an app wants to
+    // change the allowed rate over time or something
+
+    /** mbPerSec is the MB/sec max IO rate */
+    public SimpleRateLimiter(double mbPerSec) {
+      setMbPerSec(mbPerSec);
+    }
+
+    /**
+     * Sets an updated mb per second rate limit.
+     */
+    public void setMbPerSec(double mbPerSec) {
+      this.mbPerSec = mbPerSec;
+      nsPerByte = 1000000000. / (1024*1024*mbPerSec);
+      
     }
 
-    // TODO: this is purely instantaneous rate; maybe we
-    // should also offer decayed recent history one?
-    final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte));
-    long curNS = System.nanoTime();
-    if (lastNS < curNS) {
-      lastNS = curNS;
+    /**
+     * The current mb per second rate limit.
+     */
+    public double getMbPerSec() {
+      return this.mbPerSec;
     }
+    
+    /** Pauses, if necessary, to keep the instantaneous IO
+     *  rate at or below the target. NOTE: multiple threads
+     *  may safely use this, however the implementation is
+     *  not perfectly thread safe but likely in practice this
+     *  is harmless (just means in some rare cases the rate
+     *  might exceed the target).  It's best to call this
+     *  with a biggish count, not one byte at a time.
+     *  @return the pause time in nano seconds 
+     * */
+    public long pause(long bytes) {
+      if (bytes == 1) {
+        return 0;
+      }
+
+      // TODO: this is purely instantaneous rate; maybe we
+      // should also offer decayed recent history one?
+      final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte));
+      long curNS = System.nanoTime();
+      if (lastNS < curNS) {
+        lastNS = curNS;
+      }
 
-    // While loop because Thread.sleep doesn't always sleep
-    // enough:
-    while(true) {
-      final long pauseNS = targetNS - curNS;
-      if (pauseNS > 0) {
-        try {
-          Thread.sleep((int) (pauseNS/1000000), (int) (pauseNS % 1000000));
-        } catch (InterruptedException ie) {
-          throw new ThreadInterruptedException(ie);
+      // While loop because Thread.sleep doesn't always sleep
+      // enough:
+      while(true) {
+        final long pauseNS = targetNS - curNS;
+        if (pauseNS > 0) {
+          try {
+            Thread.sleep((int) (pauseNS/1000000), (int) (pauseNS % 1000000));
+          } catch (InterruptedException ie) {
+            throw new ThreadInterruptedException(ie);
+          }
+          curNS = System.nanoTime();
+          continue;
         }
-        curNS = System.nanoTime();
-        continue;
+        break;
       }
-      break;
+      return targetNS;
     }
   }
 }
diff --git a/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java b/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
index 719a73f..2c5c4bd 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
@@ -40,6 +40,7 @@ import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.NoDeletionPolicy;
 import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.store.IOContext.Context;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.ThrottledIndexOutput;
 import org.apache.lucene.util._TestUtil;
@@ -96,8 +97,6 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
   // is made to delete an open file, we enroll it here.
   private Set<String> openFilesDeleted;
 
-  final RateLimiter rateLimiter;
-
   private synchronized void init() {
     if (openFiles == null) {
       openFiles = new HashMap<String,Integer>();
@@ -128,9 +127,18 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
       if (LuceneTestCase.VERBOSE) {
         System.out.println("MockDirectoryWrapper: will rate limit output IO to " + maxMBPerSec + " MB/sec");
       }
-      rateLimiter = new RateLimiter(maxMBPerSec);
-    } else {
-      rateLimiter = null;
+      switch (randomState.nextInt(3)) {
+        case 2:
+          setMaxWriteLimiter(new RateLimiter.SimpleRateLimiter(maxMBPerSec), Context.MERGE);
+          break;
+        case 1:
+          setMaxWriteLimiter(new RateLimiter.SimpleRateLimiter(maxMBPerSec), Context.FLUSH);
+          break;
+        default:
+          setMaxWriteLimiter(new RateLimiter.SimpleRateLimiter(maxMBPerSec), Context.MERGE);
+          setMaxWriteLimiter(new RateLimiter.SimpleRateLimiter(maxMBPerSec), Context.FLUSH);
+          break;
+      }
     }
 
     init();
@@ -449,13 +457,13 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
     }
     
     //System.out.println(Thread.currentThread().getName() + ": MDW: create " + name);
-    IndexOutput io = new MockIndexOutputWrapper(this, delegate.createOutput(name, LuceneTestCase.newIOContext(randomState, context)), name);
+    IndexOutput io = new MockIndexOutputWrapper(this, delegate.createOutput(name, LuceneTestCase.newIOContext(randomState, context)), name, getRateLimiter(context.context));
     addFileHandle(io, name, Handle.Output);
     openFilesForWrite.add(name);
     
     // throttling REALLY slows down tests, so don't do it very often for SOMETIMES.
     if (throttling == Throttling.ALWAYS || 
-        (throttling == Throttling.SOMETIMES && rateLimiter == null && randomState.nextInt(50) == 0)) {
+        (throttling == Throttling.SOMETIMES && getRateLimiter(context.context) == null && randomState.nextInt(50) == 0)) {
       if (LuceneTestCase.VERBOSE) {
         System.out.println("MockDirectoryWrapper: throttling indexOutput");
       }
diff --git a/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java b/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java
index 331927f..f168d82 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java
@@ -35,12 +35,14 @@ public class MockIndexOutputWrapper extends IndexOutput {
   final String name;
   
   byte[] singleByte = new byte[1];
+  private final RateLimiter rateLimiter;
 
   /** Construct an empty output buffer. */
-  public MockIndexOutputWrapper(MockDirectoryWrapper dir, IndexOutput delegate, String name) {
+  public MockIndexOutputWrapper(MockDirectoryWrapper dir, IndexOutput delegate, String name, RateLimiter limiter) {
     this.dir = dir;
     this.name = name;
     this.delegate = delegate;
+    this.rateLimiter = limiter;
   }
 
   @Override
@@ -78,8 +80,8 @@ public class MockIndexOutputWrapper extends IndexOutput {
     long freeSpace = dir.maxSize == 0 ? 0 : dir.maxSize - dir.sizeInBytes();
     long realUsage = 0;
 
-    if (dir.rateLimiter != null && len >= 1000) {
-      dir.rateLimiter.pause(len);
+    if (rateLimiter != null && len >= 1000) {
+      rateLimiter.pause(len);
     }
 
     // If MockRAMDir crashed since we were opened, then
