Index: lucene/contrib/CHANGES.txt =================================================================== --- lucene/contrib/CHANGES.txt (revision 1231707) +++ lucene/contrib/CHANGES.txt (working copy) @@ -58,6 +58,14 @@ way as DirectSpellChecker. This can be used to merge top-N results from more than one SpellChecker. (James Dyer via Robert Muir) + * LUCENE-2795: Generified DirectIOLinuxDirectory to work across any + unix supporting the O_DIRECT flag when opening a file (tested on + Linux and OS X but likely other Unixes will work), and improved it + so it can be used for indexing and searching. The directory uses + direct IO when doing large merges to avoid unnecessarily evicting + cached IO pages due to large merges. (Varun Thacker, Mike + McCandless) + API Changes * LUCENE-2606: Changed RegexCapabilities interface to fix thread Index: lucene/contrib/misc/src/java/org/apache/lucene/store/NativeUnixDirectory.java =================================================================== --- lucene/contrib/misc/src/java/org/apache/lucene/store/NativeUnixDirectory.java (working copy) +++ lucene/contrib/misc/src/java/org/apache/lucene/store/NativeUnixDirectory.java (working copy) @@ -27,68 +27,116 @@ import java.nio.channels.FileChannel; import org.apache.lucene.store.Directory; // javadoc -import org.apache.lucene.store.NativeFSLockFactory; // javadoc +import org.apache.lucene.store.IOContext.Context; +// TODO +// - newer Linux kernel versions (after 2.6.29) have +// improved MADV_SEQUENTIAL (and hopefully also +// FADV_SEQUENTIAL) interaction with the buffer +// cache; we should explore using that instead of direct +// IO when context is merge + /** - * An {@link Directory} implementation that uses the - * Linux-specific O_DIRECT flag to bypass all OS level - * caching. To use this you must compile + * A {@link Directory} implementation for all Unixes that uses + * DIRECT I/O to bypass OS level IO caching during + * merging. For all other cases (searching, writing) we delegate + * to the provided Directory instance. + * + *
To use this you must compile
* NativePosixUtil.cpp (exposes Linux-specific APIs through
- * JNI) for your platform.
+ * JNI) for your platform, by running ant
+ * build-native-unix, and then putting the resulting
+ * libNativePosixUtil.so (from
+ * lucene/build/native) onto your dynamic
+ * linker search path.
*
*
WARNING: this code is very new and quite easily
* could contain horrible bugs. For example, here's one
- * known issue: if you use seek in IndexOutput, and then
+ * known issue: if you use seek in IndexOutput, and then
* write more than one buffer's worth of bytes, then the
- * file will be wrong. Lucene does not do this (only writes
- * small number of bytes after seek).
-
+ * file will be wrong. Lucene does not do this today (only writes
+ * small number of bytes after seek), but that may change.
+ *
+ *
This directory passes Solr and Lucene tests on Linux + * and OS X; other Unixes should work but have not been + * tested! Use at your own risk. + * * @lucene.experimental */ -public class DirectIOLinuxDirectory extends FSDirectory { +public class NativeUnixDirectory extends FSDirectory { + // TODO: this is OS dependent, but likely 512 is the LCD private final static long ALIGN = 512; private final static long ALIGN_NOT_MASK = ~(ALIGN-1); + + /** Default buffer size before writing to disk (256 MB); + * larger means less IO load but more RAM and direct + * buffer storage space consumed during merging. */ - private final int forcedBufferSize; + public final static int DEFAULT_MERGE_BUFFER_SIZE = 262144; + /** Default min expected merge size before direct IO is + * used (10 MB): */ + public final static long DEFAULT_MIN_BYTES_DIRECT = 10*1024*1024; + + private final int mergeBufferSize; + private final long minBytesDirect; + private final Directory delegate; + /** Create a new NIOFSDirectory for the named location. * * @param path the path of the directory - * @param lockFactory the lock factory to use, or null for the default - * ({@link NativeFSLockFactory}); - * @param forcedBufferSize if this is 0, just use Lucene's - * default buffer size; else, force this buffer size. - * For best performance, force the buffer size to - * something fairly large (eg 1 MB), but note that this - * will eat up the JRE's direct buffer storage space + * @param mergeBufferSize Size of buffer to use for + * merging. See {@link #DEFAULT_MERGE_BUFFER_SIZE}. + * @param minBytesDirect Merges, or files to be opened for + * reading, smaller than this will + * not use direct IO. See {@link + * #DEFAULT_MIN_MERGE_BYTES_DIRECT} + * @param delegate fallback Directory for non-merges * @throws IOException */ - public DirectIOLinuxDirectory(File path, LockFactory lockFactory, int forcedBufferSize) throws IOException { - super(path, lockFactory); - this.forcedBufferSize = forcedBufferSize; + public NativeUnixDirectory(File path, int mergeBufferSize, long minBytesDirect, Directory delegate) throws IOException { + super(path, delegate.getLockFactory()); + if ((mergeBufferSize & ALIGN) != 0) { + throw new IllegalArgumentException("mergeBufferSize must be 0 mod " + ALIGN + " (got: " + mergeBufferSize + ")"); + } + this.mergeBufferSize = mergeBufferSize; + this.minBytesDirect = minBytesDirect; + this.delegate = delegate; } + + /** Create a new NIOFSDirectory for the named location. + * + * @param path the path of the directory + * @param delegate fallback Directory for non-merges + * @throws IOException + */ + public NativeUnixDirectory(File path, Directory delegate) throws IOException { + this(path, DEFAULT_MERGE_BUFFER_SIZE, DEFAULT_MIN_BYTES_DIRECT, delegate); + } @Override public IndexInput openInput(String name, IOContext context) throws IOException { ensureOpen(); - return new DirectIOLinuxIndexInput(new File(getDirectory(), name), - bufferSize(context)); + if (context.context != Context.MERGE || context.mergeInfo.estimatedMergeBytes < minBytesDirect || fileLength(name) < minBytesDirect) { + return delegate.openInput(name, context); + } else { + return new NativeUnixIndexInput(new File(getDirectory(), name), mergeBufferSize); + } } @Override public IndexOutput createOutput(String name, IOContext context) throws IOException { ensureOpen(); - ensureCanWrite(name); - return new DirectIOLinuxIndexOutput(new File(getDirectory(), name), bufferSize(context)); + if (context.context != Context.MERGE || context.mergeInfo.estimatedMergeBytes < minBytesDirect) { + return delegate.createOutput(name, context); + } else { + ensureCanWrite(name); + return new NativeUnixIndexOutput(new File(getDirectory(), name), mergeBufferSize); + } } - - private int bufferSize(IOContext context) { - return forcedBufferSize != 0 ? forcedBufferSize : BufferedIndexInput - .bufferSize(context); - } - private final static class DirectIOLinuxIndexOutput extends IndexOutput { + private final static class NativeUnixIndexOutput extends IndexOutput { private final ByteBuffer buffer; private final FileOutputStream fos; private final FileChannel channel; @@ -101,9 +149,9 @@ private long fileLength; private boolean isOpen; - public DirectIOLinuxIndexOutput(File path, int bufferSize) throws IOException { + public NativeUnixIndexOutput(File path, int bufferSize) throws IOException { //this.path = path; - FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), false); + final FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), false); fos = new FileOutputStream(fd); //fos = new FileOutputStream(path); channel = fos.getChannel(); @@ -206,7 +254,7 @@ @Override public long length() throws IOException { - return fileLength; + return fileLength + bufferPos; } @Override @@ -233,7 +281,7 @@ } } - private final static class DirectIOLinuxIndexInput extends IndexInput { + private final static class NativeUnixIndexInput extends IndexInput { private final ByteBuffer buffer; private final FileInputStream fis; private final FileChannel channel; @@ -244,10 +292,9 @@ private long filePos; private int bufferPos; - public DirectIOLinuxIndexInput(File path, int bufferSize) throws IOException { - // TODO make use of IOContext - super("DirectIOLinuxIndexInput(path=\"" + path.getPath() + "\")"); - FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), true); + public NativeUnixIndexInput(File path, int bufferSize) throws IOException { + super("NativeUnixIndexInput(path=\"" + path.getPath() + "\")"); + final FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), true); fis = new FileInputStream(fd); channel = fis.getChannel(); this.bufferSize = bufferSize; @@ -260,7 +307,7 @@ } // for clone - public DirectIOLinuxIndexInput(DirectIOLinuxIndexInput other) throws IOException { + public NativeUnixIndexInput(NativeUnixIndexInput other) throws IOException { super(other.toString()); this.fis = null; channel = other.channel; @@ -296,13 +343,17 @@ public void seek(long pos) throws IOException { if (pos != getFilePointer()) { final long alignedPos = pos & ALIGN_NOT_MASK; - //System.out.println("seek pos=" + pos + " aligned=" + alignedPos + " bufferSize=" + bufferSize + " this=" + this); filePos = alignedPos-bufferSize; - refill(); final int delta = (int) (pos - alignedPos); - buffer.position(delta); - bufferPos = delta; + if (delta != 0) { + refill(); + buffer.position(delta); + bufferPos = delta; + } else { + // force refill on next read + bufferPos = bufferSize; + } } } @@ -371,7 +422,7 @@ @Override public Object clone() { try { - return new DirectIOLinuxIndexInput(this); + return new NativeUnixIndexInput(this); } catch (IOException ioe) { throw new RuntimeException("IOException during clone: " + this, ioe); } Index: lucene/contrib/misc/src/java/org/apache/lucene/store/DirectIOLinuxDirectory.java =================================================================== --- lucene/contrib/misc/src/java/org/apache/lucene/store/DirectIOLinuxDirectory.java (revision 1231707) +++ lucene/contrib/misc/src/java/org/apache/lucene/store/DirectIOLinuxDirectory.java (working copy) @@ -1,380 +0,0 @@ -package org.apache.lucene.store; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -import java.io.EOFException; -import java.io.File; -import java.io.IOException; -import java.io.FileInputStream; -import java.io.FileDescriptor; -import java.io.FileOutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; - -import org.apache.lucene.store.Directory; // javadoc -import org.apache.lucene.store.NativeFSLockFactory; // javadoc - -/** - * An {@link Directory} implementation that uses the - * Linux-specific O_DIRECT flag to bypass all OS level - * caching. To use this you must compile - * NativePosixUtil.cpp (exposes Linux-specific APIs through - * JNI) for your platform. - * - *
WARNING: this code is very new and quite easily
- * could contain horrible bugs. For example, here's one
- * known issue: if you use seek in IndexOutput, and then
- * write more than one buffer's worth of bytes, then the
- * file will be wrong. Lucene does not do this (only writes
- * small number of bytes after seek).
-
- * @lucene.experimental
- */
-public class DirectIOLinuxDirectory extends FSDirectory {
-
- private final static long ALIGN = 512;
- private final static long ALIGN_NOT_MASK = ~(ALIGN-1);
-
- private final int forcedBufferSize;
-
- /** Create a new NIOFSDirectory for the named location.
- *
- * @param path the path of the directory
- * @param lockFactory the lock factory to use, or null for the default
- * ({@link NativeFSLockFactory});
- * @param forcedBufferSize if this is 0, just use Lucene's
- * default buffer size; else, force this buffer size.
- * For best performance, force the buffer size to
- * something fairly large (eg 1 MB), but note that this
- * will eat up the JRE's direct buffer storage space
- * @throws IOException
- */
- public DirectIOLinuxDirectory(File path, LockFactory lockFactory, int forcedBufferSize) throws IOException {
- super(path, lockFactory);
- this.forcedBufferSize = forcedBufferSize;
- }
-
- @Override
- public IndexInput openInput(String name, IOContext context) throws IOException {
- ensureOpen();
- return new DirectIOLinuxIndexInput(new File(getDirectory(), name),
- bufferSize(context));
- }
-
- @Override
- public IndexOutput createOutput(String name, IOContext context) throws IOException {
- ensureOpen();
- ensureCanWrite(name);
- return new DirectIOLinuxIndexOutput(new File(getDirectory(), name), bufferSize(context));
- }
-
- private int bufferSize(IOContext context) {
- return forcedBufferSize != 0 ? forcedBufferSize : BufferedIndexInput
- .bufferSize(context);
- }
-
- private final static class DirectIOLinuxIndexOutput extends IndexOutput {
- private final ByteBuffer buffer;
- private final FileOutputStream fos;
- private final FileChannel channel;
- private final int bufferSize;
-
- //private final File path;
-
- private int bufferPos;
- private long filePos;
- private long fileLength;
- private boolean isOpen;
-
- public DirectIOLinuxIndexOutput(File path, int bufferSize) throws IOException {
- //this.path = path;
- FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), false);
- fos = new FileOutputStream(fd);
- //fos = new FileOutputStream(path);
- channel = fos.getChannel();
- buffer = ByteBuffer.allocateDirect(bufferSize);
- this.bufferSize = bufferSize;
- isOpen = true;
- }
-
- @Override
- public void writeByte(byte b) throws IOException {
- assert bufferPos == buffer.position(): "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position();
- buffer.put(b);
- if (++bufferPos == bufferSize) {
- dump();
- }
- }
-
- @Override
- public void writeBytes(byte[] src, int offset, int len) throws IOException {
- int toWrite = len;
- while(true) {
- final int left = bufferSize - bufferPos;
- if (left <= toWrite) {
- buffer.put(src, offset, left);
- toWrite -= left;
- offset += left;
- bufferPos = bufferSize;
- dump();
- } else {
- buffer.put(src, offset, toWrite);
- bufferPos += toWrite;
- break;
- }
- }
- }
-
- //@Override
- //public void setLength() throws IOException {
- // TODO -- how to impl this? neither FOS nor
- // FileChannel provides an API?
- //}
-
- @Override
- public void flush() throws IOException {
- // TODO -- I don't think this method is necessary?
- }
-
- private void dump() throws IOException {
- buffer.flip();
- final long limit = filePos + buffer.limit();
- if (limit > fileLength) {
- // this dump extends the file
- fileLength = limit;
- } else {
- // we had seek'd back & wrote some changes
- }
-
- // must always round to next block
- buffer.limit((int) ((buffer.limit() + ALIGN - 1) & ALIGN_NOT_MASK));
-
- assert (buffer.limit() & ALIGN_NOT_MASK) == buffer.limit() : "limit=" + buffer.limit() + " vs " + (buffer.limit() & ALIGN_NOT_MASK);
- assert (filePos & ALIGN_NOT_MASK) == filePos;
- //System.out.println(Thread.currentThread().getName() + ": dump to " + filePos + " limit=" + buffer.limit() + " fos=" + fos);
- channel.write(buffer, filePos);
- filePos += bufferPos;
- bufferPos = 0;
- buffer.clear();
- //System.out.println("dump: done");
-
- // TODO: the case where we'd seek'd back, wrote an
- // entire buffer, we must here read the next buffer;
- // likely Lucene won't trip on this since we only
- // write smallish amounts on seeking back
- }
-
- @Override
- public long getFilePointer() {
- return filePos + bufferPos;
- }
-
- // TODO: seek is fragile at best; it can only properly
- // handle seek & then change bytes that fit entirely
- // within one buffer
- @Override
- public void seek(long pos) throws IOException {
- if (pos != getFilePointer()) {
- dump();
- final long alignedPos = pos & ALIGN_NOT_MASK;
- filePos = alignedPos;
- int n = (int) NativePosixUtil.pread(fos.getFD(), filePos, buffer);
- if (n < bufferSize) {
- buffer.limit(n);
- }
- //System.out.println("seek refill=" + n);
- final int delta = (int) (pos - alignedPos);
- buffer.position(delta);
- bufferPos = delta;
- }
- }
-
- @Override
- public long length() throws IOException {
- return fileLength;
- }
-
- @Override
- public void close() throws IOException {
- if (isOpen) {
- isOpen = false;
- try {
- dump();
- } finally {
- try {
- //System.out.println("direct close set len=" + fileLength + " vs " + channel.size() + " path=" + path);
- channel.truncate(fileLength);
- //System.out.println(" now: " + channel.size());
- } finally {
- try {
- channel.close();
- } finally {
- fos.close();
- //System.out.println(" final len=" + path.length());
- }
- }
- }
- }
- }
- }
-
- private final static class DirectIOLinuxIndexInput extends IndexInput {
- private final ByteBuffer buffer;
- private final FileInputStream fis;
- private final FileChannel channel;
- private final int bufferSize;
-
- private boolean isOpen;
- private boolean isClone;
- private long filePos;
- private int bufferPos;
-
- public DirectIOLinuxIndexInput(File path, int bufferSize) throws IOException {
- // TODO make use of IOContext
- super("DirectIOLinuxIndexInput(path=\"" + path.getPath() + "\")");
- FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), true);
- fis = new FileInputStream(fd);
- channel = fis.getChannel();
- this.bufferSize = bufferSize;
- buffer = ByteBuffer.allocateDirect(bufferSize);
- isOpen = true;
- isClone = false;
- filePos = -bufferSize;
- bufferPos = bufferSize;
- //System.out.println("D open " + path + " this=" + this);
- }
-
- // for clone
- public DirectIOLinuxIndexInput(DirectIOLinuxIndexInput other) throws IOException {
- super(other.toString());
- this.fis = null;
- channel = other.channel;
- this.bufferSize = other.bufferSize;
- buffer = ByteBuffer.allocateDirect(bufferSize);
- filePos = -bufferSize;
- bufferPos = bufferSize;
- isOpen = true;
- isClone = true;
- //System.out.println("D clone this=" + this);
- seek(other.getFilePointer());
- }
-
- @Override
- public void close() throws IOException {
- if (isOpen && !isClone) {
- try {
- channel.close();
- } finally {
- if (!isClone) {
- fis.close();
- }
- }
- }
- }
-
- @Override
- public long getFilePointer() {
- return filePos + bufferPos;
- }
-
- @Override
- public void seek(long pos) throws IOException {
- if (pos != getFilePointer()) {
- final long alignedPos = pos & ALIGN_NOT_MASK;
- //System.out.println("seek pos=" + pos + " aligned=" + alignedPos + " bufferSize=" + bufferSize + " this=" + this);
- filePos = alignedPos-bufferSize;
- refill();
-
- final int delta = (int) (pos - alignedPos);
- buffer.position(delta);
- bufferPos = delta;
- }
- }
-
- @Override
- public long length() {
- try {
- return channel.size();
- } catch (IOException ioe) {
- throw new RuntimeException("IOException during length(): " + this, ioe);
- }
- }
-
- @Override
- public byte readByte() throws IOException {
- // NOTE: we don't guard against EOF here... ie the
- // "final" buffer will typically be filled to less
- // than bufferSize
- if (bufferPos == bufferSize) {
- refill();
- }
- assert bufferPos == buffer.position() : "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position();
- bufferPos++;
- return buffer.get();
- }
-
- private void refill() throws IOException {
- buffer.clear();
- filePos += bufferSize;
- bufferPos = 0;
- assert (filePos & ALIGN_NOT_MASK) == filePos : "filePos=" + filePos + " anded=" + (filePos & ALIGN_NOT_MASK);
- //System.out.println("X refill filePos=" + filePos);
- int n;
- try {
- n = channel.read(buffer, filePos);
- } catch (IOException ioe) {
- throw new IOException(ioe.getMessage() + ": " + this, ioe);
- }
- if (n < 0) {
- throw new EOFException("read past EOF: " + this);
- }
- buffer.rewind();
- }
-
- @Override
- public void readBytes(byte[] dst, int offset, int len) throws IOException {
- int toRead = len;
- //System.out.println("\nX readBytes len=" + len + " fp=" + getFilePointer() + " size=" + length() + " this=" + this);
- while(true) {
- final int left = bufferSize - bufferPos;
- if (left < toRead) {
- //System.out.println(" copy " + left);
- buffer.get(dst, offset, left);
- toRead -= left;
- offset += left;
- refill();
- } else {
- //System.out.println(" copy " + toRead);
- buffer.get(dst, offset, toRead);
- bufferPos += toRead;
- //System.out.println(" readBytes done");
- break;
- }
- }
- }
-
- @Override
- public Object clone() {
- try {
- return new DirectIOLinuxIndexInput(this);
- } catch (IOException ioe) {
- throw new RuntimeException("IOException during clone: " + this, ioe);
- }
- }
- }
-}
Index: lucene/contrib/misc/src/java/org/apache/lucene/store/NativePosixUtil.cpp
===================================================================
--- lucene/contrib/misc/src/java/org/apache/lucene/store/NativePosixUtil.cpp (revision 1231707)
+++ lucene/contrib/misc/src/java/org/apache/lucene/store/NativePosixUtil.cpp (working copy)
@@ -15,6 +15,16 @@
* the License.
*/
+#ifdef LINUX
+ #define DIRECT_FLAG O_DIRECT | O_NOATIME
+ #define LINUX
+#elif __APPLE__
+ #define DIRECT_FLAG 0
+ #define OSX
+#else
+ #define DIRECT_FLAG O_DIRECT // __unix__ is not used as even Linux falls under it.
+#endif
+
#include
NOTE: This uses C++ sources (accessible via JNI), which you'll
have to compile on your platform. Further, this is a very
-platform-specific extensions (runs only on Linux, and likely only on
-2.6.x kernels).
+platform-specific extensions (runs only on Unix based kernels, and likely only on
+2.6.x kernels and above).
-DirectIOLinuxDirectory is a Directory implementation that bypasses the
+NativeUnixDirectory is a Directory implementation that bypasses the
OS's buffer cache for any IndexInput and IndexOutput opened through it
-(using the linux-specific O_DIRECT flag).
+(Uses direct I/O for all merge input and output)
Note that doing so typically results in bad performance loss! You
Index: lucene/contrib/misc/build.xml
===================================================================
--- lucene/contrib/misc/build.xml (revision 1231707)
+++ lucene/contrib/misc/build.xml (working copy)
@@ -40,11 +40,13 @@
DirectIOLinuxDirectory
+NativeUnixDirectory