Index: lucene/src/java/org/apache/lucene/store/MMapDirectory.java =================================================================== --- lucene/src/java/org/apache/lucene/store/MMapDirectory.java (revision 1050737) +++ lucene/src/java/org/apache/lucene/store/MMapDirectory.java (revision ) @@ -103,6 +103,14 @@ } /** + * The NIO directory uses channel stream by default. + */ + @Override + protected boolean defaultUseChannelStream() { + return true; + } + + /** * true, if this platform supports unmapping mmapped files. */ public static final boolean UNMAP_SUPPORTED; Index: lucene/src/java/org/apache/lucene/store/FSDirectory.java =================================================================== --- lucene/src/java/org/apache/lucene/store/FSDirectory.java (revision 1052980) +++ lucene/src/java/org/apache/lucene/store/FSDirectory.java (revision ) @@ -22,14 +22,20 @@ import java.io.FilenameFilter; import java.io.IOException; import java.io.RandomAccessFile; +import java.lang.reflect.Method; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Collection; import static java.util.Collections.synchronizedSet; import java.util.HashSet; +import java.util.Queue; import java.util.Set; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.Constants; @@ -133,6 +139,10 @@ protected final Set staleFiles = synchronizedSet(new HashSet()); // Files written, but not yet sync'ed private int chunkSize = DEFAULT_READ_CHUNK_SIZE; // LUCENE-1566 + private boolean useChannelOutput = true; + private boolean cacheChannelBuffers = true; + private Queue cachedBuffers = new LinkedBlockingQueue(); + // returns the canonical version of the directory, creating it if it doesn't exist. private static File getCanonicalPath(File file) throws IOException { return new File(file.getCanonicalPath()); @@ -155,8 +165,18 @@ throw new NoSuchDirectoryException("file '" + directory + "' exists but is not a directory"); setLockFactory(lockFactory); + useChannelOutput = defaultUseChannelStream(); } + /** + * Allows actual implementations of {@link FSDirectory} to control the default value + * of using channels when writing. Defaults to false, i.e., not using channel + * streams. + */ + protected boolean defaultUseChannelStream() { + return false; + } + /** Creates an FSDirectory instance, trying to pick the * best implementation given the current environment. * The directory returned uses the {@link NativeFSLockFactory}. @@ -303,12 +323,46 @@ staleFiles.remove(name); } + /** + * Set to true to set the file based {@link IndexOutput} to use file channel + * to write content to disk. Set to false to use plain RAF. Defaults to true. + * + *

