Index: src/java/org/apache/lucene/store/ByteBufferIndexInput.java
===================================================================
--- src/java/org/apache/lucene/store/ByteBufferIndexInput.java Wed Mar 03 00:42:35 IST 2010
+++ src/java/org/apache/lucene/store/ByteBufferIndexInput.java Wed Mar 03 00:42:35 IST 2010
@@ -0,0 +1,110 @@
+package org.apache.lucene.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class ByteBufferIndexInput extends IndexInput {
+
+ private final ByteBufferFile file;
+ private final int bufferSize;
+ private final long length;
+
+ private ByteBuffer currentBuffer;
+ private int currentBufferIndex;
+
+ private long bufferStart;
+
+
+ public ByteBufferIndexInput(ByteBufferDirectory dir, ByteBufferFile file) throws IOException {
+ this.file = file;
+ this.bufferSize = dir.bufferSizeInBytes();
+ this.length = file.length();
+ switchCurrentBuffer(true);
+ }
+
+ @Override public byte readByte() throws IOException {
+ if (!currentBuffer.hasRemaining()) {
+ currentBufferIndex++;
+ switchCurrentBuffer(true);
+ }
+ return currentBuffer.get();
+ }
+
+ @Override public void readBytes(byte[] b, int offset, int len) throws IOException {
+ while (len > 0) {
+ if (!currentBuffer.hasRemaining()) {
+ currentBufferIndex++;
+ switchCurrentBuffer(true);
+ }
+
+ int remainInBuffer = currentBuffer.remaining();
+ int bytesToCopy = len < remainInBuffer ? len : remainInBuffer;
+ currentBuffer.get(b, offset, bytesToCopy);
+ offset += bytesToCopy;
+ len -= bytesToCopy;
+ }
+ }
+
+ @Override public void close() throws IOException {
+ }
+
+ @Override public long getFilePointer() {
+ return currentBufferIndex < 0 ? 0 : bufferStart + currentBuffer.position();
+ }
+
+ @Override public void seek(long pos) throws IOException {
+ if (currentBuffer == null || pos < bufferStart || pos >= bufferStart + bufferSize) {
+ currentBufferIndex = (int) (pos / bufferSize);
+ switchCurrentBuffer(false);
+ }
+ currentBuffer.position((int) (pos % bufferSize));
+ }
+
+ @Override public long length() {
+ return length;
+ }
+
+ private void switchCurrentBuffer(boolean enforceEOF) throws IOException {
+ if (currentBufferIndex >= file.numberOfBuffers()) {
+ // end of file reached, no more buffers left
+ if (enforceEOF)
+ throw new IOException("Read past EOF");
+ else {
+ // Force EOF if a read takes place at this position
+ currentBufferIndex--;
+ currentBuffer.position(bufferSize);
+ }
+ } else {
+ // we must duplicate (and make it read only while we are at it) since we need position and such to be independant
+ currentBuffer = file.buffer(currentBufferIndex).asReadOnlyBuffer();
+ currentBuffer.position(0);
+ bufferStart = (long) bufferSize * (long) currentBufferIndex;
+ }
+ }
+
+ @Override public Object clone() {
+ ByteBufferIndexInput cloned = (ByteBufferIndexInput) super.clone();
+ cloned.currentBuffer = currentBuffer.asReadOnlyBuffer();
+ return cloned;
+ }
+}
Index: src/java/org/apache/lucene/store/ByteBufferDirectory.java
===================================================================
--- src/java/org/apache/lucene/store/ByteBufferDirectory.java Wed Mar 03 01:50:00 IST 2010
+++ src/java/org/apache/lucene/store/ByteBufferDirectory.java Wed Mar 03 01:50:00 IST 2010
@@ -0,0 +1,256 @@
+package org.apache.lucene.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A memory based directory that uses {@link java.nio.ByteBuffer} in order to store the directory content.
+ *
+ *
The benefit of using {@link java.nio.ByteBuffer} is the fact that it can be stored in "native" memory
+ * outside of the JVM heap, thus not incurring the GC overhead of large in memory index.
+ *
+ *
Each "file" is segmented into one or more byte buffers.
+ *
+ *
Since its good practice to cache byte buffers, it also provide a simple mechanism to define a cache
+ * of byte buffers that are reused when possible.
+ *
+ * @author kimchy (shay.banon)
+ */
+public class ByteBufferDirectory extends Directory {
+
+ private final Map files = new ConcurrentHashMap();
+
+ private final Queue cache;
+
+ private final int bufferSizeInBytes;
+
+ private final int cacheSizeInBytes;
+
+ private final boolean disableCache;
+
+ private final boolean direct;
+
+ private boolean useCleanHack = true;
+
+ public static final boolean CLEAN_SUPPORTED;
+
+ private static final Method directBufferCleaner;
+ private static final Method directBufferCleanerClean;
+
+ static {
+ Method directBufferCleanerX = null;
+ Method directBufferCleanerCleanX = null;
+ boolean v;
+ try {
+ directBufferCleanerX = Class.forName("java.nio.DirectByteBuffer").getMethod("cleaner");
+ directBufferCleanerX.setAccessible(true);
+ directBufferCleanerCleanX = Class.forName("sun.misc.Cleaner").getMethod("clean");
+ directBufferCleanerCleanX.setAccessible(true);
+ v = true;
+ } catch (Exception e) {
+ v = false;
+ }
+ CLEAN_SUPPORTED = v;
+ directBufferCleaner = directBufferCleanerX;
+ directBufferCleanerClean = directBufferCleanerCleanX;
+ }
+
+
+ /**
+ * Constructs a new byte buffer directory.
+ *
+ * @param bufferSizeInBytes The size of a byte buffer
+ * @param cacheSizeInBytes The size of the cache, set to 0 to disable caching
+ * @param direct Should the byte buffers be stored outside the heap (truefalse)
+ * @param warmCache Should the cache be warmed
+ */
+ public ByteBufferDirectory(int bufferSizeInBytes, int cacheSizeInBytes, boolean direct, boolean warmCache) {
+ disableCache = cacheSizeInBytes == 0;
+ if (!disableCache && cacheSizeInBytes < bufferSizeInBytes) {
+ throw new IllegalArgumentException("Cache size [" + cacheSizeInBytes + "] is smaller than buffer size [" + bufferSizeInBytes + "]");
+ }
+ this.bufferSizeInBytes = bufferSizeInBytes;
+ int numberOfCacheEntries = cacheSizeInBytes / bufferSizeInBytes;
+ this.cache = disableCache ? null : new ArrayBlockingQueue(numberOfCacheEntries);
+ this.cacheSizeInBytes = disableCache ? 0 : numberOfCacheEntries * bufferSizeInBytes;
+ this.direct = direct;
+ setLockFactory(new SingleInstanceLockFactory());
+ if (!disableCache && warmCache) {
+ for (int i = 0; i < numberOfCacheEntries; i++) {
+ cache.add(createBuffer());
+ }
+ }
+ }
+
+ public void setUseClean(final boolean useCleanHack) {
+ if (useCleanHack && !CLEAN_SUPPORTED)
+ throw new IllegalArgumentException("Clean hack not supported on this platform!");
+ this.useCleanHack = useCleanHack;
+ }
+
+ public boolean getUseClean() {
+ return useCleanHack;
+ }
+
+ public int cacheSizeInBytes() {
+ return this.cacheSizeInBytes;
+ }
+
+ public int bufferSizeInBytes() {
+ return bufferSizeInBytes;
+ }
+
+ public boolean isDirect() {
+ return direct;
+ }
+
+ public boolean isCacheEnabled() {
+ return !disableCache;
+ }
+
+ @Override public String[] listAll() throws IOException {
+ return files.keySet().toArray(new String[0]);
+ }
+
+ @Override public boolean fileExists(String name) throws IOException {
+ return files.containsKey(name);
+ }
+
+ @Override public long fileModified(String name) throws IOException {
+ ByteBufferFile file = files.get(name);
+ if (file == null)
+ throw new FileNotFoundException(name);
+ return file.lastModified();
+ }
+
+ @Override public void touchFile(String name) throws IOException {
+ ByteBufferFile file = files.get(name);
+ if (file == null)
+ throw new FileNotFoundException(name);
+
+ long ts2, ts1 = System.currentTimeMillis();
+ do {
+ try {
+ Thread.sleep(0, 1);
+ } catch (InterruptedException ie) {
+ // In 3.0 we will change this to throw
+ // InterruptedException instead
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(ie);
+ }
+ ts2 = System.currentTimeMillis();
+ } while (ts1 == ts2);
+
+ file.lastModified(ts2);
+ }
+
+ @Override public void deleteFile(String name) throws IOException {
+ ByteBufferFile file = files.remove(name);
+ if (file == null)
+ throw new FileNotFoundException(name);
+ file.clean();
+ }
+
+ @Override public long fileLength(String name) throws IOException {
+ ByteBufferFile file = files.get(name);
+ if (file == null)
+ throw new FileNotFoundException(name);
+ return file.length();
+ }
+
+ @Override public IndexOutput createOutput(String name) throws IOException {
+ ByteBufferFile file = new ByteBufferFile(this);
+ ByteBufferFile existing = files.put(name, file);
+ if (existing != null) {
+ existing.clean();
+ }
+ return new ByteBufferIndexOutput(this, file);
+ }
+
+ @Override public IndexInput openInput(String name) throws IOException {
+ ByteBufferFile file = files.get(name);
+ if (file == null)
+ throw new FileNotFoundException(name);
+ return new ByteBufferIndexInput(this, file);
+ }
+
+ @Override public void close() throws IOException {
+ String[] files = listAll();
+ for (String file : files) {
+ deleteFile(file);
+ }
+ if (!disableCache) {
+ ByteBuffer buffer = cache.poll();
+ while (buffer != null) {
+ closeBuffer(buffer);
+ buffer = cache.poll();
+ }
+ }
+ }
+
+ void releaseBuffer(ByteBuffer byteBuffer) {
+ if (disableCache) {
+ closeBuffer(byteBuffer);
+ return;
+ }
+ boolean success = cache.offer(byteBuffer);
+ if (!success) {
+ closeBuffer(byteBuffer);
+ }
+ }
+
+ ByteBuffer acquireBuffer() {
+ if (disableCache) {
+ return createBuffer();
+ }
+ ByteBuffer byteBuffer = cache.poll();
+ if (byteBuffer == null) {
+ // everything is taken, return a new one
+ return createBuffer();
+ }
+ byteBuffer.position(0);
+ return byteBuffer;
+ }
+
+ private ByteBuffer createBuffer() {
+ if (isDirect()) {
+ return ByteBuffer.allocateDirect(bufferSizeInBytes());
+ }
+ return ByteBuffer.allocate(bufferSizeInBytes());
+ }
+
+ private void closeBuffer(ByteBuffer byteBuffer) {
+ if (useCleanHack && isDirect()) {
+ try {
+ Object cleaner = directBufferCleaner.invoke(byteBuffer);
+ directBufferCleanerClean.invoke(cleaner);
+ } catch (Exception e) {
+ e.printStackTrace();
+ // silently ignore exception
+ }
+ }
+ }
+}
Index: src/java/org/apache/lucene/store/ByteBufferFile.java
===================================================================
--- src/java/org/apache/lucene/store/ByteBufferFile.java Wed Mar 03 00:42:14 IST 2010
+++ src/java/org/apache/lucene/store/ByteBufferFile.java Wed Mar 03 00:42:14 IST 2010
@@ -0,0 +1,75 @@
+package org.apache.lucene.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.nio.ByteBuffer;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class ByteBufferFile {
+
+ private final ByteBufferDirectory dir;
+
+ private volatile long lastModified = System.currentTimeMillis();
+
+ private volatile long length;
+
+ private volatile ByteBuffer[] buffers;
+
+ public ByteBufferFile(ByteBufferDirectory dir) {
+ this.dir = dir;
+ }
+
+ long lastModified() {
+ return lastModified;
+ }
+
+ void lastModified(long lastModified) {
+ this.lastModified = lastModified;
+ }
+
+ long length() {
+ return length;
+ }
+
+ void length(long length) {
+ this.length = length;
+ }
+
+ ByteBuffer buffer(int i) {
+ return this.buffers[i];
+ }
+
+ int numberOfBuffers() {
+ return this.buffers.length;
+ }
+
+ void buffers(ByteBuffer[] buffers) {
+ this.buffers = buffers;
+ }
+
+ void clean() {
+ if (buffers != null) {
+ for (ByteBuffer buffer : buffers) {
+ dir.releaseBuffer(buffer);
+ }
+ buffers = null;
+ }
+ }
+}
Index: src/java/org/apache/lucene/store/ByteBufferIndexOutput.java
===================================================================
--- src/java/org/apache/lucene/store/ByteBufferIndexOutput.java Wed Mar 03 00:42:41 IST 2010
+++ src/java/org/apache/lucene/store/ByteBufferIndexOutput.java Wed Mar 03 00:42:41 IST 2010
@@ -0,0 +1,124 @@
+package org.apache.lucene.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class ByteBufferIndexOutput extends IndexOutput {
+
+ private final ByteBufferDirectory dir;
+ private final ByteBufferFile file;
+
+ private ByteBuffer currentBuffer;
+ private int currentBufferIndex;
+
+ private long bufferStart;
+ private int bufferLength;
+
+ private ArrayList buffers = new ArrayList();
+
+ public ByteBufferIndexOutput(ByteBufferDirectory dir, ByteBufferFile file) throws IOException {
+ this.dir = dir;
+ this.file = file;
+ switchCurrentBuffer();
+ }
+
+ @Override
+ public void writeByte(byte b) throws IOException {
+ if (!currentBuffer.hasRemaining()) {
+ currentBufferIndex++;
+ switchCurrentBuffer();
+ }
+ currentBuffer.put(b);
+ }
+
+ @Override
+ public void writeBytes(byte[] b, int offset, int len) throws IOException {
+ while (len > 0) {
+ if (!currentBuffer.hasRemaining()) {
+ currentBufferIndex++;
+ switchCurrentBuffer();
+ }
+
+ int remainInBuffer = currentBuffer.remaining();
+ int bytesToCopy = len < remainInBuffer ? len : remainInBuffer;
+ currentBuffer.put(b, offset, bytesToCopy);
+ offset += bytesToCopy;
+ len -= bytesToCopy;
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ file.lastModified(System.currentTimeMillis());
+ setFileLength();
+ }
+
+ @Override
+ public void close() throws IOException {
+ flush();
+ file.buffers(buffers.toArray(new ByteBuffer[buffers.size()]));
+ }
+
+ @Override
+ public long getFilePointer() {
+ return currentBufferIndex < 0 ? 0 : bufferStart + currentBuffer.position();
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ // set the file length in case we seek back
+ // and flush() has not been called yet
+ setFileLength();
+ if (pos < bufferStart || pos >= bufferStart + bufferLength) {
+ currentBufferIndex = (int) (pos / dir.bufferSizeInBytes());
+ switchCurrentBuffer();
+ }
+ currentBuffer.position((int) (pos % dir.bufferSizeInBytes()));
+ }
+
+ @Override
+ public long length() throws IOException {
+ return file.length();
+ }
+
+ private void switchCurrentBuffer() throws IOException {
+ if (currentBufferIndex == buffers.size()) {
+ currentBuffer = dir.acquireBuffer();
+ buffers.add(currentBuffer);
+ } else {
+ currentBuffer = buffers.get(currentBufferIndex);
+ }
+ currentBuffer.position(0);
+ bufferStart = (long) dir.bufferSizeInBytes() * (long) currentBufferIndex;
+ bufferLength = currentBuffer.capacity();
+ }
+
+ private void setFileLength() {
+ long pointer = bufferStart + currentBuffer.position();
+ if (pointer > file.length()) {
+ file.length(pointer);
+ }
+ }
+}