Index: lucene/contrib/misc/src/java/org/apache/lucene/store/NRTCachingDirectory.java =================================================================== --- lucene/contrib/misc/src/java/org/apache/lucene/store/NRTCachingDirectory.java (revision 1135550) +++ lucene/contrib/misc/src/java/org/apache/lucene/store/NRTCachingDirectory.java (working copy) @@ -18,6 +18,7 @@ */ import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.Set; @@ -83,22 +84,31 @@ private final RAMDirectory cache = new RAMDirectory(); + private final RateLimiter mergeWriteLimiter; + private final Directory delegate; private final long maxMergeSizeBytes; private final long maxCachedBytes; - private static final boolean VERBOSE = false; + // nocommit -- make this controllable? + private static final boolean VERBOSE = true; /** * We will cache a newly created output if 1) it's a * flush or a merge and the estimated size of the merged segment is <= * maxMergeSizeMB, and 2) the total cached bytes is <= * maxCachedMB */ - public NRTCachingDirectory(Directory delegate, double maxMergeSizeMB, double maxCachedMB) { + public NRTCachingDirectory(Directory delegate, double maxMergeSizeMB, double maxCachedMB, double maxWriteMBPerSec) { this.delegate = delegate; maxMergeSizeBytes = (long) (maxMergeSizeMB*1024*1024); maxCachedBytes = (long) (maxCachedMB*1024*1024); + + if (maxWriteMBPerSec > 0.0) { + mergeWriteLimiter = new RateLimiter(maxWriteMBPerSec); + } else { + mergeWriteLimiter = null; + } } @Override @@ -202,6 +212,8 @@ System.out.println(" to cache"); } return cache.createOutput(name); + } else if (mergeWriteLimiter != null && doRateLimitWrite(name)) { + return delegate.createOutput(name, mergeWriteLimiter); } else { return delegate.createOutput(name); } @@ -273,10 +285,19 @@ * true if this file should be written to the RAMDirectory. */ protected boolean doCacheWrite(String name) { final MergePolicy.OneMerge merge = merges.get(Thread.currentThread()); + if (cache.sizeInBytes() > maxCachedBytes) { + System.out.println("CACHE: too big " + cache.sizeInBytes() + " vs " + maxCachedBytes + ": " + Arrays.toString(cache.listAll())); + } //System.out.println(Thread.currentThread().getName() + ": CACHE check merge=" + merge + " size=" + (merge==null ? 0 : merge.estimatedMergeBytes)); return !name.equals(IndexFileNames.SEGMENTS_GEN) && (merge == null || merge.estimatedMergeBytes <= maxMergeSizeBytes) && cache.sizeInBytes() <= maxCachedBytes; } + protected boolean doRateLimitWrite(String name) { + final MergePolicy.OneMerge merge = merges.get(Thread.currentThread()); + //System.out.println(Thread.currentThread().getName() + ": CACHE check merge=" + merge + " size=" + (merge==null ? 0 : merge.estimatedMergeBytes)); + return merge != null; + } + private void unCache(String fileName) throws IOException { final IndexOutput out; synchronized(this) { Index: lucene/src/java/org/apache/lucene/store/RateLimiter.java =================================================================== --- lucene/src/java/org/apache/lucene/store/RateLimiter.java (revision 0) +++ lucene/src/java/org/apache/lucene/store/RateLimiter.java (revision 0) @@ -0,0 +1,62 @@ +package org.apache.lucene.store; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.lucene.util.ThreadInterruptedException; + +// TODO +// - allowed rate could be dynamic according to external +// criteria? + +public class RateLimiter { + private final double nsPerByte; + private volatile long lastNS; + + public RateLimiter(double mbPerSec) { + nsPerByte = 1000000000. / (1024*1024*mbPerSec); + System.out.println("RL: nsPerByte=" + nsPerByte); + } + + public void pause(long bytes) { + // nocommit -- not perfectly thread safe: + // nocommit -- this is purely instantaneous; may need + // some decayed history: + final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte)); + + // While loop because Thread.sleep doesn't alway sleep + // enough: + long curNS = System.nanoTime(); + if (lastNS < curNS) { + lastNS = curNS; + } + while(true) { + final long pauseNS = targetNS - curNS; + if (pauseNS > 0) { + //System.out.println("p " + bytes + " " + pauseNS); + try { + Thread.sleep((int) (pauseNS/1000000), (int) (pauseNS % 1000000)); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + curNS = System.nanoTime(); + continue; + } + break; + } + } +} Property changes on: lucene/src/java/org/apache/lucene/store/RateLimiter.java ___________________________________________________________________ Added: svn:eol-style + native Index: lucene/src/java/org/apache/lucene/store/Directory.java =================================================================== --- lucene/src/java/org/apache/lucene/store/Directory.java (revision 1135550) +++ lucene/src/java/org/apache/lucene/store/Directory.java (working copy) @@ -90,6 +90,12 @@ public abstract IndexOutput createOutput(String name) throws IOException; + // nocommit + public IndexOutput createOutput(String name, RateLimiter rateLimiter) + throws IOException { + return createOutput(name); + } + /** * Ensure that any writes to these files are moved to * stable storage. Lucene uses this to properly commit Index: lucene/src/java/org/apache/lucene/store/FSDirectory.java =================================================================== --- lucene/src/java/org/apache/lucene/store/FSDirectory.java (revision 1135550) +++ lucene/src/java/org/apache/lucene/store/FSDirectory.java (working copy) @@ -235,6 +235,7 @@ * @see #listAll(File) */ @Override public String[] listAll() throws IOException { + System.out.println(Thread.currentThread().getName() + ": listAll"); ensureOpen(); return listAll(directory); } @@ -242,6 +243,7 @@ /** Returns true iff a file with the given name exists. */ @Override public boolean fileExists(String name) { + System.out.println(Thread.currentThread().getName() + ": exists? " + name); ensureOpen(); File file = new File(directory, name); return file.exists(); @@ -265,6 +267,7 @@ @Override public long fileLength(String name) throws IOException { ensureOpen(); + System.out.println(Thread.currentThread().getName() + ": fileLength " + name); File file = new File(directory, name); final long len = file.length(); if (len == 0 && !file.exists()) { @@ -278,6 +281,7 @@ @Override public void deleteFile(String name) throws IOException { ensureOpen(); + System.out.println(Thread.currentThread().getName() + ": deleteFile " + name); File file = new File(directory, name); if (!file.delete()) throw new IOException("Cannot delete " + file); @@ -287,10 +291,17 @@ /** Creates an IndexOutput for the file with the given name. */ @Override public IndexOutput createOutput(String name) throws IOException { + return createOutput(name, null); + } + + // nocommit jdoc + @Override + public IndexOutput createOutput(String name, RateLimiter rateLimiter) throws IOException { ensureOpen(); + System.out.println(Thread.currentThread().getName() + ": createOutput " + name); ensureCanWrite(name); - return new FSIndexOutput(this, name); + return new FSIndexOutput(this, name, rateLimiter); } protected void ensureCanWrite(String name) throws IOException { @@ -323,6 +334,7 @@ @Override public IndexInput openInput(String name) throws IOException { ensureOpen(); + System.out.println(Thread.currentThread().getName() + ": openInput " + name); return openInput(name, BufferedIndexInput.BUFFER_SIZE); } @@ -359,6 +371,7 @@ /** For debug output. */ @Override public String toString() { + System.out.println(Thread.currentThread().getName() + ": toString"); return this.getClass().getName() + "@" + directory + " lockFactory=" + getLockFactory(); } @@ -408,11 +421,13 @@ private final FSDirectory parent; private final String name; private final RandomAccessFile file; + private final RateLimiter rateLimiter; private volatile boolean isOpen; // remember if the file is open, so that we don't try to close it more than once - public FSIndexOutput(FSDirectory parent, String name) throws IOException { + public FSIndexOutput(FSDirectory parent, String name, RateLimiter rateLimiter) throws IOException { this.parent = parent; this.name = name; + this.rateLimiter = rateLimiter; file = new RandomAccessFile(new File(parent.directory, name), "rw"); isOpen = true; } @@ -420,6 +435,9 @@ /** output methods: */ @Override public void flushBuffer(byte[] b, int offset, int size) throws IOException { + if (rateLimiter != null) { + rateLimiter.pause(size); + } file.write(b, offset, size); }