Index: lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java =================================================================== --- lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java (revision 0) +++ lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java (working copy) @@ -0,0 +1,247 @@ +package org.apache.lucene.store; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.file.StandardOpenOption; +import java.nio.file.attribute.FileAttribute; +import java.util.EnumSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +/** An implementation of {@link FSDirectory} + * using java.nio.channels.AsynchronousFileChannel. This + * class is useful on Windows, where it will take advantage of + * overlapped IO, but on other platforms this is likely to + * be comparable in performance if not worse than {@link SimpleFSDirectory}. + * + * This class does not synchronize on the file position + * for reads, and does not cause problems when its thread + * is interrupted. + */ +public class AsyncFSDirectory extends FSDirectory { + + private static final FileAttribute[] NO_ATTRIBUTES = new FileAttribute[0]; + + private final ExecutorService executor; + + /** Create a new AsyncFSDirectory for the named location. + * + * @param path the path of the directory + * @param lockFactory the lock factory to use, or null for the default + * ({@link NativeFSLockFactory}); + * @param executor the executor service for IO notifications + * @throws IOException if there is a low-level I/O error + */ + public AsyncFSDirectory(File path, LockFactory lockFactory, ExecutorService executor) throws IOException { + super(path, lockFactory); + this.executor = executor; + } + + /** Create a new AsyncFSDirectory for the named location. + * + * @param path the path of the directory + * @param lockFactory the lock factory to use, or null for the default + * ({@link NativeFSLockFactory}); + * @throws IOException if there is a low-level I/O error + */ + public AsyncFSDirectory(File path, LockFactory lockFactory) throws IOException { + this(path, lockFactory, null); + } + + /** Create a new AsyncFSDirectory for the named location and {@link NativeFSLockFactory}. + * + * @param path the path of the directory + * @throws IOException if there is a low-level I/O error + */ + public AsyncFSDirectory(File path) throws IOException { + this(path, null); + } + + /** Creates an IndexInput for the file with the given name. */ + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + ensureOpen(); + File path = new File(getDirectory(), name); + AsynchronousFileChannel fc = AsynchronousFileChannel.open(path.toPath(), EnumSet.of(StandardOpenOption.READ), executor, NO_ATTRIBUTES); + return new AsyncFSIndexInput("AsyncFSIndexInput(path=\"" + path + "\")", fc, context, getReadChunkSize()); + } + + @Override + public IndexInputSlicer createSlicer(final String name, final IOContext context) throws IOException { + ensureOpen(); + final File path = new File(getDirectory(), name); + final AsynchronousFileChannel channel = AsynchronousFileChannel.open(path.toPath(), EnumSet.of(StandardOpenOption.READ), executor, NO_ATTRIBUTES); + return new IndexInputSlicer() { + @Override + public void close() throws IOException { + channel.close(); + } + + @Override + public IndexInput openSlice(String sliceDescription, long offset, long length) { + return new AsyncFSIndexInput("AsyncFSIndexInput(" + sliceDescription + " in path=\"" + path + "\" slice=" + offset + ":" + (offset+length) + ")", + channel, offset, length, BufferedIndexInput.bufferSize(context), getReadChunkSize()); + } + }; + } + + protected class AsyncFSIndexInput extends BufferedIndexInput { + /** the file channel we will read from */ + protected final AsynchronousFileChannel channel; + /** is this instance a clone and hence does not own the file to close it */ + boolean isClone = false; + /** maximum read length on a 32bit JVM to prevent incorrect OOM, see LUCENE-1566 */ + protected final int chunkSize; + /** start offset: non-zero in the slice case */ + protected final long off; + /** end offset (start+length) */ + protected final long end; + + private ByteBuffer byteBuf; // wraps the buffer for NIO + + public AsyncFSIndexInput(String resourceDesc, AsynchronousFileChannel fc, IOContext context, int chunkSize) throws IOException { + super(resourceDesc, context); + this.channel = fc; + this.chunkSize = chunkSize; + this.off = 0L; + this.end = fc.size(); + } + + public AsyncFSIndexInput(String resourceDesc, AsynchronousFileChannel fc, long off, long length, int bufferSize, int chunkSize) { + super(resourceDesc, bufferSize); + this.channel = fc; + this.chunkSize = chunkSize; + this.off = off; + this.end = off + length; + this.isClone = true; + } + + @Override + public void close() throws IOException { + if (!isClone) { + channel.close(); + } + } + + @Override + public AsyncFSIndexInput clone() { + AsyncFSIndexInput clone = (AsyncFSIndexInput)super.clone(); + clone.isClone = true; + return clone; + } + + @Override + public final long length() { + return end - off; + } + + @Override + protected void newBuffer(byte[] newBuffer) { + super.newBuffer(newBuffer); + byteBuf = ByteBuffer.wrap(newBuffer); + } + + @Override + protected void readInternal(byte[] b, int offset, int len) throws IOException { + + final ByteBuffer bb; + + // Determine the ByteBuffer we should use + if (b == buffer && 0 == offset) { + // Use our own pre-wrapped byteBuf: + assert byteBuf != null; + byteBuf.clear(); + byteBuf.limit(len); + bb = byteBuf; + } else { + bb = ByteBuffer.wrap(b, offset, len); + } + + int readOffset = bb.position(); + int readLength = bb.limit() - readOffset; + assert readLength == len; + + long pos = getFilePointer() + off; + + if (pos + len > end) { + throw new EOFException("read past EOF: " + this); + } + + try { + boolean interrupted = false; + try { + while (readLength > 0) { + final int limit; + if (readLength > chunkSize) { + // LUCENE-1566 - work around JVM Bug by breaking + // very large reads into chunks + limit = readOffset + chunkSize; + } else { + limit = readOffset + readLength; + } + bb.limit(limit); + final Future future = channel.read(bb, pos); + + //We have to read in a loop here since future.get() could + //throw InterruptedException. + while (true) { + try { + int i = future.get(); + if (i < 0) { + throw new EOFException("Attempt to read past end of file"); + } + pos += i; + readOffset += i; + readLength -= i; + break; + } catch (ExecutionException ee) { + throw new IOException(ee.getCause()); + } catch (InterruptedException ie) { + interrupted = true; + } + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } catch (OutOfMemoryError e) { + // propagate OOM up and add a hint for 32bit VM Users hitting the bug + // with a large chunk size in the fast path. + final OutOfMemoryError outOfMemoryError = new OutOfMemoryError( + "OutOfMemoryError likely caused by the Sun VM Bug described in " + + "https://issues.apache.org/jira/browse/LUCENE-1566; try calling FSDirectory.setReadChunkSize " + + "with a value smaller than the current chunk size (" + chunkSize + ")"); + outOfMemoryError.initCause(e); + throw outOfMemoryError; + } catch (IOException ioe) { + throw new IOException(ioe.getMessage() + ": " + this, ioe); + } + } + + @Override + protected void seekInternal(long pos) throws IOException {} + } +} Index: lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java =================================================================== --- lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java (revision 0) +++ lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java (working copy) Property changes on: lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java ___________________________________________________________________ Added: svn:keywords ## -0,0 +1 ## +Date Author Id Revision HeadURL \ No newline at end of property Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: lucene/test-framework/src/java/org/apache/lucene/util/QuickPatchThreadsFilter.java =================================================================== --- lucene/test-framework/src/java/org/apache/lucene/util/QuickPatchThreadsFilter.java (revision 1459439) +++ lucene/test-framework/src/java/org/apache/lucene/util/QuickPatchThreadsFilter.java (working copy) @@ -31,12 +31,38 @@ @Override public boolean reject(Thread t) { + // Try to get the stack lazily and reuse it if it's really necessary. + StackTraceElement [] stack = null; + if (isJ9) { - StackTraceElement [] stack = t.getStackTrace(); + stack = t.getStackTrace(); if (stack.length > 0 && stack[stack.length - 1].getClassName().equals("java.util.Timer$TimerImpl")) { return true; // LUCENE-4736 } } + + // LUCENE-4864; ignore threads inside async IO. There is no easy way to detect whether + // they're part of the system group but we know all these are daemons. + if (t.isDaemon()) { + if (stack == null) stack = t.getStackTrace(); + if (stack.length - 2 >= 0) { + // Some of the Iocp's stuff is called directly, bypassing the threadpool. + if (stack[stack.length - 1].getClassName().equals("java.lang.Thread") && + stack[stack.length - 2].getClassName().startsWith("sun.nio.")) { + return true; + } + + // The default threadpool's threads will end up somewhere in EventHandlerTask's + // run method. + for (int i = 0; i < stack.length - 1; i++) { + if (stack[i].getClassName().equals("sun.nio.ch.Iocp$EventHandlerTask") && + stack[i].getMethodName().equals("run")) { + return true; + } + } + } + } + return false; } }