();
+
// returns the canonical version of the directory, creating it if it doesn't exist.
private static File getCanonicalPath(File file) throws IOException {
return new File(file.getCanonicalPath());
@@ -155,8 +165,18 @@
throw new NoSuchDirectoryException("file '" + directory + "' exists but is not a directory");
setLockFactory(lockFactory);
+ useChannelOutput = defaultUseChannelStream();
}
+ /**
+ * Allows actual implementations of {@link FSDirectory} to control the default value
+ * of using channels when writing. Defaults to false, i.e., not using channel
+ * streams.
+ */
+ protected boolean defaultUseChannelStream() {
+ return false;
+ }
+
/** Creates an FSDirectory instance, trying to pick the
* best implementation given the current environment.
* The directory returned uses the {@link NativeFSLockFactory}.
@@ -303,12 +323,46 @@
staleFiles.remove(name);
}
+ /**
+ * Set to true to set the file based {@link IndexOutput} to use file channel
+ * to write content to disk. Set to false to use plain RAF. Defaults to true.
+ *
+ * Should be set before the directory is actually used.
+ */
+ public void setUseChannelOutput(boolean useChannelOutput) {
+ this.useChannelOutput = useChannelOutput;
+ }
+
+ /**
+ * Should buffers used to write content to disk when using channel based file output
+ * be reused or not. Defaults to true.
+ *
+ *
Should be set before the directory is actually used.
+ */
+ public void setCacheChannelBuffers(boolean cacheChannelBuffers) {
+ this.cacheChannelBuffers = cacheChannelBuffers;
+ }
+
/** Creates an IndexOutput for the file with the given name. */
@Override
public IndexOutput createOutput(String name) throws IOException {
ensureOpen();
ensureCanWrite(name);
+ if (useChannelOutput) {
+ // we use direct buffers since FileChannel#write converts a non direct bb
+ // to a direct one (OpenJDK 6 code), so we might as well create one and write directly to
+ ByteBuffer buffer;
+ if (cacheChannelBuffers) {
+ while ((buffer = cachedBuffers.poll()) == null) {
+ cachedBuffers.offer(ByteBuffer.allocateDirect(FSChannelIndexOutput.BUFFER_SIZE));
+ }
+ } else {
+ buffer = ByteBuffer.allocateDirect(FSChannelIndexOutput.BUFFER_SIZE);
+ }
+ buffer.clear();
+ return new FSChannelIndexOutput(this, name, buffer);
+ }
return new FSIndexOutput(this, name);
}
@@ -326,6 +380,15 @@
staleFiles.add(io.name);
}
+ protected void onIndexOutputClosed(FSChannelIndexOutput io) {
+ staleFiles.add(io.name);
+ if (cacheChannelBuffers) {
+ cachedBuffers.offer(io.buffer);
+ } else {
+ Cleaner.clean(io.buffer);
+ }
+ }
+
@Override
public void sync(Collection names) throws IOException {
ensureOpen();
@@ -380,7 +443,11 @@
@Override
public synchronized void close() {
isOpen = false;
+ ByteBuffer buffer;
+ while ((buffer = cachedBuffers.poll()) != null) {
+ Cleaner.clean(buffer);
- }
+ }
+ }
/** @return the underlying filesystem directory */
public File getDirectory() {
@@ -528,4 +595,168 @@
// Throw original exception
throw exc;
}
+
+ protected static class FSChannelIndexOutput extends IndexOutput {
+ static final int BUFFER_SIZE = 16384;
+ private final ByteBuffer buffer;
+
+ private final FSDirectory parent;
+ private final String name;
+ private final RandomAccessFile file;
+ private final FileChannel fileChannel;
+ private volatile boolean isOpen; // remember if the file is open, so that we don't try to close it more than once
+ private long position;
+
+ public FSChannelIndexOutput(FSDirectory parent, String name, ByteBuffer buffer) throws IOException {
+ this.parent = parent;
+ this.name = name;
+ file = new RandomAccessFile(new File(parent.directory, name), "rw");
+ fileChannel = file.getChannel();
+ isOpen = true;
+ position = 0;
+ this.buffer = buffer;
-}
+ }
+
+ @Override
+ public void writeByte(byte b) throws IOException {
+ if (buffer.remaining() == 0) {
+ flush();
+ }
+ buffer.put(b);
+ }
+
+ @Override
+ public void writeBytes(byte[] b, int offset, int length) throws IOException {
+ while (length > 0) {
+ int remainInBuffer = buffer.remaining();
+ if (remainInBuffer == 0) {
+ flush();
+ remainInBuffer = buffer.remaining();
+ }
+
+ int bytesToCopy = length < remainInBuffer ? length : remainInBuffer;
+ buffer.put(b, offset, bytesToCopy);
+ offset += bytesToCopy;
+ length -= bytesToCopy;
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (buffer.position() == 0) {
+ return;
+ }
+ buffer.flip();
+ fileChannel.write(buffer, position);
+ position += buffer.position();
+ buffer.clear();
+ }
+
+ @Override
+ public long getFilePointer() {
+ return position + buffer.position();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // only close the file if it has not been closed yet
+ if (isOpen) {
+ boolean success = false;
+ try {
+ flush();
+ success = true;
+ } finally {
+ isOpen = false;
+ if (!success) {
+ try {
+ file.close();
+ parent.onIndexOutputClosed(this);
+ } catch (Throwable t) {
+ // Suppress so we don't mask original exception
+ }
+ } else
+ file.close();
+ }
+ }
+ }
+
+ /**
+ * Random-access methods
+ */
+ @Override
+ public void seek(long pos) throws IOException {
+ flush();
+ position = pos;
+ }
+
+ @Override
+ public long length() throws IOException {
+ return file.length();
+ }
+
+ @Override
+ public void setLength(long length) throws IOException {
+ file.setLength(length);
+ }
+
+ @Override
+ public void writeInt(int i) throws IOException {
+ try {
+ buffer.putInt(i);
+ } catch (BufferOverflowException e) {
+ flush();
+ buffer.putInt(i);
+ }
+ }
+
+ @Override
+ public void writeLong(long i) throws IOException {
+ try {
+ buffer.putLong(i);
+ } catch (BufferOverflowException e) {
+ flush();
+ buffer.putLong(i);
+ }
+ }
+ }
+
+ /**
+ * Helper class to allocator implementations allowing to clean direct buffers.
+ *
+ * TODO we need to make this shared by different places in Lucene to clean Direct Buffers
+ */
+ public static class Cleaner {
+ public static final boolean CLEAN_SUPPORTED;
+ private static final Method directBufferCleaner;
+ private static final Method directBufferCleanerClean;
+
+ static {
+ Method directBufferCleanerX = null;
+ Method directBufferCleanerCleanX = null;
+ boolean v;
+ try {
+ directBufferCleanerX = Class.forName("java.nio.DirectByteBuffer").getMethod("cleaner");
+ directBufferCleanerX.setAccessible(true);
+ directBufferCleanerCleanX = Class.forName("sun.misc.Cleaner").getMethod("clean");
+ directBufferCleanerCleanX.setAccessible(true);
+ v = true;
+ } catch (Exception e) {
+ v = false;
+ }
+ CLEAN_SUPPORTED = v;
+ directBufferCleaner = directBufferCleanerX;
+ directBufferCleanerClean = directBufferCleanerCleanX;
+ }
+
+ public static void clean(ByteBuffer buffer) {
+ if (CLEAN_SUPPORTED && buffer.isDirect()) {
+ try {
+ Object cleaner = directBufferCleaner.invoke(buffer);
+ directBufferCleanerClean.invoke(cleaner);
+ } catch (Exception e) {
+ // silently ignore exception
+ }
+ }
+ }
+ }
+}
Index: lucene/src/java/org/apache/lucene/store/NIOFSDirectory.java
===================================================================
--- lucene/src/java/org/apache/lucene/store/NIOFSDirectory.java (revision 990822)
+++ lucene/src/java/org/apache/lucene/store/NIOFSDirectory.java (revision )
@@ -78,6 +78,14 @@
return new NIOFSIndexInput(new File(getDirectory(), name), bufferSize, getReadChunkSize());
}
+ /**
+ * The NIO directory uses channel stream by default.
+ */
+ @Override
+ protected boolean defaultUseChannelStream() {
+ return true;
+ }
+
protected static class NIOFSIndexInput extends SimpleFSDirectory.SimpleFSIndexInput {
private ByteBuffer byteBuf; // wraps the buffer for NIO