Index: lucene/core/src/java/org/apache/lucene/store/RAMDirectory.java =================================================================== --- lucene/core/src/java/org/apache/lucene/store/RAMDirectory.java (revision 1305306) +++ lucene/core/src/java/org/apache/lucene/store/RAMDirectory.java (working copy) @@ -27,16 +27,24 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.RamUsageEstimator; + /** * A memory-resident {@link Directory} implementation. Locking * implementation is by default the {@link SingleInstanceLockFactory} * but can be changed with {@link #setLockFactory}. */ public class RAMDirectory extends Directory { + public static final int DEFAULT_BUFFER_SIZE = 8192; + protected final Map fileMap = new ConcurrentHashMap(); protected final AtomicLong sizeInBytes = new AtomicLong(); + protected int defaultBufferSize = DEFAULT_BUFFER_SIZE; + // ***** // Lock acquisition sequence: RAMDirectory, then RAMFile // ***** @@ -73,13 +81,46 @@ private RAMDirectory(Directory dir, boolean closeDir, IOContext context) throws IOException { this(); for (String file : dir.listAll()) { - dir.copy(this, file, file, context); + IndexOutput os = null; + IndexInput is = null; + IOException priorException = null; + try { + is = dir.openInput(file, context); + // prevent 0 byte bufferSize for empty files, or too large bufferSize for huge files: + final int bufferSize = Math.max(1, (int) Math.min(is.length(), 1L << 30)); + RAMFile rf = newRAMFile(bufferSize); + fileMap.put(file, rf); + os = new RAMOutputStream(rf); + is.copyBytes(os, is.length()); + } catch (IOException ioe) { + priorException = ioe; + } finally { + IOUtils.closeWhileHandlingException(priorException, os, is); + } } if (closeDir) { dir.close(); } } + + /** Returns the default buffer size. + * The default value unless changed is {@link #DEFAULT_BUFFER_SIZE}. + */ + public final int getDefaultBufferSize() { + return defaultBufferSize; + } + /** Sets the default buffer size for all files created from now on. + * Already existing files are not modified. + *

The actual buffer size may get modified depending on the {@link IOContext} + * passed to {@link #createOutput(String, IOContext)}. + * The default value unless changed is {@link #DEFAULT_BUFFER_SIZE}. + * @see #getSuitableBufferSize(String, IOContext) + */ + public final void setDefaultBufferSize(int defaultBufferSize) { + this.defaultBufferSize = defaultBufferSize; + } + @Override public final String[] listAll() { ensureOpen(); @@ -113,7 +154,7 @@ /** * Return total size in bytes of all files in this directory. This is - * currently quantized to RAMOutputStream.BUFFER_SIZE. + * currently quantized. */ public final long sizeInBytes() { ensureOpen(); @@ -128,7 +169,6 @@ ensureOpen(); RAMFile file = fileMap.remove(name); if (file != null) { - file.directory = null; sizeInBytes.addAndGet(-file.sizeInBytes); } else { throw new FileNotFoundException(name); @@ -139,23 +179,49 @@ @Override public IndexOutput createOutput(String name, IOContext context) throws IOException { ensureOpen(); - RAMFile file = newRAMFile(); + RAMFile file = newRAMFile(getSuitableBufferSize(name, context)); RAMFile existing = fileMap.remove(name); if (existing != null) { sizeInBytes.addAndGet(-existing.sizeInBytes); - existing.directory = null; } fileMap.put(name, file); return new RAMOutputStream(file); } + + /** + * Returns a suitable buffer size for the given {@link IOContext}. If the context + * describes a flush or merge operation, 1/64 of the estimated target segment size is returned, + * with a minimum of {@link #DEFAULT_BUFFER_SIZE}. For the standard segment metadata files + * (segment*), a buffer size of 512 is used, as those files are generally small. + * For all other IOContexts, {@link #DEFAULT_BUFFER_SIZE} is returned. + */ + protected int getSuitableBufferSize(String name, IOContext context) { + if (name.startsWith(IndexFileNames.SEGMENTS)) { + return 512; + } + switch (context.context) { + case FLUSH: + return Math.max( + (int) Math.min(context.flushInfo.estimatedSegmentSize >> 6, 1L << 30), + DEFAULT_BUFFER_SIZE + ); + case MERGE: + return Math.max( + (int) Math.min(context.mergeInfo.estimatedMergeBytes >> 6, 1L << 30), + DEFAULT_BUFFER_SIZE + ); + default: + return DEFAULT_BUFFER_SIZE; + } + } /** * Returns a new {@link RAMFile} for storing data. This method can be * overridden to return different {@link RAMFile} impls, that e.g. override * {@link RAMFile#newBuffer(int)}. */ - protected RAMFile newRAMFile() { - return new RAMFile(this); + protected RAMFile newRAMFile(int bufferSize) { + return new RAMFile(this, bufferSize); } @Override @@ -176,6 +242,16 @@ /** Closes the store to future operations, releasing associated memory. */ @Override public void close() { + // print required vs. reported file size: + System.out.println("RAMDirectory instance: " + this); + System.out.println("Instance size: " + RamUsageEstimator.humanSizeOf(this)); + long sizes = 0L; + for (Map.Entry e : fileMap.entrySet()) { + final RAMFile file = e.getValue(); + sizes += file.getLength(); + System.out.println(" - " + e.getKey() + ": " + file.getLength() + " (buffer size: " + file.bufferSize + ", allocation: " + file.getSizeInBytes() + ")"); + } + System.out.println("Sum of file sizes: " + RamUsageEstimator.humanReadableUnits(sizes)); isOpen = false; fileMap.clear(); } Index: lucene/core/src/java/org/apache/lucene/store/RAMFile.java =================================================================== --- lucene/core/src/java/org/apache/lucene/store/RAMFile.java (revision 1305306) +++ lucene/core/src/java/org/apache/lucene/store/RAMFile.java (working copy) @@ -22,15 +22,24 @@ /** @lucene.internal */ public class RAMFile { protected ArrayList buffers = new ArrayList(); + protected final int bufferSize; long length; - RAMDirectory directory; + final RAMDirectory directory; protected long sizeInBytes; // File used as buffer, in no RAMDirectory - public RAMFile() {} + public RAMFile() { + this(null, RAMDirectory.DEFAULT_BUFFER_SIZE); + } - RAMFile(RAMDirectory directory) { + // File used as buffer, in no RAMDirectory + public RAMFile(int bufferSize) { + this(null, bufferSize); + } + + RAMFile(RAMDirectory directory, int bufferSize) { this.directory = directory; + this.bufferSize = bufferSize; } // For non-stream access from thread that might be concurrent with writing Index: lucene/core/src/java/org/apache/lucene/store/RAMInputStream.java =================================================================== --- lucene/core/src/java/org/apache/lucene/store/RAMInputStream.java (revision 1305306) +++ lucene/core/src/java/org/apache/lucene/store/RAMInputStream.java (working copy) @@ -24,11 +24,10 @@ * * @lucene.internal */ public class RAMInputStream extends IndexInput implements Cloneable { - static final int BUFFER_SIZE = RAMOutputStream.BUFFER_SIZE; + private final RAMFile file; + private final int bufferSize; + private final long length; - private RAMFile file; - private long length; - private byte[] currentBuffer; private int currentBufferIndex; @@ -40,7 +39,8 @@ super("RAMInputStream(name=" + name + ")"); file = f; length = file.length; - if (length/BUFFER_SIZE >= Integer.MAX_VALUE) { + bufferSize = file.bufferSize; + if (length/bufferSize >= Integer.MAX_VALUE) { throw new IOException("RAMInputStream too large length=" + length + ": " + name); } @@ -50,6 +50,10 @@ currentBuffer = null; } + public final int getBufferSize() { + return bufferSize; + } + @Override public void close() { // nothing to do here @@ -87,7 +91,7 @@ } private final void switchCurrentBuffer(boolean enforceEOF) throws IOException { - bufferStart = (long) BUFFER_SIZE * (long) currentBufferIndex; + bufferStart = (long) bufferSize * (long) currentBufferIndex; if (currentBufferIndex >= file.numBuffers()) { // end of file reached, no more buffers left if (enforceEOF) { @@ -95,13 +99,13 @@ } else { // Force EOF if a read takes place at this position currentBufferIndex--; - bufferPosition = BUFFER_SIZE; + bufferPosition = bufferSize; } } else { currentBuffer = file.getBuffer(currentBufferIndex); bufferPosition = 0; long buflen = length - bufferStart; - bufferLength = buflen > BUFFER_SIZE ? BUFFER_SIZE : (int) buflen; + bufferLength = buflen > bufferSize ? bufferSize : (int) buflen; } } @@ -133,10 +137,10 @@ @Override public void seek(long pos) throws IOException { - if (currentBuffer==null || pos < bufferStart || pos >= bufferStart + BUFFER_SIZE) { - currentBufferIndex = (int) (pos / BUFFER_SIZE); + if (currentBuffer==null || pos < bufferStart || pos >= bufferStart + bufferSize) { + currentBufferIndex = (int) (pos / bufferSize); switchCurrentBuffer(false); } - bufferPosition = (int) (pos % BUFFER_SIZE); + bufferPosition = (int) (pos % bufferSize); } } Index: lucene/core/src/java/org/apache/lucene/store/RAMOutputStream.java =================================================================== --- lucene/core/src/java/org/apache/lucene/store/RAMOutputStream.java (revision 1305306) +++ lucene/core/src/java/org/apache/lucene/store/RAMOutputStream.java (working copy) @@ -25,10 +25,9 @@ * @lucene.internal */ public class RAMOutputStream extends IndexOutput { - static final int BUFFER_SIZE = 1024; + private final RAMFile file; + private final int bufferSize; - private RAMFile file; - private byte[] currentBuffer; private int currentBufferIndex; @@ -43,6 +42,7 @@ public RAMOutputStream(RAMFile f) { file = f; + bufferSize = file.bufferSize; // make sure that we switch to the // first needed buffer lazily @@ -50,6 +50,10 @@ currentBuffer = null; } + public final int getBufferSize() { + return bufferSize; + } + /** Copy the current contents of this buffer to the named output. */ public void writeTo(IndexOutput out) throws IOException { flush(); @@ -57,7 +61,7 @@ long pos = 0; int buffer = 0; while (pos < end) { - int length = BUFFER_SIZE; + int length = bufferSize; long nextPos = pos + length; if (nextPos > end) { // at the last buffer length = (int)(end - pos); @@ -76,7 +80,7 @@ int buffer = 0; int bytesUpto = offset; while (pos < end) { - int length = BUFFER_SIZE; + int length = bufferSize; long nextPos = pos + length; if (nextPos > end) { // at the last buffer length = (int)(end - pos); @@ -108,11 +112,11 @@ // and flush() has not been called yet setFileLength(); if (pos < bufferStart || pos >= bufferStart + bufferLength) { - currentBufferIndex = (int) (pos / BUFFER_SIZE); + currentBufferIndex = (int) (pos / bufferSize); switchCurrentBuffer(); } - bufferPosition = (int) (pos % BUFFER_SIZE); + bufferPosition = (int) (pos % bufferSize); } @Override @@ -149,12 +153,12 @@ private final void switchCurrentBuffer() throws IOException { if (currentBufferIndex == file.numBuffers()) { - currentBuffer = file.addBuffer(BUFFER_SIZE); + currentBuffer = file.addBuffer(bufferSize); } else { currentBuffer = file.getBuffer(currentBufferIndex); } bufferPosition = 0; - bufferStart = (long) BUFFER_SIZE * (long) currentBufferIndex; + bufferStart = (long) bufferSize * (long) currentBufferIndex; bufferLength = currentBuffer.length; } @@ -177,7 +181,7 @@ /** Returns byte usage of all buffers. */ public long sizeInBytes() { - return file.numBuffers() * BUFFER_SIZE; + return file.numBuffers() * bufferSize; } @Override Index: lucene/core/src/test/org/apache/lucene/store/TestHugeRamFile.java =================================================================== --- lucene/core/src/test/org/apache/lucene/store/TestHugeRamFile.java (revision 1305306) +++ lucene/core/src/test/org/apache/lucene/store/TestHugeRamFile.java (working copy) @@ -55,8 +55,8 @@ DenseRAMFile f = new DenseRAMFile(); // output part RAMOutputStream out = new RAMOutputStream(f); - byte b1[] = new byte[RAMOutputStream.BUFFER_SIZE]; - byte b2[] = new byte[RAMOutputStream.BUFFER_SIZE / 3]; + byte b1[] = new byte[out.getBufferSize()]; + byte b2[] = new byte[out.getBufferSize() / 3]; for (int i = 0; i < b1.length; i++) { b1[i] = (byte) (i & 0x0007F); } Index: lucene/core/src/test/org/apache/lucene/store/TestRAMDirectory.java =================================================================== --- lucene/core/src/test/org/apache/lucene/store/TestRAMDirectory.java (revision 1305306) +++ lucene/core/src/test/org/apache/lucene/store/TestRAMDirectory.java (working copy) @@ -170,16 +170,18 @@ public void testSeekToEOFThenBack() throws Exception { RAMDirectory dir = new RAMDirectory(); - IndexOutput o = dir.createOutput("out", newIOContext(random)); - byte[] bytes = new byte[3*RAMInputStream.BUFFER_SIZE]; + final IOContext ctx = newIOContext(random); + IndexOutput o = dir.createOutput("out", ctx); + final int bufferSize = dir.getSuitableBufferSize("out", ctx); + byte[] bytes = new byte[3*bufferSize]; o.writeBytes(bytes, 0, bytes.length); o.close(); IndexInput i = dir.openInput("out", newIOContext(random)); - i.seek(2*RAMInputStream.BUFFER_SIZE-1); - i.seek(3*RAMInputStream.BUFFER_SIZE); - i.seek(RAMInputStream.BUFFER_SIZE); - i.readBytes(bytes, 0, 2*RAMInputStream.BUFFER_SIZE); + i.seek(2*bufferSize-1); + i.seek(3*bufferSize); + i.seek(bufferSize); + i.readBytes(bytes, 0, 2*bufferSize); i.close(); dir.close(); } Index: lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java =================================================================== --- lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java (revision 1305306) +++ lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java (working copy) @@ -431,7 +431,7 @@ if (delegate instanceof RAMDirectory) { RAMDirectory ramdir = (RAMDirectory) delegate; - RAMFile file = new RAMFile(ramdir); + RAMFile file = ramdir.newRAMFile(ramdir.getSuitableBufferSize(name, context)); RAMFile existing = ramdir.fileMap.get(name); // Enforce write once: @@ -440,7 +440,6 @@ else { if (existing!=null) { ramdir.sizeInBytes.getAndAdd(-existing.sizeInBytes); - existing.directory = null; } ramdir.fileMap.put(name, file); }