Should be set before the directory is actually used. + */ + public void setUseChannelOutput(boolean useChannelOutput) { + this.useChannelOutput = useChannelOutput; + } + + /** + * Should buffers used to write content to disk when using channel based file output + * be reused or not. Defaults to true. + * + *

Should be set before the directory is actually used. + */ + public void setCacheChannelBuffers(boolean cacheChannelBuffers) { + this.cacheChannelBuffers = cacheChannelBuffers; + } + /** Creates an IndexOutput for the file with the given name. */ @Override public IndexOutput createOutput(String name) throws IOException { ensureOpen(); ensureCanWrite(name); + if (useChannelOutput) { + // we use direct buffers since FileChannel#write converts a non direct bb + // to a direct one (OpenJDK 6 code), so we might as well create one and write directly to + ByteBuffer buffer; + if (cacheChannelBuffers) { + while ((buffer = cachedBuffers.poll()) == null) { + cachedBuffers.offer(ByteBuffer.allocateDirect(FSChannelIndexOutput.BUFFER_SIZE)); + } + } else { + buffer = ByteBuffer.allocateDirect(FSChannelIndexOutput.BUFFER_SIZE); + } + buffer.clear(); + return new FSChannelIndexOutput(this, name, buffer); + } return new FSIndexOutput(this, name); } @@ -326,6 +380,15 @@ staleFiles.add(io.name); } + protected void onIndexOutputClosed(FSChannelIndexOutput io) { + staleFiles.add(io.name); + if (cacheChannelBuffers) { + cachedBuffers.offer(io.buffer); + } else { + Cleaner.clean(io.buffer); + } + } + @Override public void sync(Collection names) throws IOException { ensureOpen(); @@ -380,7 +443,11 @@ @Override public synchronized void close() { isOpen = false; + ByteBuffer buffer; + while ((buffer = cachedBuffers.poll()) != null) { + Cleaner.clean(buffer); - } + } + } /** @return the underlying filesystem directory */ public File getDirectory() { @@ -528,4 +595,168 @@ // Throw original exception throw exc; } + + protected static class FSChannelIndexOutput extends IndexOutput { + static final int BUFFER_SIZE = 16384; + private final ByteBuffer buffer; + + private final FSDirectory parent; + private final String name; + private final RandomAccessFile file; + private final FileChannel fileChannel; + private volatile boolean isOpen; // remember if the file is open, so that we don't try to close it more than once + private long position; + + public FSChannelIndexOutput(FSDirectory parent, String name, ByteBuffer buffer) throws IOException { + this.parent = parent; + this.name = name; + file = new RandomAccessFile(new File(parent.directory, name), "rw"); + fileChannel = file.getChannel(); + isOpen = true; + position = 0; + this.buffer = buffer; -} + } + + @Override + public void writeByte(byte b) throws IOException { + if (buffer.remaining() == 0) { + flush(); + } + buffer.put(b); + } + + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + while (length > 0) { + int remainInBuffer = buffer.remaining(); + if (remainInBuffer == 0) { + flush(); + remainInBuffer = buffer.remaining(); + } + + int bytesToCopy = length < remainInBuffer ? length : remainInBuffer; + buffer.put(b, offset, bytesToCopy); + offset += bytesToCopy; + length -= bytesToCopy; + } + } + + @Override + public void flush() throws IOException { + if (buffer.position() == 0) { + return; + } + buffer.flip(); + fileChannel.write(buffer, position); + position += buffer.position(); + buffer.clear(); + } + + @Override + public long getFilePointer() { + return position + buffer.position(); + } + + @Override + public void close() throws IOException { + // only close the file if it has not been closed yet + if (isOpen) { + boolean success = false; + try { + flush(); + success = true; + } finally { + isOpen = false; + if (!success) { + try { + file.close(); + parent.onIndexOutputClosed(this); + } catch (Throwable t) { + // Suppress so we don't mask original exception + } + } else + file.close(); + } + } + } + + /** + * Random-access methods + */ + @Override + public void seek(long pos) throws IOException { + flush(); + position = pos; + } + + @Override + public long length() throws IOException { + return file.length(); + } + + @Override + public void setLength(long length) throws IOException { + file.setLength(length); + } + + @Override + public void writeInt(int i) throws IOException { + try { + buffer.putInt(i); + } catch (BufferOverflowException e) { + flush(); + buffer.putInt(i); + } + } + + @Override + public void writeLong(long i) throws IOException { + try { + buffer.putLong(i); + } catch (BufferOverflowException e) { + flush(); + buffer.putLong(i); + } + } + } + + /** + * Helper class to allocator implementations allowing to clean direct buffers. + *

+ * TODO we need to make this shared by different places in Lucene to clean Direct Buffers + */ + public static class Cleaner { + public static final boolean CLEAN_SUPPORTED; + private static final Method directBufferCleaner; + private static final Method directBufferCleanerClean; + + static { + Method directBufferCleanerX = null; + Method directBufferCleanerCleanX = null; + boolean v; + try { + directBufferCleanerX = Class.forName("java.nio.DirectByteBuffer").getMethod("cleaner"); + directBufferCleanerX.setAccessible(true); + directBufferCleanerCleanX = Class.forName("sun.misc.Cleaner").getMethod("clean"); + directBufferCleanerCleanX.setAccessible(true); + v = true; + } catch (Exception e) { + v = false; + } + CLEAN_SUPPORTED = v; + directBufferCleaner = directBufferCleanerX; + directBufferCleanerClean = directBufferCleanerCleanX; + } + + public static void clean(ByteBuffer buffer) { + if (CLEAN_SUPPORTED && buffer.isDirect()) { + try { + Object cleaner = directBufferCleaner.invoke(buffer); + directBufferCleanerClean.invoke(cleaner); + } catch (Exception e) { + // silently ignore exception + } + } + } + } +} Index: lucene/src/java/org/apache/lucene/store/NIOFSDirectory.java =================================================================== --- lucene/src/java/org/apache/lucene/store/NIOFSDirectory.java (revision 990822) +++ lucene/src/java/org/apache/lucene/store/NIOFSDirectory.java (revision ) @@ -78,6 +78,14 @@ return new NIOFSIndexInput(new File(getDirectory(), name), bufferSize, getReadChunkSize()); } + /** + * The NIO directory uses channel stream by default. + */ + @Override + protected boolean defaultUseChannelStream() { + return true; + } + protected static class NIOFSIndexInput extends SimpleFSDirectory.SimpleFSIndexInput { private ByteBuffer byteBuf; // wraps the buffer for NIO