Index: lucene/contrib/CHANGES.txt =================================================================== --- lucene/contrib/CHANGES.txt (revision 1231707) +++ lucene/contrib/CHANGES.txt (working copy) @@ -58,6 +58,14 @@ way as DirectSpellChecker. This can be used to merge top-N results from more than one SpellChecker. (James Dyer via Robert Muir) + * LUCENE-2795: Generified DirectIOLinuxDirectory to work across any + unix supporting the O_DIRECT flag when opening a file (tested on + Linux and OS X but likely other Unixes will work), and improved it + so it can be used for indexing and searching. The directory uses + direct IO when doing large merges to avoid unnecessarily evicting + cached IO pages due to large merges. (Varun Thacker, Mike + McCandless) + API Changes * LUCENE-2606: Changed RegexCapabilities interface to fix thread Index: lucene/contrib/misc/src/java/org/apache/lucene/store/NativeUnixDirectory.java =================================================================== --- lucene/contrib/misc/src/java/org/apache/lucene/store/NativeUnixDirectory.java (working copy) +++ lucene/contrib/misc/src/java/org/apache/lucene/store/NativeUnixDirectory.java (working copy) @@ -27,68 +27,116 @@ import java.nio.channels.FileChannel; import org.apache.lucene.store.Directory; // javadoc -import org.apache.lucene.store.NativeFSLockFactory; // javadoc +import org.apache.lucene.store.IOContext.Context; +// TODO +// - newer Linux kernel versions (after 2.6.29) have +// improved MADV_SEQUENTIAL (and hopefully also +// FADV_SEQUENTIAL) interaction with the buffer +// cache; we should explore using that instead of direct +// IO when context is merge + /** - * An {@link Directory} implementation that uses the - * Linux-specific O_DIRECT flag to bypass all OS level - * caching. To use this you must compile + * A {@link Directory} implementation for all Unixes that uses + * DIRECT I/O to bypass OS level IO caching during + * merging. For all other cases (searching, writing) we delegate + * to the provided Directory instance. + * + *

To use this you must compile * NativePosixUtil.cpp (exposes Linux-specific APIs through - * JNI) for your platform. + * JNI) for your platform, by running ant + * build-native-unix, and then putting the resulting + * libNativePosixUtil.so (from + * lucene/build/native) onto your dynamic + * linker search path. * *

WARNING: this code is very new and quite easily * could contain horrible bugs. For example, here's one - * known issue: if you use seek in IndexOutput, and then + * known issue: if you use seek in IndexOutput, and then * write more than one buffer's worth of bytes, then the - * file will be wrong. Lucene does not do this (only writes - * small number of bytes after seek). - + * file will be wrong. Lucene does not do this today (only writes + * small number of bytes after seek), but that may change. + * + *

