Index: lucene/contrib/misc/build.xml =================================================================== --- lucene/contrib/misc/build.xml (revision 1147810) +++ lucene/contrib/misc/build.xml (working copy) @@ -40,11 +40,13 @@ + + Index: lucene/contrib/misc/src/java/overview.html =================================================================== --- lucene/contrib/misc/src/java/overview.html (revision 1147810) +++ lucene/contrib/misc/src/java/overview.html (working copy) @@ -27,7 +27,7 @@ The misc package has various tools for splitting/merging indices, changing norms, finding high freq terms, and others. -

DirectIOLinuxDirectory

+

NativeUnixDirectory

NOTE: This uses C++ sources (accessible via JNI), which you'll @@ -36,7 +36,7 @@ 2.6.x kernels).

-DirectIOLinuxDirectory is a Directory implementation that bypasses the +NativeUnixDirectory is a Directory implementation that bypasses the OS's buffer cache for any IndexInput and IndexOutput opened through it (using the linux-specific O_DIRECT flag). Index: lucene/contrib/misc/src/java/org/apache/lucene/store/DirectIOLinuxDirectory.java =================================================================== --- lucene/contrib/misc/src/java/org/apache/lucene/store/DirectIOLinuxDirectory.java (revision 1147810) +++ lucene/contrib/misc/src/java/org/apache/lucene/store/DirectIOLinuxDirectory.java (working copy) @@ -1,372 +0,0 @@ -package org.apache.lucene.store; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -import java.io.File; -import java.io.IOException; -import java.io.FileInputStream; -import java.io.FileDescriptor; -import java.io.FileOutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; - -import org.apache.lucene.store.Directory; // javadoc -import org.apache.lucene.store.NativeFSLockFactory; // javadoc - -/** - * An {@link Directory} implementation that uses the - * Linux-specific O_DIRECT flag to bypass all OS level - * caching. 
To use this you must compile - * NativePosixUtil.cpp (exposes Linux-specific APIs through - * JNI) for your platform. - * - *

