diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 41a6378..954ea09 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -85,7 +85,7 @@ API Changes
 
 * LUCENE-4520: ValueSource.getSortField no longer throws IOExceptions
   (Alan Woodward)
-
+  
 Bug Fixes
 
 * LUCENE-1822: BaseFragListBuilder hard-coded 6 char margin is too naive.
diff --git a/lucene/core/src/java/org/apache/lucene/store/Directory.java b/lucene/core/src/java/org/apache/lucene/store/Directory.java
index 4be172e..4959437 100644
--- a/lucene/core/src/java/org/apache/lucene/store/Directory.java
+++ b/lucene/core/src/java/org/apache/lucene/store/Directory.java
@@ -86,6 +86,23 @@ public abstract class Directory implements Closeable {
       Returns a stream writing this file. */
   public abstract IndexOutput createOutput(String name, IOContext context)
        throws IOException;
+  
+  /**
+   * Creates a new, empty file in the directory with the given name. Returns a
+   * stream writing this file.
+   * <p>This method supports an additional {@link RateLimiter} that limits the 
+   * write operations on the created {@link IndexOutput}. <b>Note:</b> {@link RateLimiter}
+   * is currently only respected by {@link FSDirectory} and its subclasses. 
+   * By default this method delegates to {@link Directory#createOutput(String, IOContext)}
+   * </p>
+   * <p>Applications which need rate limiting should wrap the actual directory and pass in
+   * a {@link RateLimiter} instance based on the {@link IOContext} the {@link IndexOutput output} is 
+   * created for.</p>
+   */
+  protected IndexOutput createOutput(String name, IOContext context, RateLimiter rateLimiter) throws IOException {
+    //nocommit - check delegate directories
+    return createOutput(name, context);
+  }
 
   /**
    * Ensure that any writes to these files are moved to
@@ -250,7 +267,7 @@ public abstract class Directory implements Closeable {
     if (!isOpen)
       throw new AlreadyClosedException("this Directory is closed");
   }
-  
+
   /**
    * Allows to create one or more sliced {@link IndexInput} instances from a single 
    * file handle. Some {@link Directory} implementations may be able to efficiently map slices of a file
@@ -326,4 +343,5 @@ public abstract class Directory implements Closeable {
       return length;
     }
   }
+  
 }
diff --git a/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java b/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java
index 770f403..48db985 100644
--- a/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java
+++ b/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java
@@ -123,9 +123,6 @@ public abstract class FSDirectory extends Directory {
   protected final Set<String> staleFiles = synchronizedSet(new HashSet<String>()); // Files written, but not yet sync'ed
   private int chunkSize = DEFAULT_READ_CHUNK_SIZE; // LUCENE-1566
 
-  // null means no limit
-  private volatile RateLimiter mergeWriteRateLimiter;
-
   // returns the canonical version of the directory, creating it if it doesn't exist.
   private static File getCanonicalPath(File file) throws IOException {
     return new File(file.getCanonicalPath());
@@ -283,54 +280,22 @@ public abstract class FSDirectory extends Directory {
   /** Creates an IndexOutput for the file with the given name. */
   @Override
   public IndexOutput createOutput(String name, IOContext context) throws IOException {
-    ensureOpen();
-
-    ensureCanWrite(name);
-    return new FSIndexOutput(this, name, context.context == IOContext.Context.MERGE ? mergeWriteRateLimiter : null);
+    return createOutput(name, context, null);
   }
 
-  /** Sets the maximum (approx) MB/sec allowed by all write
-   *  IO performed by merging.  Pass null to have no limit.
-   *
-   *  <p><b>NOTE</b>: if merges are already running there is
-   *  no guarantee this new rate will apply to them; it will
-   *  only apply for certain to new merges.
-   *
-   * @lucene.experimental */
-  public void setMaxMergeWriteMBPerSec(Double mbPerSec) {
-    RateLimiter limiter = mergeWriteRateLimiter;
-    if (mbPerSec == null) {
-      if (limiter != null) {
-        limiter.setMbPerSec(Double.MAX_VALUE);
-        mergeWriteRateLimiter = null;
-      }
-    } else if (limiter != null) {
-      limiter.setMbPerSec(mbPerSec);
-    } else {
-      mergeWriteRateLimiter = new RateLimiter(mbPerSec);
-    }
-  }
+  @Override
+  protected IndexOutput createOutput(String name, IOContext context,
+      RateLimiter rateLimiter) throws IOException {
+    ensureOpen();
 
-  /**
-   * Sets the rate limiter to be used to limit (approx) MB/sec allowed
-   * by all IO performed when merging. Pass null to have no limit.
-   *
-   * <p>Passing an instance of rate limiter compared to setting it using
-   * {@link #setMaxMergeWriteMBPerSec(Double)} allows to use the same limiter
-   * instance across several directories globally limiting IO when merging
-   * across them.
-   *
-   * @lucene.experimental */
-  public void setMaxMergeWriteLimiter(RateLimiter mergeWriteRateLimiter) {
-    this.mergeWriteRateLimiter = mergeWriteRateLimiter;
+    ensureCanWrite(name);
+    return new FSIndexOutput(this, name, rateLimiter);
   }
 
-  /** See {@link #setMaxMergeWriteMBPerSec}.
-   *
-   * @lucene.experimental */
-  public Double getMaxMergeWriteMBPerSec() {
-    RateLimiter limiter = mergeWriteRateLimiter;
-    return limiter == null ? null : limiter.getMbPerSec();
+  @Override
+  public IndexInput openInput(String name, IOContext context)
+      throws IOException {
+    return null;
   }
 
   protected void ensureCanWrite(String name) throws IOException {
diff --git a/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java b/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java
index fc685f3..f44647a 100644
--- a/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java
+++ b/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java
@@ -19,75 +19,102 @@ package org.apache.lucene.store;
 
 import org.apache.lucene.util.ThreadInterruptedException;
 
-/** Simple class to rate limit IO.  Typically it's shared
- *  across multiple IndexInputs or IndexOutputs (for example
+/** Abstract base class to rate limit IO.  Typically implementations are
+ *  shared across multiple IndexInputs or IndexOutputs (for example
  *  those involved all merging).  Those IndexInputs and
  *  IndexOutputs would call {@link #pause} whenever they
  *  want to read bytes or write bytes. */
-
-public class RateLimiter {
-  private volatile double mbPerSec;
-  private volatile double nsPerByte;
-  private volatile long lastNS;
-
-  // TODO: we could also allow eg a sub class to dynamically
-  // determine the allowed rate, eg if an app wants to
-  // change the allowed rate over time or something
-
-  /** mbPerSec is the MB/sec max IO rate */
-  public RateLimiter(double mbPerSec) {
-    setMbPerSec(mbPerSec);
-  }
+public abstract class RateLimiter {
 
   /**
    * Sets an updated mb per second rate limit.
    */
-  public void setMbPerSec(double mbPerSec) {
-    this.mbPerSec = mbPerSec;
-    nsPerByte = 1000000000. / (1024*1024*mbPerSec);
-  }
-
+  public abstract void setMbPerSec(double mbPerSec);
   /**
    * The current mb per second rate limit.
    */
-  public double getMbPerSec() {
-    return this.mbPerSec;
-  }
-
+  public abstract double getMbPerSec();
+  
   /** Pauses, if necessary, to keep the instantaneous IO
-   *  rate at or below the target. NOTE: multiple threads
-   *  may safely use this, however the implementation is
-   *  not perfectly thread safe but likely in practice this
-   *  is harmless (just means in some rare cases the rate
-   *  might exceed the target).  It's best to call this
-   *  with a biggish count, not one byte at a time. */
-  public void pause(long bytes) {
-    if (bytes == 1) {
-      return;
+   *  rate at or below the target. 
+   *  <p>
+   *  Note: the implementation is thread-safe
+   *  </p>
+   *  @return the pause time in nano seconds 
+   * */
+  public abstract long pause(long bytes);
+  
+  /**
+   * Simple class to rate limit IO.
+   */
+  public static class SimpleRateLimiter extends RateLimiter {
+    private volatile double mbPerSec;
+    private volatile double nsPerByte;
+    private volatile long lastNS;
+
+    // TODO: we could also allow eg a sub class to dynamically
+    // determine the allowed rate, eg if an app wants to
+    // change the allowed rate over time or something
+
+    /** mbPerSec is the MB/sec max IO rate */
+    public SimpleRateLimiter(double mbPerSec) {
+      setMbPerSec(mbPerSec);
+    }
+
+    /**
+     * Sets an updated mb per second rate limit.
+     */
+    public void setMbPerSec(double mbPerSec) {
+      this.mbPerSec = mbPerSec;
+      nsPerByte = 1000000000. / (1024*1024*mbPerSec);
+      
     }
 
-    // TODO: this is purely instantaneous rate; maybe we
-    // should also offer decayed recent history one?
-    final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte));
-    long curNS = System.nanoTime();
-    if (lastNS < curNS) {
-      lastNS = curNS;
+    /**
+     * The current mb per second rate limit.
+     */
+    public double getMbPerSec() {
+      return this.mbPerSec;
     }
+    
+    /** Pauses, if necessary, to keep the instantaneous IO
+     *  rate at or below the target. NOTE: multiple threads
+     *  may safely use this, however the implementation is
+     *  not perfectly thread safe but likely in practice this
+     *  is harmless (just means in some rare cases the rate
+     *  might exceed the target).  It's best to call this
+     *  with a biggish count, not one byte at a time.
+     *  @return the pause time in nano seconds 
+     * */
+    public long pause(long bytes) {
+      if (bytes == 1) {
+        return 0;
+      }
+
+      // TODO: this is purely instantaneous rate; maybe we
+      // should also offer decayed recent history one?
+      final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte));
+      long curNS = System.nanoTime();
+      if (lastNS < curNS) {
+        lastNS = curNS;
+      }
 
-    // While loop because Thread.sleep doesn't always sleep
-    // enough:
-    while(true) {
-      final long pauseNS = targetNS - curNS;
-      if (pauseNS > 0) {
-        try {
-          Thread.sleep((int) (pauseNS/1000000), (int) (pauseNS % 1000000));
-        } catch (InterruptedException ie) {
-          throw new ThreadInterruptedException(ie);
+      // While loop because Thread.sleep doesn't always sleep
+      // enough:
+      while(true) {
+        final long pauseNS = targetNS - curNS;
+        if (pauseNS > 0) {
+          try {
+            Thread.sleep((int) (pauseNS/1000000), (int) (pauseNS % 1000000));
+          } catch (InterruptedException ie) {
+            throw new ThreadInterruptedException(ie);
+          }
+          curNS = System.nanoTime();
+          continue;
         }
-        curNS = System.nanoTime();
-        continue;
+        break;
       }
-      break;
+      return targetNS;
     }
   }
 }
diff --git a/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java b/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
index 719a73f..3d2f64f 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
@@ -40,6 +40,7 @@ import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.NoDeletionPolicy;
 import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.store.IOContext.Context;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.ThrottledIndexOutput;
 import org.apache.lucene.util._TestUtil;
@@ -79,6 +80,7 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
   volatile boolean crashed;
   private ThrottledIndexOutput throttledOutput;
   private Throttling throttling = Throttling.SOMETIMES;
+  private RateLimiter limiter;
 
   final AtomicInteger inputCloneCount = new AtomicInteger();
 
@@ -96,8 +98,6 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
   // is made to delete an open file, we enroll it here.
   private Set<String> openFilesDeleted;
 
-  final RateLimiter rateLimiter;
-
   private synchronized void init() {
     if (openFiles == null) {
       openFiles = new HashMap<String,Integer>();
@@ -128,9 +128,7 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
       if (LuceneTestCase.VERBOSE) {
         System.out.println("MockDirectoryWrapper: will rate limit output IO to " + maxMBPerSec + " MB/sec");
       }
-      rateLimiter = new RateLimiter(maxMBPerSec);
-    } else {
-      rateLimiter = null;
+      this.limiter = new RateLimiter.SimpleRateLimiter(maxMBPerSec);
     }
 
     init();
@@ -447,15 +445,15 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
         ramdir.fileMap.put(name, file);
       }
     }
-    
+    boolean isFSDirectory = delegate instanceof FSDirectory;
     //System.out.println(Thread.currentThread().getName() + ": MDW: create " + name);
-    IndexOutput io = new MockIndexOutputWrapper(this, delegate.createOutput(name, LuceneTestCase.newIOContext(randomState, context)), name);
+    IndexOutput io = new MockIndexOutputWrapper(this, delegate.createOutput(name, LuceneTestCase.newIOContext(randomState, context), isFSDirectory ? limiter : null), name, isFSDirectory ? null : limiter);
     addFileHandle(io, name, Handle.Output);
     openFilesForWrite.add(name);
     
     // throttling REALLY slows down tests, so don't do it very often for SOMETIMES.
     if (throttling == Throttling.ALWAYS || 
-        (throttling == Throttling.SOMETIMES && rateLimiter == null && randomState.nextInt(50) == 0)) {
+        (throttling == Throttling.SOMETIMES && limiter == null && randomState.nextInt(50) == 0)) {
       if (LuceneTestCase.VERBOSE) {
         System.out.println("MockDirectoryWrapper: throttling indexOutput");
       }
diff --git a/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java b/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java
index 331927f..f168d82 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java
@@ -35,12 +35,14 @@ public class MockIndexOutputWrapper extends IndexOutput {
   final String name;
   
   byte[] singleByte = new byte[1];
+  private final RateLimiter rateLimiter;
 
   /** Construct an empty output buffer. */
-  public MockIndexOutputWrapper(MockDirectoryWrapper dir, IndexOutput delegate, String name) {
+  public MockIndexOutputWrapper(MockDirectoryWrapper dir, IndexOutput delegate, String name, RateLimiter limiter) {
     this.dir = dir;
     this.name = name;
     this.delegate = delegate;
+    this.rateLimiter = limiter;
   }
 
   @Override
@@ -78,8 +80,8 @@ public class MockIndexOutputWrapper extends IndexOutput {
     long freeSpace = dir.maxSize == 0 ? 0 : dir.maxSize - dir.sizeInBytes();
     long realUsage = 0;
 
-    if (dir.rateLimiter != null && len >= 1000) {
-      dir.rateLimiter.pause(len);
+    if (rateLimiter != null && len >= 1000) {
+      rateLimiter.pause(len);
     }
 
     // If MockRAMDir crashed since we were opened, then