This directory passes Solr and Lucene tests on Linux + * and OS X; other Unixes should work but have not been + * tested! Use at your own risk. + * * @lucene.experimental */ -public class DirectIOLinuxDirectory extends FSDirectory { +public class NativeUnixDirectory extends FSDirectory { + // TODO: this is OS dependent, but likely 512 is the LCD private final static long ALIGN = 512; private final static long ALIGN_NOT_MASK = ~(ALIGN-1); + + /** Default buffer size before writing to disk (256 MB); + * larger means less IO load but more RAM and direct + * buffer storage space consumed during merging. */ - private final int forcedBufferSize; + public final static int DEFAULT_MERGE_BUFFER_SIZE = 262144; + /** Default min expected merge size before direct IO is + * used (10 MB): */ + public final static long DEFAULT_MIN_BYTES_DIRECT = 10*1024*1024; + + private final int mergeBufferSize; + private final long minBytesDirect; + private final Directory delegate; + /** Create a new NIOFSDirectory for the named location. * * @param path the path of the directory - * @param lockFactory the lock factory to use, or null for the default - * ({@link NativeFSLockFactory}); - * @param forcedBufferSize if this is 0, just use Lucene's - * default buffer size; else, force this buffer size. - * For best performance, force the buffer size to - * something fairly large (eg 1 MB), but note that this - * will eat up the JRE's direct buffer storage space + * @param mergeBufferSize Size of buffer to use for + * merging. See {@link #DEFAULT_MERGE_BUFFER_SIZE}. + * @param minBytesDirect Merges, or files to be opened for + * reading, smaller than this will + * not use direct IO. See {@link + * #DEFAULT_MIN_MERGE_BYTES_DIRECT} + * @param delegate fallback Directory for non-merges * @throws IOException */ - public DirectIOLinuxDirectory(File path, LockFactory lockFactory, int forcedBufferSize) throws IOException { - super(path, lockFactory); - this.forcedBufferSize = forcedBufferSize; + public NativeUnixDirectory(File path, int mergeBufferSize, long minBytesDirect, Directory delegate) throws IOException { + super(path, delegate.getLockFactory()); + if ((mergeBufferSize & ALIGN) != 0) { + throw new IllegalArgumentException("mergeBufferSize must be 0 mod " + ALIGN + " (got: " + mergeBufferSize + ")"); + } + this.mergeBufferSize = mergeBufferSize; + this.minBytesDirect = minBytesDirect; + this.delegate = delegate; } + + /** Create a new NIOFSDirectory for the named location. + * + * @param path the path of the directory + * @param delegate fallback Directory for non-merges + * @throws IOException + */ + public NativeUnixDirectory(File path, Directory delegate) throws IOException { + this(path, DEFAULT_MERGE_BUFFER_SIZE, DEFAULT_MIN_BYTES_DIRECT, delegate); + } @Override public IndexInput openInput(String name, IOContext context) throws IOException { ensureOpen(); - return new DirectIOLinuxIndexInput(new File(getDirectory(), name), - bufferSize(context)); + if (context.context != Context.MERGE || context.mergeInfo.estimatedMergeBytes < minBytesDirect || fileLength(name) < minBytesDirect) { + return delegate.openInput(name, context); + } else { + return new NativeUnixIndexInput(new File(getDirectory(), name), mergeBufferSize); + } } @Override public IndexOutput createOutput(String name, IOContext context) throws IOException { ensureOpen(); - ensureCanWrite(name); - return new DirectIOLinuxIndexOutput(new File(getDirectory(), name), bufferSize(context)); + if (context.context != Context.MERGE || context.mergeInfo.estimatedMergeBytes < minBytesDirect) { + return delegate.createOutput(name, context); + } else { + ensureCanWrite(name); + return new NativeUnixIndexOutput(new File(getDirectory(), name), mergeBufferSize); + } } - - private int bufferSize(IOContext context) { - return forcedBufferSize != 0 ? forcedBufferSize : BufferedIndexInput - .bufferSize(context); - } - private final static class DirectIOLinuxIndexOutput extends IndexOutput { + private final static class NativeUnixIndexOutput extends IndexOutput { private final ByteBuffer buffer; private final FileOutputStream fos; private final FileChannel channel; @@ -101,9 +149,9 @@ private long fileLength; private boolean isOpen; - public DirectIOLinuxIndexOutput(File path, int bufferSize) throws IOException { + public NativeUnixIndexOutput(File path, int bufferSize) throws IOException { //this.path = path; - FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), false); + final FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), false); fos = new FileOutputStream(fd); //fos = new FileOutputStream(path); channel = fos.getChannel(); @@ -206,7 +254,7 @@ @Override public long length() throws IOException { - return fileLength; + return fileLength + bufferPos; } @Override @@ -233,7 +281,7 @@ } } - private final static class DirectIOLinuxIndexInput extends IndexInput { + private final static class NativeUnixIndexInput extends IndexInput { private final ByteBuffer buffer; private final FileInputStream fis; private final FileChannel channel; @@ -244,10 +292,9 @@ private long filePos; private int bufferPos; - public DirectIOLinuxIndexInput(File path, int bufferSize) throws IOException { - // TODO make use of IOContext - super("DirectIOLinuxIndexInput(path=\"" + path.getPath() + "\")"); - FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), true); + public NativeUnixIndexInput(File path, int bufferSize) throws IOException { + super("NativeUnixIndexInput(path=\"" + path.getPath() + "\")"); + final FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), true); fis = new FileInputStream(fd); channel = fis.getChannel(); this.bufferSize = bufferSize; @@ -260,7 +307,7 @@ } // for clone - public DirectIOLinuxIndexInput(DirectIOLinuxIndexInput other) throws IOException { + public NativeUnixIndexInput(NativeUnixIndexInput other) throws IOException { super(other.toString()); this.fis = null; channel = other.channel; @@ -296,13 +343,17 @@ public void seek(long pos) throws IOException { if (pos != getFilePointer()) { final long alignedPos = pos & ALIGN_NOT_MASK; - //System.out.println("seek pos=" + pos + " aligned=" + alignedPos + " bufferSize=" + bufferSize + " this=" + this); filePos = alignedPos-bufferSize; - refill(); final int delta = (int) (pos - alignedPos); - buffer.position(delta); - bufferPos = delta; + if (delta != 0) { + refill(); + buffer.position(delta); + bufferPos = delta; + } else { + // force refill on next read + bufferPos = bufferSize; + } } } @@ -371,7 +422,7 @@ @Override public Object clone() { try { - return new DirectIOLinuxIndexInput(this); + return new NativeUnixIndexInput(this); } catch (IOException ioe) { throw new RuntimeException("IOException during clone: " + this, ioe); } Index: lucene/contrib/misc/src/java/org/apache/lucene/store/DirectIOLinuxDirectory.java =================================================================== --- lucene/contrib/misc/src/java/org/apache/lucene/store/DirectIOLinuxDirectory.java (revision 1231707) +++ lucene/contrib/misc/src/java/org/apache/lucene/store/DirectIOLinuxDirectory.java (working copy) @@ -1,380 +0,0 @@ -package org.apache.lucene.store; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -import java.io.EOFException; -import java.io.File; -import java.io.IOException; -import java.io.FileInputStream; -import java.io.FileDescriptor; -import java.io.FileOutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; - -import org.apache.lucene.store.Directory; // javadoc -import org.apache.lucene.store.NativeFSLockFactory; // javadoc - -/** - * An {@link Directory} implementation that uses the - * Linux-specific O_DIRECT flag to bypass all OS level - * caching. To use this you must compile - * NativePosixUtil.cpp (exposes Linux-specific APIs through - * JNI) for your platform. - * - *

WARNING: this code is very new and quite easily - * could contain horrible bugs. For example, here's one - * known issue: if you use seek in IndexOutput, and then - * write more than one buffer's worth of bytes, then the - * file will be wrong. Lucene does not do this (only writes - * small number of bytes after seek). - - * @lucene.experimental - */ -public class DirectIOLinuxDirectory extends FSDirectory { - - private final static long ALIGN = 512; - private final static long ALIGN_NOT_MASK = ~(ALIGN-1); - - private final int forcedBufferSize; - - /** Create a new NIOFSDirectory for the named location. - * - * @param path the path of the directory - * @param lockFactory the lock factory to use, or null for the default - * ({@link NativeFSLockFactory}); - * @param forcedBufferSize if this is 0, just use Lucene's - * default buffer size; else, force this buffer size. - * For best performance, force the buffer size to - * something fairly large (eg 1 MB), but note that this - * will eat up the JRE's direct buffer storage space - * @throws IOException - */ - public DirectIOLinuxDirectory(File path, LockFactory lockFactory, int forcedBufferSize) throws IOException { - super(path, lockFactory); - this.forcedBufferSize = forcedBufferSize; - } - - @Override - public IndexInput openInput(String name, IOContext context) throws IOException { - ensureOpen(); - return new DirectIOLinuxIndexInput(new File(getDirectory(), name), - bufferSize(context)); - } - - @Override - public IndexOutput createOutput(String name, IOContext context) throws IOException { - ensureOpen(); - ensureCanWrite(name); - return new DirectIOLinuxIndexOutput(new File(getDirectory(), name), bufferSize(context)); - } - - private int bufferSize(IOContext context) { - return forcedBufferSize != 0 ? forcedBufferSize : BufferedIndexInput - .bufferSize(context); - } - - private final static class DirectIOLinuxIndexOutput extends IndexOutput { - private final ByteBuffer buffer; - private final FileOutputStream fos; - private final FileChannel channel; - private final int bufferSize; - - //private final File path; - - private int bufferPos; - private long filePos; - private long fileLength; - private boolean isOpen; - - public DirectIOLinuxIndexOutput(File path, int bufferSize) throws IOException { - //this.path = path; - FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), false); - fos = new FileOutputStream(fd); - //fos = new FileOutputStream(path); - channel = fos.getChannel(); - buffer = ByteBuffer.allocateDirect(bufferSize); - this.bufferSize = bufferSize; - isOpen = true; - } - - @Override - public void writeByte(byte b) throws IOException { - assert bufferPos == buffer.position(): "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position(); - buffer.put(b); - if (++bufferPos == bufferSize) { - dump(); - } - } - - @Override - public void writeBytes(byte[] src, int offset, int len) throws IOException { - int toWrite = len; - while(true) { - final int left = bufferSize - bufferPos; - if (left <= toWrite) { - buffer.put(src, offset, left); - toWrite -= left; - offset += left; - bufferPos = bufferSize; - dump(); - } else { - buffer.put(src, offset, toWrite); - bufferPos += toWrite; - break; - } - } - } - - //@Override - //public void setLength() throws IOException { - // TODO -- how to impl this? neither FOS nor - // FileChannel provides an API? - //} - - @Override - public void flush() throws IOException { - // TODO -- I don't think this method is necessary? - } - - private void dump() throws IOException { - buffer.flip(); - final long limit = filePos + buffer.limit(); - if (limit > fileLength) { - // this dump extends the file - fileLength = limit; - } else { - // we had seek'd back & wrote some changes - } - - // must always round to next block - buffer.limit((int) ((buffer.limit() + ALIGN - 1) & ALIGN_NOT_MASK)); - - assert (buffer.limit() & ALIGN_NOT_MASK) == buffer.limit() : "limit=" + buffer.limit() + " vs " + (buffer.limit() & ALIGN_NOT_MASK); - assert (filePos & ALIGN_NOT_MASK) == filePos; - //System.out.println(Thread.currentThread().getName() + ": dump to " + filePos + " limit=" + buffer.limit() + " fos=" + fos); - channel.write(buffer, filePos); - filePos += bufferPos; - bufferPos = 0; - buffer.clear(); - //System.out.println("dump: done"); - - // TODO: the case where we'd seek'd back, wrote an - // entire buffer, we must here read the next buffer; - // likely Lucene won't trip on this since we only - // write smallish amounts on seeking back - } - - @Override - public long getFilePointer() { - return filePos + bufferPos; - } - - // TODO: seek is fragile at best; it can only properly - // handle seek & then change bytes that fit entirely - // within one buffer - @Override - public void seek(long pos) throws IOException { - if (pos != getFilePointer()) { - dump(); - final long alignedPos = pos & ALIGN_NOT_MASK; - filePos = alignedPos; - int n = (int) NativePosixUtil.pread(fos.getFD(), filePos, buffer); - if (n < bufferSize) { - buffer.limit(n); - } - //System.out.println("seek refill=" + n); - final int delta = (int) (pos - alignedPos); - buffer.position(delta); - bufferPos = delta; - } - } - - @Override - public long length() throws IOException { - return fileLength; - } - - @Override - public void close() throws IOException { - if (isOpen) { - isOpen = false; - try { - dump(); - } finally { - try { - //System.out.println("direct close set len=" + fileLength + " vs " + channel.size() + " path=" + path); - channel.truncate(fileLength); - //System.out.println(" now: " + channel.size()); - } finally { - try { - channel.close(); - } finally { - fos.close(); - //System.out.println(" final len=" + path.length()); - } - } - } - } - } - } - - private final static class DirectIOLinuxIndexInput extends IndexInput { - private final ByteBuffer buffer; - private final FileInputStream fis; - private final FileChannel channel; - private final int bufferSize; - - private boolean isOpen; - private boolean isClone; - private long filePos; - private int bufferPos; - - public DirectIOLinuxIndexInput(File path, int bufferSize) throws IOException { - // TODO make use of IOContext - super("DirectIOLinuxIndexInput(path=\"" + path.getPath() + "\")"); - FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), true); - fis = new FileInputStream(fd); - channel = fis.getChannel(); - this.bufferSize = bufferSize; - buffer = ByteBuffer.allocateDirect(bufferSize); - isOpen = true; - isClone = false; - filePos = -bufferSize; - bufferPos = bufferSize; - //System.out.println("D open " + path + " this=" + this); - } - - // for clone - public DirectIOLinuxIndexInput(DirectIOLinuxIndexInput other) throws IOException { - super(other.toString()); - this.fis = null; - channel = other.channel; - this.bufferSize = other.bufferSize; - buffer = ByteBuffer.allocateDirect(bufferSize); - filePos = -bufferSize; - bufferPos = bufferSize; - isOpen = true; - isClone = true; - //System.out.println("D clone this=" + this); - seek(other.getFilePointer()); - } - - @Override - public void close() throws IOException { - if (isOpen && !isClone) { - try { - channel.close(); - } finally { - if (!isClone) { - fis.close(); - } - } - } - } - - @Override - public long getFilePointer() { - return filePos + bufferPos; - } - - @Override - public void seek(long pos) throws IOException { - if (pos != getFilePointer()) { - final long alignedPos = pos & ALIGN_NOT_MASK; - //System.out.println("seek pos=" + pos + " aligned=" + alignedPos + " bufferSize=" + bufferSize + " this=" + this); - filePos = alignedPos-bufferSize; - refill(); - - final int delta = (int) (pos - alignedPos); - buffer.position(delta); - bufferPos = delta; - } - } - - @Override - public long length() { - try { - return channel.size(); - } catch (IOException ioe) { - throw new RuntimeException("IOException during length(): " + this, ioe); - } - } - - @Override - public byte readByte() throws IOException { - // NOTE: we don't guard against EOF here... ie the - // "final" buffer will typically be filled to less - // than bufferSize - if (bufferPos == bufferSize) { - refill(); - } - assert bufferPos == buffer.position() : "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position(); - bufferPos++; - return buffer.get(); - } - - private void refill() throws IOException { - buffer.clear(); - filePos += bufferSize; - bufferPos = 0; - assert (filePos & ALIGN_NOT_MASK) == filePos : "filePos=" + filePos + " anded=" + (filePos & ALIGN_NOT_MASK); - //System.out.println("X refill filePos=" + filePos); - int n; - try { - n = channel.read(buffer, filePos); - } catch (IOException ioe) { - throw new IOException(ioe.getMessage() + ": " + this, ioe); - } - if (n < 0) { - throw new EOFException("read past EOF: " + this); - } - buffer.rewind(); - } - - @Override - public void readBytes(byte[] dst, int offset, int len) throws IOException { - int toRead = len; - //System.out.println("\nX readBytes len=" + len + " fp=" + getFilePointer() + " size=" + length() + " this=" + this); - while(true) { - final int left = bufferSize - bufferPos; - if (left < toRead) { - //System.out.println(" copy " + left); - buffer.get(dst, offset, left); - toRead -= left; - offset += left; - refill(); - } else { - //System.out.println(" copy " + toRead); - buffer.get(dst, offset, toRead); - bufferPos += toRead; - //System.out.println(" readBytes done"); - break; - } - } - } - - @Override - public Object clone() { - try { - return new DirectIOLinuxIndexInput(this); - } catch (IOException ioe) { - throw new RuntimeException("IOException during clone: " + this, ioe); - } - } - } -} Index: lucene/contrib/misc/src/java/org/apache/lucene/store/NativePosixUtil.cpp =================================================================== --- lucene/contrib/misc/src/java/org/apache/lucene/store/NativePosixUtil.cpp (revision 1231707) +++ lucene/contrib/misc/src/java/org/apache/lucene/store/NativePosixUtil.cpp (working copy) @@ -15,6 +15,16 @@ * the License. */ +#ifdef LINUX + #define DIRECT_FLAG O_DIRECT | O_NOATIME + #define LINUX +#elif __APPLE__ + #define DIRECT_FLAG 0 + #define OSX +#else + #define DIRECT_FLAG O_DIRECT // __unix__ is not used as even Linux falls under it. +#endif + #include #include // posix_fadvise, constants for open #include // strerror @@ -26,6 +36,7 @@ // java -cp .:lib/junit-4.7.jar:./build/classes/test:./build/classes/java:./build/classes/demo -Dlucene.version=2.9-dev -DtempDir=build -ea org.junit.runner.JUnitCore org.apache.lucene.index.TestDoc +#ifdef LINUX /* * Class: org_apache_lucene_store_NativePosixUtil * Method: posix_fadvise @@ -89,8 +100,8 @@ return 0; } +#endif - /* * Class: org_apache_lucene_store_NativePosixUtil * Method: open_direct @@ -107,16 +118,26 @@ char *fname; class_ioex = env->FindClass("java/io/IOException"); - if (class_ioex == NULL) return NULL; + if (class_ioex == NULL) { + return NULL; + } class_fdesc = env->FindClass("java/io/FileDescriptor"); - if (class_fdesc == NULL) return NULL; + if (class_fdesc == NULL) { + return NULL; + } fname = (char *) env->GetStringUTFChars(filename, NULL); if (readOnly) { - fd = open(fname, O_RDONLY | O_DIRECT | O_NOATIME); + fd = open(fname, O_RDONLY | DIRECT_FLAG); + #ifdef OSX + fcntl(fd, F_NOCACHE, 1); + #endif } else { - fd = open(fname, O_RDWR | O_CREAT | O_DIRECT | O_NOATIME, 0666); + fd = open(fname, O_RDWR | O_CREAT | DIRECT_FLAG, 0666); + #ifdef OSX + fcntl(fd, F_NOCACHE, 1); + #endif } //printf("open %s -> %d; ro %d\n", fname, fd, readOnly); fflush(stdout); @@ -131,19 +152,22 @@ // construct a new FileDescriptor const_fdesc = env->GetMethodID(class_fdesc, "", "()V"); - if (const_fdesc == NULL) return NULL; + if (const_fdesc == NULL) { + return NULL; + } ret = env->NewObject(class_fdesc, const_fdesc); // poke the "fd" field with the file descriptor field_fd = env->GetFieldID(class_fdesc, "fd", "I"); - if (field_fd == NULL) return NULL; + if (field_fd == NULL) { + return NULL; + } env->SetIntField(ret, field_fd, fd); // and return it return ret; } - /* * Class: org_apache_lucene_store_NativePosixUtil * Method: pread Index: lucene/contrib/misc/src/java/overview.html =================================================================== --- lucene/contrib/misc/src/java/overview.html (revision 1231707) +++ lucene/contrib/misc/src/java/overview.html (working copy) @@ -27,18 +27,18 @@ The misc package has various tools for splitting/merging indices, changing norms, finding high freq terms, and others. -

DirectIOLinuxDirectory

+

NativeUnixDirectory

NOTE: This uses C++ sources (accessible via JNI), which you'll have to compile on your platform. Further, this is a very -platform-specific extensions (runs only on Linux, and likely only on -2.6.x kernels). +platform-specific extensions (runs only on Unix based kernels, and likely only on +2.6.x kernels and above).

-DirectIOLinuxDirectory is a Directory implementation that bypasses the +NativeUnixDirectory is a Directory implementation that bypasses the OS's buffer cache for any IndexInput and IndexOutput opened through it -(using the linux-specific O_DIRECT flag). +(Uses direct I/O for all merge input and output)

Note that doing so typically results in bad performance loss! You Index: lucene/contrib/misc/build.xml =================================================================== --- lucene/contrib/misc/build.xml (revision 1231707) +++ lucene/contrib/misc/build.xml (working copy) @@ -40,11 +40,13 @@ + +