WARNING: this code is very new and quite easily - * could contain horrible bugs. For example, here's one - * known issue: if you use seek in IndexOutput, and then - * write more than one buffer's worth of bytes, then the - * file will be wrong. Lucene does not do this (only writes - * small number of bytes after seek). - - * @lucene.experimental - */ -public class DirectIOLinuxDirectory extends FSDirectory { - - private final static long ALIGN = 512; - private final static long ALIGN_NOT_MASK = ~(ALIGN-1); - - private final int forcedBufferSize; - - /** Create a new NIOFSDirectory for the named location. - * - * @param path the path of the directory - * @param lockFactory the lock factory to use, or null for the default - * ({@link NativeFSLockFactory}); - * @param forcedBufferSize if this is 0, just use Lucene's - * default buffer size; else, force this buffer size. - * For best performance, force the buffer size to - * something fairly large (eg 1 MB), but note that this - * will eat up the JRE's direct buffer storage space - * @throws IOException - */ - public DirectIOLinuxDirectory(File path, LockFactory lockFactory, int forcedBufferSize) throws IOException { - super(path, lockFactory); - this.forcedBufferSize = forcedBufferSize; - } - - @Override - public IndexInput openInput(String name, IOContext context) throws IOException { - ensureOpen(); - return new DirectIOLinuxIndexInput(new File(getDirectory(), name), - bufferSize(context)); - } - - @Override - public IndexOutput createOutput(String name, IOContext context) throws IOException { - ensureOpen(); - ensureCanWrite(name); - return new DirectIOLinuxIndexOutput(new File(getDirectory(), name), bufferSize(context)); - } - - private int bufferSize(IOContext context) { - return forcedBufferSize != 0 ? 
forcedBufferSize : BufferedIndexInput - .bufferSize(context); - } - - private final static class DirectIOLinuxIndexOutput extends IndexOutput { - private final ByteBuffer buffer; - private final FileOutputStream fos; - private final FileChannel channel; - private final int bufferSize; - - //private final File path; - - private int bufferPos; - private long filePos; - private long fileLength; - private boolean isOpen; - - public DirectIOLinuxIndexOutput(File path, int bufferSize) throws IOException { - //this.path = path; - FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), false); - fos = new FileOutputStream(fd); - //fos = new FileOutputStream(path); - channel = fos.getChannel(); - buffer = ByteBuffer.allocateDirect(bufferSize); - this.bufferSize = bufferSize; - isOpen = true; - } - - @Override - public void writeByte(byte b) throws IOException { - assert bufferPos == buffer.position(): "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position(); - buffer.put(b); - if (++bufferPos == bufferSize) { - dump(); - } - } - - @Override - public void writeBytes(byte[] src, int offset, int len) throws IOException { - int toWrite = len; - while(true) { - final int left = bufferSize - bufferPos; - if (left <= toWrite) { - buffer.put(src, offset, left); - toWrite -= left; - offset += left; - bufferPos = bufferSize; - dump(); - } else { - buffer.put(src, offset, toWrite); - bufferPos += toWrite; - break; - } - } - } - - //@Override - //public void setLength() throws IOException { - // TODO -- how to impl this? neither FOS nor - // FileChannel provides an API? - //} - - @Override - public void flush() throws IOException { - // TODO -- I don't think this method is necessary? 
- } - - private void dump() throws IOException { - buffer.flip(); - final long limit = filePos + buffer.limit(); - if (limit > fileLength) { - // this dump extends the file - fileLength = limit; - } else { - // we had seek'd back & wrote some changes - } - - // must always round to next block - buffer.limit((int) ((buffer.limit() + ALIGN - 1) & ALIGN_NOT_MASK)); - - assert (buffer.limit() & ALIGN_NOT_MASK) == buffer.limit() : "limit=" + buffer.limit() + " vs " + (buffer.limit() & ALIGN_NOT_MASK); - assert (filePos & ALIGN_NOT_MASK) == filePos; - //System.out.println(Thread.currentThread().getName() + ": dump to " + filePos + " limit=" + buffer.limit() + " fos=" + fos); - channel.write(buffer, filePos); - filePos += bufferPos; - bufferPos = 0; - buffer.clear(); - //System.out.println("dump: done"); - - // TODO: the case where we'd seek'd back, wrote an - // entire buffer, we must here read the next buffer; - // likely Lucene won't trip on this since we only - // write smallish amounts on seeking back - } - - @Override - public long getFilePointer() { - return filePos + bufferPos; - } - - // TODO: seek is fragile at best; it can only properly - // handle seek & then change bytes that fit entirely - // within one buffer - @Override - public void seek(long pos) throws IOException { - if (pos != getFilePointer()) { - dump(); - final long alignedPos = pos & ALIGN_NOT_MASK; - filePos = alignedPos; - int n = (int) NativePosixUtil.pread(fos.getFD(), filePos, buffer); - if (n < bufferSize) { - buffer.limit(n); - } - //System.out.println("seek refill=" + n); - final int delta = (int) (pos - alignedPos); - buffer.position(delta); - bufferPos = delta; - } - } - - @Override - public long length() throws IOException { - return fileLength; - } - - @Override - public void close() throws IOException { - if (isOpen) { - isOpen = false; - try { - dump(); - } finally { - try { - //System.out.println("direct close set len=" + fileLength + " vs " + channel.size() + " path=" + path); - 
channel.truncate(fileLength); - //System.out.println(" now: " + channel.size()); - } finally { - try { - channel.close(); - } finally { - fos.close(); - //System.out.println(" final len=" + path.length()); - } - } - } - } - } - } - - private final static class DirectIOLinuxIndexInput extends IndexInput { - private final ByteBuffer buffer; - private final FileInputStream fis; - private final FileChannel channel; - private final int bufferSize; - - private boolean isOpen; - private boolean isClone; - private long filePos; - private int bufferPos; - - public DirectIOLinuxIndexInput(File path, int bufferSize) throws IOException { - // TODO make use of IOContext - FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), true); - fis = new FileInputStream(fd); - channel = fis.getChannel(); - this.bufferSize = bufferSize; - buffer = ByteBuffer.allocateDirect(bufferSize); - isOpen = true; - isClone = false; - filePos = -bufferSize; - bufferPos = bufferSize; - //System.out.println("D open " + path + " this=" + this); - } - - // for clone - public DirectIOLinuxIndexInput(DirectIOLinuxIndexInput other) throws IOException { - this.fis = null; - channel = other.channel; - this.bufferSize = other.bufferSize; - buffer = ByteBuffer.allocateDirect(bufferSize); - filePos = -bufferSize; - bufferPos = bufferSize; - isOpen = true; - isClone = true; - //System.out.println("D clone this=" + this); - seek(other.getFilePointer()); - } - - @Override - public void close() throws IOException { - if (isOpen && !isClone) { - try { - channel.close(); - } finally { - if (!isClone) { - fis.close(); - } - } - } - } - - @Override - public long getFilePointer() { - return filePos + bufferPos; - } - - @Override - public void seek(long pos) throws IOException { - if (pos != getFilePointer()) { - final long alignedPos = pos & ALIGN_NOT_MASK; - //System.out.println("seek pos=" + pos + " aligned=" + alignedPos + " bufferSize=" + bufferSize + " this=" + this); - filePos = alignedPos-bufferSize; - 
refill(); - - final int delta = (int) (pos - alignedPos); - buffer.position(delta); - bufferPos = delta; - } - } - - @Override - public long length() { - try { - return channel.size(); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - - @Override - public byte readByte() throws IOException { - // NOTE: we don't guard against EOF here... ie the - // "final" buffer will typically be filled to less - // than bufferSize - if (bufferPos == bufferSize) { - refill(); - } - assert bufferPos == buffer.position() : "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position(); - bufferPos++; - return buffer.get(); - } - - private void refill() throws IOException { - buffer.clear(); - filePos += bufferSize; - bufferPos = 0; - assert (filePos & ALIGN_NOT_MASK) == filePos : "filePos=" + filePos + " anded=" + (filePos & ALIGN_NOT_MASK); - //System.out.println("X refill filePos=" + filePos); - int n = channel.read(buffer, filePos); - if (n < 0) { - throw new IOException("eof"); - } - buffer.rewind(); - } - - @Override - public void readBytes(byte[] dst, int offset, int len) throws IOException { - int toRead = len; - //System.out.println("\nX readBytes len=" + len + " fp=" + getFilePointer() + " size=" + length() + " this=" + this); - while(true) { - final int left = bufferSize - bufferPos; - if (left < toRead) { - //System.out.println(" copy " + left); - buffer.get(dst, offset, left); - toRead -= left; - offset += left; - refill(); - } else { - //System.out.println(" copy " + toRead); - buffer.get(dst, offset, toRead); - bufferPos += toRead; - //System.out.println(" readBytes done"); - break; - } - } - } - - @Override - public Object clone() { - try { - return new DirectIOLinuxIndexInput(this); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - } -} Index: lucene/contrib/misc/src/java/org/apache/lucene/store/NativePosixUtil.cpp =================================================================== --- 
lucene/contrib/misc/src/java/org/apache/lucene/store/NativePosixUtil.cpp (revision 1147810) +++ lucene/contrib/misc/src/java/org/apache/lucene/store/NativePosixUtil.cpp (working copy) @@ -15,6 +15,19 @@ * the License. */ +#if linux + #define DIRECT_FLAG O_DIRECT | O_NOATIME + #define NORMAL_FLAG O_NOATIME + #define LINUX "linux" +#elif __APPLE__ + #define DIRECT_FLAG 0 + #define NORMAL_FLAG 0 + #define OSX "apple" +#else + #define DIRECT_FLAG O_DIRECT // __unix__ is not used as even Linux falls under it. + #define NORMAL_FLAG 0 +#endif + #include <jni.h> #include <fcntl.h> // posix_fadvise, constants for open #include <string.h> // strerror @@ -26,6 +39,7 @@ // java -cp .:lib/junit-4.7.jar:./build/classes/test:./build/classes/java:./build/classes/demo -Dlucene.version=2.9-dev -DtempDir=build -ea org.junit.runner.JUnitCore org.apache.lucene.index.TestDoc +#ifdef LINUX /* * Class: org_apache_lucene_store_NativePosixUtil * Method: posix_fadvise @@ -89,6 +103,7 @@ return 0; } +#endif /* @@ -107,16 +122,87 @@ char *fname; class_ioex = env->FindClass("java/io/IOException"); - if (class_ioex == NULL) return NULL; + if (class_ioex == NULL) { + return NULL; + } + class_fdesc = env->FindClass("java/io/FileDescriptor"); + if (class_fdesc == NULL) { + return NULL; + } + + fname = (char *) env->GetStringUTFChars(filename, NULL); + + if (readOnly) { + fd = open(fname, O_RDONLY | DIRECT_FLAG); + #ifdef OSX + fcntl(fd, F_NOCACHE, 1); + #endif + } else { + fd = open(fname, O_RDWR | O_CREAT | DIRECT_FLAG, 0666); + #ifdef OSX + fcntl(fd, F_NOCACHE, 1); + #endif + } + + //printf("open %s -> %d; ro %d\n", fname, fd, readOnly); fflush(stdout); + + env->ReleaseStringUTFChars(filename, fname); + + if (fd < 0) { + // open returned an error. 
Throw an IOException with the error string + env->ThrowNew(class_ioex, strerror(errno)); + return NULL; + } + + // construct a new FileDescriptor + const_fdesc = env->GetMethodID(class_fdesc, "<init>", "()V"); + if (const_fdesc == NULL) { + return NULL; + } + ret = env->NewObject(class_fdesc, const_fdesc); + + // poke the "fd" field with the file descriptor + field_fd = env->GetFieldID(class_fdesc, "fd", "I"); + if (field_fd == NULL) { + return NULL; + } + env->SetIntField(ret, field_fd, fd); + + // and return it + return ret; +} + + +/* + * Class: org_apache_lucene_store_NativePosixUtil + * Method: open_normal + * Signature: (Ljava/lang/String;Z)Ljava/io/FileDescriptor; + */ +extern "C" +JNIEXPORT jobject JNICALL Java_org_apache_lucene_store_NativePosixUtil_open_1normal(JNIEnv *env, jclass _ignore, jstring filename, jboolean readOnly) +{ + jfieldID field_fd; + jmethodID const_fdesc; + jclass class_fdesc, class_ioex; + jobject ret; + int fd; + char *fname; + + class_ioex = env->FindClass("java/io/IOException"); + if (class_ioex == NULL) { + return NULL; + } class_fdesc = env->FindClass("java/io/FileDescriptor"); - if (class_fdesc == NULL) return NULL; + if (class_fdesc == NULL) { + return NULL; + } fname = (char *) env->GetStringUTFChars(filename, NULL); if (readOnly) { - fd = open(fname, O_RDONLY | O_DIRECT | O_NOATIME); + fd = open(fname, O_RDONLY | NORMAL_FLAG); } else { - fd = open(fname, O_RDWR | O_CREAT | O_DIRECT | O_NOATIME, 0666); + fd = open(fname, O_RDWR | O_CREAT | NORMAL_FLAG, 0666); } //printf("open %s -> %d; ro %d\n", fname, fd, readOnly); fflush(stdout); @@ -131,12 +217,16 @@ // construct a new FileDescriptor const_fdesc = env->GetMethodID(class_fdesc, "<init>", "()V"); - if (const_fdesc == NULL) return NULL; + if (const_fdesc == NULL) { + return NULL; + } ret = env->NewObject(class_fdesc, const_fdesc); // poke the "fd" field with the file descriptor field_fd = env->GetFieldID(class_fdesc, "fd", "I"); - if (field_fd == NULL) return NULL; + if (field_fd == NULL) 
{ + return NULL; + } env->SetIntField(ret, field_fd, fd); // and return it Index: lucene/contrib/misc/src/java/org/apache/lucene/store/NativePosixUtil.java =================================================================== --- lucene/contrib/misc/src/java/org/apache/lucene/store/NativePosixUtil.java (revision 1147810) +++ lucene/contrib/misc/src/java/org/apache/lucene/store/NativePosixUtil.java (working copy) @@ -37,6 +37,7 @@ public static native int posix_madvise(ByteBuffer buf, int advise) throws IOException; public static native int madvise(ByteBuffer buf, int advise) throws IOException; public static native FileDescriptor open_direct(String filename, boolean read) throws IOException; + public static native FileDescriptor open_normal(String filename, boolean read) throws IOException; public static native long pread(FileDescriptor fd, long pos, ByteBuffer byteBuf) throws IOException; public static void advise(FileDescriptor fd, long offset, long len, int advise) throws IOException { @@ -46,4 +47,4 @@ } } } - + \ No newline at end of file Index: lucene/contrib/misc/src/java/org/apache/lucene/store/NativeUnixDirectory.java =================================================================== --- lucene/contrib/misc/src/java/org/apache/lucene/store/NativeUnixDirectory.java (revision 1147810) +++ lucene/contrib/misc/src/java/org/apache/lucene/store/NativeUnixDirectory.java (working copy) @@ -26,6 +26,7 @@ import java.nio.channels.FileChannel; import org.apache.lucene.store.Directory; // javadoc +import org.apache.lucene.store.IOContext.Context; import org.apache.lucene.store.NativeFSLockFactory; // javadoc /** @@ -44,7 +45,7 @@ * @lucene.experimental */ -public class DirectIOLinuxDirectory extends FSDirectory { +public class NativeUnixDirectory extends FSDirectory { private final static long ALIGN = 512; private final static long ALIGN_NOT_MASK = ~(ALIGN-1); @@ -63,7 +64,7 @@ * will eat up the JRE's direct buffer storage space * @throws IOException */ - public 
DirectIOLinuxDirectory(File path, LockFactory lockFactory, int forcedBufferSize) throws IOException { + public NativeUnixDirectory(File path, LockFactory lockFactory, int forcedBufferSize) throws IOException { super(path, lockFactory); this.forcedBufferSize = forcedBufferSize; } @@ -71,15 +72,15 @@ @Override public IndexInput openInput(String name, IOContext context) throws IOException { ensureOpen(); - return new DirectIOLinuxIndexInput(new File(getDirectory(), name), - bufferSize(context)); + return new NativeUnixIndexInput(new File(getDirectory(), name), + bufferSize(context), context); } @Override public IndexOutput createOutput(String name, IOContext context) throws IOException { ensureOpen(); ensureCanWrite(name); - return new DirectIOLinuxIndexOutput(new File(getDirectory(), name), bufferSize(context)); + return new NativeUnixIndexOutput(new File(getDirectory(), name), bufferSize(context)); } private int bufferSize(IOContext context) { @@ -87,7 +88,7 @@ .bufferSize(context); } - private final static class DirectIOLinuxIndexOutput extends IndexOutput { + private final static class NativeUnixIndexOutput extends IndexOutput { private final ByteBuffer buffer; private final FileOutputStream fos; private final FileChannel channel; @@ -100,7 +101,7 @@ private long fileLength; private boolean isOpen; - public DirectIOLinuxIndexOutput(File path, int bufferSize) throws IOException { + public NativeUnixIndexOutput(File path, int bufferSize) throws IOException { //this.path = path; FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), false); fos = new FileOutputStream(fd); @@ -232,7 +233,7 @@ } } - private final static class DirectIOLinuxIndexInput extends IndexInput { + private final static class NativeUnixIndexInput extends IndexInput { private final ByteBuffer buffer; private final FileInputStream fis; private final FileChannel channel; @@ -243,9 +244,15 @@ private long filePos; private int bufferPos; - public DirectIOLinuxIndexInput(File path, int 
bufferSize) throws IOException { - // TODO make use of IOContext - FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), true); + public NativeUnixIndexInput(File path, int bufferSize, IOContext context) throws IOException { + FileDescriptor fd; + if (context.context == Context.MERGE) { + fd = NativePosixUtil.open_direct(path.toString(), true); + } + else { + fd = NativePosixUtil.open_normal(path.toString(), true); + } + fis = new FileInputStream(fd); channel = fis.getChannel(); this.bufferSize = bufferSize; @@ -258,7 +265,7 @@ } // for clone - public DirectIOLinuxIndexInput(DirectIOLinuxIndexInput other) throws IOException { + public NativeUnixIndexInput(NativeUnixIndexInput other) throws IOException { this.fis = null; channel = other.channel; this.bufferSize = other.bufferSize; @@ -363,7 +370,7 @@ @Override public Object clone() { try { - return new DirectIOLinuxIndexInput(this); + return new NativeUnixIndexInput(this); } catch (IOException ioe) { throw new RuntimeException(ioe); }