Index: lucene/CHANGES.txt =================================================================== --- lucene/CHANGES.txt (revision 1139215) +++ lucene/CHANGES.txt (working copy) @@ -438,6 +438,10 @@ IndexSearcher. SortFields can have SortField.REWRITEABLE type which requires they are rewritten before they are used. (Chris Male) +* LUCENE-3203: FSDirectory can now limit the max allowed write rate + (MB/sec) of all running merges, to reduce impact ongoing merging has + on searching, NRT reopen time, etc. (Mike McCandless) + Optimizations * LUCENE-2588: Don't store unnecessary suffixes when writing the terms Index: lucene/src/java/org/apache/lucene/store/RateLimiter.java =================================================================== --- lucene/src/java/org/apache/lucene/store/RateLimiter.java (revision 0) +++ lucene/src/java/org/apache/lucene/store/RateLimiter.java (revision 0) @@ -0,0 +1,77 @@ +package org.apache.lucene.store; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.lucene.util.ThreadInterruptedException; + +/** Simple class to rate limit IO. Typically it's shared + * across multiple IndexInputs or IndexOutputs (for example + * those involved all merging). Those IndexInputs and + * IndexOutputs would call {@link #pause} whenever they + * want to read bytes or write bytes. */ + +public class RateLimiter { + private volatile double nsPerByte; + private volatile long lastNS; + + // TODO: we could also allow eg a sub class to dynamically + // determine the allowed rate, eg if an app wants to + // change the allowed rate over time or something + + /** mbPerSec is the MB/sec max IO rate */ + public RateLimiter(double mbPerSec) { + setMaxRate(mbPerSec); + } + + public void setMaxRate(double mbPerSec) { + nsPerByte = 1000000000. / (1024*1024*mbPerSec); + } + + /** Pauses, if necessary, to keep the instantaneous IO + * rate at or below the target. NOTE: multiple threads + * may safely use this, however the implementation is + * not perfectly thread safe but likely in practice this + * is harmless (just means in some rate cases the rate + * might exceed the target). */ + public void pause(long bytes) { + + // TODO: this is purely instantenous rate; maybe we + // should also offer decayed recent history one? + final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte)); + long curNS = System.nanoTime(); + if (lastNS < curNS) { + lastNS = curNS; + } + + // While loop because Thread.sleep doesn't alway sleep + // enough: + while(true) { + final long pauseNS = targetNS - curNS; + if (pauseNS > 0) { + try { + Thread.sleep((int) (pauseNS/1000000), (int) (pauseNS % 1000000)); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + curNS = System.nanoTime(); + continue; + } + break; + } + } +} Property changes on: lucene/src/java/org/apache/lucene/store/RateLimiter.java ___________________________________________________________________ Added: svn:eol-style + native Index: lucene/src/java/org/apache/lucene/store/FSDirectory.java =================================================================== --- lucene/src/java/org/apache/lucene/store/FSDirectory.java (revision 1139215) +++ lucene/src/java/org/apache/lucene/store/FSDirectory.java (working copy) @@ -123,6 +123,10 @@ protected final Set staleFiles = synchronizedSet(new HashSet()); // Files written, but not yet sync'ed private int chunkSize = DEFAULT_READ_CHUNK_SIZE; // LUCENE-1566 + // null means no limite + private Double maxMergeWriteMBPerSec; + private RateLimiter mergeWriteRateLimiter; + // returns the canonical version of the directory, creating it if it doesn't exist. private static File getCanonicalPath(File file) throws IOException { return new File(file.getCanonicalPath()); @@ -291,9 +295,38 @@ ensureOpen(); ensureCanWrite(name); - return new FSIndexOutput(this, name); + return new FSIndexOutput(this, name, context.context == IOContext.Context.MERGE ? mergeWriteRateLimiter : null); } + /** Sets the maximum (approx) MB/sec allowed by all IO + * performed by merging. Pass null to have no limit. + * + *

NOTE: if merges are already running there is + * no guarantee this rate will apply to them; it will only + * apply for certain to new merges. + * + * @lucene.experimental */ + public synchronized void setMaxMergeWriteMBPerSec(Double mbPerSec) { + maxMergeWriteMBPerSec = mbPerSec; + if (mbPerSec == null) { + if (mergeWriteRateLimiter != null) { + mergeWriteRateLimiter.setMaxRate(Double.MAX_VALUE); + mergeWriteRateLimiter = null; + } + } else if (mergeWriteRateLimiter != null) { + mergeWriteRateLimiter.setMaxRate(mbPerSec); + } else { + mergeWriteRateLimiter = new RateLimiter(mbPerSec); + } + } + + /** See {@link #setMaxMergeWriteMBPerSec}. + * + * @lucene.experimental */ + public Double getMaxMergeWriteMBPerSec() { + return maxMergeWriteMBPerSec; + } + protected void ensureCanWrite(String name) throws IOException { if (!directory.exists()) if (!directory.mkdirs()) @@ -403,17 +436,22 @@ private final String name; private final RandomAccessFile file; private volatile boolean isOpen; // remember if the file is open, so that we don't try to close it more than once - - public FSIndexOutput(FSDirectory parent, String name) throws IOException { + private final RateLimiter rateLimiter; + + public FSIndexOutput(FSDirectory parent, String name, RateLimiter rateLimiter) throws IOException { this.parent = parent; this.name = name; file = new RandomAccessFile(new File(parent.directory, name), "rw"); isOpen = true; + this.rateLimiter = rateLimiter; } /** output methods: */ @Override public void flushBuffer(byte[] b, int offset, int size) throws IOException { + if (rateLimiter != null) { + rateLimiter.pause(size); + } file.write(b, offset, size); }