Index: lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java =================================================================== --- lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java (revision 0) +++ lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java (working copy) @@ -0,0 +1,288 @@ +package org.apache.lucene.store; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.nio.file.attribute.FileAttribute; +import java.util.EnumSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +/** An implementation of {@link FSDirectory} + * using java.nio.channels.AsynchronousFileChannel. This + * class is useful on Windows, where it will take advantage of + * overlapped IO, but on other platforms this is likely to + * be comparable in performance if not worse than {@link SimpleFSDirectory}. + * + * This class does not synchronize on the file position + * for reads, and does not cause problems when its thread + * is interrupted. */ + +public class AsyncFSDirectory extends FSDirectory { + + private static final FileAttribute[] NO_ATTRIBUTES = new FileAttribute[0]; + + private final ExecutorService executor; + + /** Create a new AsyncFSDirectory for the named location. + * + * @param path the path of the directory + * @param lockFactory the lock factory to use, or null for the default + * ({@link NativeFSLockFactory}); + * @param executor the executor service for IO notifications + * @throws IOException if there is a low-level I/O error + */ + public AsyncFSDirectory(File path, LockFactory lockFactory, ExecutorService executor) throws IOException { + super(path, lockFactory); + this.executor = executor; + } + + /** Create a new AsyncFSDirectory for the named location. + * + * @param path the path of the directory + * @param lockFactory the lock factory to use, or null for the default + * ({@link NativeFSLockFactory}); + * @throws IOException if there is a low-level I/O error + */ + public AsyncFSDirectory(File path, LockFactory lockFactory) throws IOException { + this(path, lockFactory, null); + } + + /** Create a new AsyncFSDirectory for the named location and {@link NativeFSLockFactory}. + * + * @param path the path of the directory + * @throws IOException if there is a low-level I/O error + */ + public AsyncFSDirectory(File path) throws IOException { + this(path, null); + } + + @Override + public IndexOutput createOutput(String name, IOContext context) + throws IOException { + ensureOpen(); + + ensureCanWrite(name); + + File file = new File(directory, name); + return new AsyncFSIndexOutput(this, name, AsynchronousFileChannel.open(file.toPath(), + StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.READ, StandardOpenOption.WRITE)); + } + + /** Creates an IndexInput for the file with the given name. */ + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + ensureOpen(); + Path path = new File(directory, name).toPath(); + return new AsyncFSIndexInput(path, context, getReadChunkSize()); + } + + @Override + public IndexInputSlicer createSlicer(final String name, + final IOContext context) throws IOException { + ensureOpen(); + final Path path = new File(directory, name).toPath(); + final AsynchronousFileChannel channel = AsynchronousFileChannel.open( + path, + EnumSet.of(StandardOpenOption.READ), + executor, + NO_ATTRIBUTES); + return new IndexInputSlicer() { + + @Override + public void close() throws IOException { + channel.close(); + } + + @Override + public IndexInput openSlice(String sliceDescription, long offset, long length) { + return new AsyncFSIndexInput(sliceDescription, path, channel, offset, + length, BufferedIndexInput.bufferSize(context), getReadChunkSize()); + } + }; + } + + /** + * Writes output with {@link AsynchronousFileChannel#write(ByteBuffer, long)} + */ + protected static class AsyncFSIndexOutput extends FSIndexOutput { + private long fpos = 0; + + public AsyncFSIndexOutput(FSDirectory parent, String name, + AsynchronousFileChannel file) throws IOException { + super(parent, name, file); + } + + /** output methods: */ + @Override + public void flushBuffer(byte[] b, int offset, int size) throws IOException { + ensureOpen(); + if (size == 0) { + return; //Avoid creating a ByteBuffer if we don't need to. + } + + ByteBuffer bb = ByteBuffer.wrap(b, offset, size); + boolean interrupted = false; + try { + while (bb.remaining() > 0) { + try { + int written = file.write(bb, fpos).get(); + fpos += written; + } catch (ExecutionException ee) { + throw new IOException(ee.getCause()); + } catch (InterruptedException ie) { + interrupted = true; + } + } + } finally { + if (interrupted) + Thread.currentThread().interrupt(); + } + } + + @Override + public long length() throws IOException { + return file.size(); + } + + @Override + public void setLength(long length) throws IOException { + file.truncate(length); + } + } + + protected class AsyncFSIndexInput extends FSIndexInput { + + private ByteBuffer byteBuf; // wraps the buffer for NIO + + public AsyncFSIndexInput(Path path, IOContext context, int chunkSize) throws IOException { + super("AsyncFSIndexInput(path=\"" + path + "\")", AsynchronousFileChannel.open( + path, EnumSet.of(StandardOpenOption.READ), executor, NO_ATTRIBUTES), context, chunkSize); + } + + public AsyncFSIndexInput(String sliceDescription, Path path, AsynchronousFileChannel fc, long off, long length, int bufferSize, int chunkSize) { + super("AsyncFSIndexInput(" + sliceDescription + " in path=\"" + path + "\" slice=" + off + ":" + (off+length) + ")", fc, off, length, bufferSize, chunkSize); + isClone = true; + } + + @Override + protected void newBuffer(byte[] newBuffer) { + super.newBuffer(newBuffer); + byteBuf = ByteBuffer.wrap(newBuffer); + } + + @Override + protected void readInternal(byte[] b, int offset, int len) throws IOException { + + final ByteBuffer bb; + + // Determine the ByteBuffer we should use + if (b == buffer && 0 == offset) { + // Use our own pre-wrapped byteBuf: + assert byteBuf != null; + byteBuf.clear(); + byteBuf.limit(len); + bb = byteBuf; + } else { + bb = ByteBuffer.wrap(b, offset, len); + } + + int readOffset = bb.position(); + int readLength = bb.limit() - readOffset; + assert readLength == len; + + long pos = getFilePointer() + off; + + if (pos + len > end) { + throw new EOFException("read past EOF: " + this); + } + + try { + boolean interrupted = false; + try { + while (readLength > 0) { + final int limit; + if (readLength > chunkSize) { + // LUCENE-1566 - work around JVM Bug by breaking + // very large reads into chunks + limit = readOffset + chunkSize; + } else { + limit = readOffset + readLength; + } + bb.limit(limit); + Future future = file.read(bb, pos); + + //We have to read in a loop here since future.get() could + //throw InterruptedException. + while (true) { + try { + int i = future.get(); + if (i < 0) { + throw new EOFException("Attempt to read past end of file"); + } + pos += i; + readOffset += i; + readLength -= i; + break; + } catch (ExecutionException ee) { + throw new IOException(ee.getCause()); + } catch (InterruptedException ie) { + interrupted = true; + } + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } catch (OutOfMemoryError e) { + // propagate OOM up and add a hint for 32bit VM Users hitting the bug + // with a large chunk size in the fast path. + final OutOfMemoryError outOfMemoryError = new OutOfMemoryError( + "OutOfMemoryError likely caused by the Sun VM Bug described in " + + "https://issues.apache.org/jira/browse/LUCENE-1566; try calling FSDirectory.setReadChunkSize " + + "with a value smaller than the current chunk size (" + chunkSize + ")"); + outOfMemoryError.initCause(e); + throw outOfMemoryError; + } catch (IOException ioe) { + throw new IOException(ioe.getMessage() + ": " + this, ioe); + } + } + + @Override + protected void seekInternal(long pos) throws IOException {} + + @Override + boolean isFDValid() throws IOException { + return file.isOpen(); + } + + @Override + protected long length(AsynchronousFileChannel fd) throws IOException { + return file.size(); + } + } +} Index: lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java =================================================================== --- lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java (revision 0) +++ lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java (working copy) Property changes on: lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Added: svn:keywords ## -0,0 +1 ## +Date Author Id Revision HeadURL \ No newline at end of property Index: lucene/core/src/java/org/apache/lucene/store/FSDirectory.java =================================================================== --- lucene/core/src/java/org/apache/lucene/store/FSDirectory.java (revision 1458525) +++ lucene/core/src/java/org/apache/lucene/store/FSDirectory.java (working copy) @@ -17,12 +17,17 @@ * limitations under the License. */ +import java.io.Closeable; import java.io.File; import java.io.FileNotFoundException; import java.io.FilenameFilter; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.StandardOpenOption; + import java.util.Collection; import static java.util.Collections.synchronizedSet; import java.util.HashSet; @@ -285,7 +290,7 @@ ensureOpen(); ensureCanWrite(name); - return new FSIndexOutput(this, name); + return new RandomAccessFileFSIndexOutput(this, name); } protected void ensureCanWrite(String name) throws IOException { @@ -298,7 +303,7 @@ throw new IOException("Cannot overwrite: " + file); } - protected void onIndexOutputClosed(FSIndexOutput io) { + protected void onIndexOutputClosed(FSIndexOutput io) { staleFiles.add(io.name); } @@ -392,10 +397,11 @@ return chunkSize; } - /** Base class for reading input from a RandomAccessFile */ - protected abstract static class FSIndexInput extends BufferedIndexInput { - /** the underlying RandomAccessFile */ - protected final RandomAccessFile file; + /** Base class for reading input from an object that accesses a file. This class is parameterized + * with the concrete file accessor class (for example RandomAccessFile or FileChannel) */ + protected abstract static class FSIndexInput extends BufferedIndexInput { + /** the underlying file accessor object */ + protected final T file; boolean isClone = false; /** maximum read length on a 32bit JVM to prevent incorrect OOM, see LUCENE-1566 */ protected final int chunkSize; @@ -405,16 +411,16 @@ protected final long end; /** Create a new FSIndexInput, reading the entire file from path */ - protected FSIndexInput(String resourceDesc, File path, IOContext context, int chunkSize) throws IOException { + protected FSIndexInput(String resourceDesc, T file, IOContext context, int chunkSize) throws IOException { super(resourceDesc, context); - this.file = new RandomAccessFile(path, "r"); + this.file = file; this.chunkSize = chunkSize; this.off = 0L; - this.end = file.length(); + this.end = fileLength(file); } /** Create a new FSIndexInput, representing a slice of an existing open file */ - protected FSIndexInput(String resourceDesc, RandomAccessFile file, long off, long length, int bufferSize, int chunkSize) { + protected FSIndexInput(String resourceDesc, T file, long off, long length, int bufferSize, int chunkSize) { super(resourceDesc, bufferSize); this.file = file; this.chunkSize = chunkSize; @@ -432,8 +438,9 @@ } @Override - public FSIndexInput clone() { - FSIndexInput clone = (FSIndexInput)super.clone(); + @SuppressWarnings("unchecked") + public FSIndexInput clone() { + FSIndexInput clone = (FSIndexInput)super.clone(); clone.isClone = true; return clone; } @@ -446,33 +453,27 @@ /** Method used for testing. Returns true if the underlying * file descriptor is valid. */ - boolean isFDValid() throws IOException { - return file.getFD().valid(); - } + abstract boolean isFDValid() throws IOException; + + abstract long fileLength(T fd) throws IOException; } /** - * Writes output with {@link RandomAccessFile#write(byte[], int, int)} + * Writes output using a file accessor object supplied by a subclass */ - protected static class FSIndexOutput extends BufferedIndexOutput { + protected abstract static class FSIndexOutput extends BufferedIndexOutput { + final String name; private final FSDirectory parent; - private final String name; - private final RandomAccessFile file; private volatile boolean isOpen; // remember if the file is open, so that we don't try to close it more than once - public FSIndexOutput(FSDirectory parent, String name) throws IOException { + protected final T file; + + public FSIndexOutput(FSDirectory parent, String name, T file) throws IOException { this.parent = parent; this.name = name; - file = new RandomAccessFile(new File(parent.directory, name), "rw"); + this.file = file; isOpen = true; } - - /** output methods: */ - @Override - public void flushBuffer(byte[] b, int offset, int size) throws IOException { - assert isOpen; - file.write(b, offset, size); - } @Override public void close() throws IOException { @@ -497,8 +498,30 @@ } } } + + protected void ensureOpen() { + assert isOpen; + } + } + + protected static class RandomAccessFileFSIndexOutput extends FSIndexOutput { + public RandomAccessFileFSIndexOutput(FSDirectory parent, String name) throws IOException { + super(parent, name, new RandomAccessFile(new File(parent.directory, name), "rw")); + } + + public RandomAccessFileFSIndexOutput(FSDirectory parent, String name, RandomAccessFile file) throws IOException { + super(parent, name, file); + } + @Override + protected void flushBuffer(byte[] b, int offset, int len) + throws IOException { + ensureOpen(); + file.write(b, offset, len); + } + + @Override public long length() throws IOException { return file.length(); } @@ -508,7 +531,40 @@ file.setLength(length); } } + + protected static class FileChannelFSIndexOuptut extends FSIndexOutput { + public FileChannelFSIndexOuptut(FSDirectory parent, String name) throws IOException { + super(parent, name, FileChannel.open( + new File(parent.directory, name).toPath(), + StandardOpenOption.READ, StandardOpenOption.WRITE)); + } + + public FileChannelFSIndexOuptut(FSDirectory parent, String name, FileChannel file) throws IOException { + super(parent, name, file); + } + + @Override + protected void flushBuffer(byte[] b, int offset, int len) + throws IOException { + ensureOpen(); + ByteBuffer bb = ByteBuffer.wrap(b, offset, len); + while (bb.remaining() > 0) { + file.write(bb); + } + } + + @Override + public long length() throws IOException { + return file.size(); + } + + @Override + public void setLength(long length) throws IOException { + file.truncate(length); + } + } + protected void fsync(String name) throws IOException { File fullFile = new File(directory, name); boolean success = false; Index: lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java =================================================================== --- lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java (revision 1458525) +++ lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java (working copy) @@ -19,11 +19,11 @@ import java.io.IOException; import java.io.File; -import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; // javadoc @link import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; +import java.nio.file.StandardOpenOption; import java.security.AccessController; import java.security.PrivilegedExceptionAction; @@ -133,7 +133,7 @@ this.chunkSizePower = 31 - Integer.numberOfLeadingZeros(maxChunkSize); assert this.chunkSizePower >= 0 && this.chunkSizePower <= 30; } - + /** * true, if this platform supports unmapping mmapped files. */ @@ -189,12 +189,9 @@ @Override public IndexInput openInput(String name, IOContext context) throws IOException { ensureOpen(); - File f = new File(getDirectory(), name); - RandomAccessFile raf = new RandomAccessFile(f, "r"); - try { - return new MMapIndexInput("MMapIndexInput(path=\"" + f + "\")", raf); - } finally { - raf.close(); + File file = new File(getDirectory(), name); + try (FileChannel c = FileChannel.open(file.toPath(), StandardOpenOption.READ)) { + return new MMapIndexInput("MMapIndexInput(path=\"" + file.toString() + "\")", c); } } @@ -218,8 +215,8 @@ private final class MMapIndexInput extends ByteBufferIndexInput { private final boolean useUnmapHack; - MMapIndexInput(String resourceDescription, RandomAccessFile raf) throws IOException { - super(resourceDescription, map(raf, 0, raf.length()), raf.length(), chunkSizePower, getUseUnmap()); + MMapIndexInput(String resourceDescription, FileChannel fc) throws IOException { + super(resourceDescription, map(fc, 0, fc.size()), fc.size(), chunkSizePower, getUseUnmap()); this.useUnmapHack = getUseUnmap(); } @@ -256,9 +253,9 @@ } /** Maps a file into a set of buffers */ - ByteBuffer[] map(RandomAccessFile raf, long offset, long length) throws IOException { + ByteBuffer[] map(FileChannel fc, long offset, long length) throws IOException { if ((length >>> chunkSizePower) >= Integer.MAX_VALUE) - throw new IllegalArgumentException("RandomAccessFile too big for chunk size: " + raf.toString()); + throw new IllegalArgumentException("RandomAccessFile too big for chunk size: " + fc.toString()); final long chunkSize = 1L << chunkSizePower; @@ -268,13 +265,12 @@ ByteBuffer buffers[] = new ByteBuffer[nrBuffers]; long bufferStart = 0L; - FileChannel rafc = raf.getChannel(); for (int bufNr = 0; bufNr < nrBuffers; bufNr++) { int bufSize = (int) ( (length > (bufferStart + chunkSize)) ? chunkSize : (length - bufferStart) ); - buffers[bufNr] = rafc.map(MapMode.READ_ONLY, offset + bufferStart, bufSize); + buffers[bufNr] = fc.map(MapMode.READ_ONLY, offset + bufferStart, bufSize); bufferStart += bufSize; } Index: lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java =================================================================== --- lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java (revision 1458525) +++ lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java (working copy) @@ -20,10 +20,10 @@ import java.io.File; import java.io.EOFException; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; // javadoc @link import java.nio.channels.FileChannel; +import java.nio.file.StandardOpenOption; import java.util.concurrent.Future; // javadoc /** @@ -85,7 +85,7 @@ final IOContext context) throws IOException { ensureOpen(); final File path = new File(getDirectory(), name); - final RandomAccessFile descriptor = new RandomAccessFile(path, "r"); + final FileChannel descriptor = FileChannel.open(path.toPath(), StandardOpenOption.READ); return new Directory.IndexInputSlicer() { @Override @@ -95,7 +95,7 @@ @Override public IndexInput openSlice(String sliceDescription, long offset, long length) { - return new NIOFSIndexInput(sliceDescription, path, descriptor, descriptor.getChannel(), offset, + return new NIOFSIndexInput(sliceDescription, path, descriptor, offset, length, BufferedIndexInput.bufferSize(context), getReadChunkSize()); } }; @@ -104,20 +104,16 @@ /** * Reads bytes with {@link FileChannel#read(ByteBuffer, long)} */ - protected static class NIOFSIndexInput extends FSIndexInput { + protected static class NIOFSIndexInput extends FSIndexInput { private ByteBuffer byteBuf; // wraps the buffer for NIO - final FileChannel channel; - public NIOFSIndexInput(File path, IOContext context, int chunkSize) throws IOException { - super("NIOFSIndexInput(path=\"" + path + "\")", path, context, chunkSize); - channel = file.getChannel(); + super("NIOFSIndexInput(path=\"" + path + "\")", FileChannel.open(path.toPath(), StandardOpenOption.READ), context, chunkSize); } - public NIOFSIndexInput(String sliceDescription, File path, RandomAccessFile file, FileChannel fc, long off, long length, int bufferSize, int chunkSize) { - super("NIOFSIndexInput(" + sliceDescription + " in path=\"" + path + "\" slice=" + off + ":" + (off+length) + ")", file, off, length, bufferSize, chunkSize); - channel = fc; + public NIOFSIndexInput(String sliceDescription, File path, FileChannel fc, long off, long length, int bufferSize, int chunkSize) { + super("NIOFSIndexInput(" + sliceDescription + " in path=\"" + path + "\" slice=" + off + ":" + (off+length) + ")", fc, off, length, bufferSize, chunkSize); isClone = true; } @@ -164,7 +160,7 @@ limit = readOffset + readLength; } bb.limit(limit); - int i = channel.read(bb, pos); + int i = file.read(bb, pos); pos += i; readOffset += i; readLength -= i; @@ -185,6 +181,16 @@ @Override protected void seekInternal(long pos) throws IOException {} + + @Override + boolean isFDValid() throws IOException { + return file.isOpen(); + } + + @Override + long fileLength(FileChannel fd) throws IOException { + return fd.size(); + } } } Index: lucene/core/src/java/org/apache/lucene/store/SimpleFSDirectory.java =================================================================== --- lucene/core/src/java/org/apache/lucene/store/SimpleFSDirectory.java (revision 1458525) +++ lucene/core/src/java/org/apache/lucene/store/SimpleFSDirectory.java (working copy) @@ -83,10 +83,10 @@ * Reads bytes with {@link RandomAccessFile#seek(long)} followed by * {@link RandomAccessFile#read(byte[], int, int)}. */ - protected static class SimpleFSIndexInput extends FSIndexInput { + protected static class SimpleFSIndexInput extends FSIndexInput { public SimpleFSIndexInput(String resourceDesc, File path, IOContext context, int chunkSize) throws IOException { - super(resourceDesc, path, context, chunkSize); + super(resourceDesc, new RandomAccessFile(path, "r"), context, chunkSize); } public SimpleFSIndexInput(String resourceDesc, RandomAccessFile file, long off, long length, int bufferSize, int chunkSize) { @@ -136,5 +136,15 @@ @Override protected void seekInternal(long position) { } + + @Override + boolean isFDValid() throws IOException { + return file.getFD().valid(); + } + + @Override + long fileLength(RandomAccessFile fd) throws IOException { + return fd.length(); + } } } Index: lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java =================================================================== --- lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java (revision 1458525) +++ lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java (working copy) @@ -303,7 +303,8 @@ private static final List FS_DIRECTORIES = Arrays.asList( "SimpleFSDirectory", "NIOFSDirectory", - "MMapDirectory" + "MMapDirectory", + "AsyncFSDirectory" ); /** All {@link Directory} implementations. */