Index: src/java/org/apache/lucene/store/ByteBufferIndexInput.java =================================================================== --- src/java/org/apache/lucene/store/ByteBufferIndexInput.java Wed Mar 03 00:42:35 IST 2010 +++ src/java/org/apache/lucene/store/ByteBufferIndexInput.java Wed Mar 03 00:42:35 IST 2010 @@ -0,0 +1,110 @@ +package org.apache.lucene.store; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * @author kimchy (shay.banon) + */ +public class ByteBufferIndexInput extends IndexInput { + + private final ByteBufferFile file; + private final int bufferSize; + private final long length; + + private ByteBuffer currentBuffer; + private int currentBufferIndex; + + private long bufferStart; + + + public ByteBufferIndexInput(ByteBufferDirectory dir, ByteBufferFile file) throws IOException { + this.file = file; + this.bufferSize = dir.bufferSizeInBytes(); + this.length = file.length(); + switchCurrentBuffer(true); + } + + @Override public byte readByte() throws IOException { + if (!currentBuffer.hasRemaining()) { + currentBufferIndex++; + switchCurrentBuffer(true); + } + return currentBuffer.get(); + } + + @Override public void readBytes(byte[] b, int offset, int len) throws IOException { + while (len > 0) { + if (!currentBuffer.hasRemaining()) { + currentBufferIndex++; + switchCurrentBuffer(true); + } + + int remainInBuffer = currentBuffer.remaining(); + int bytesToCopy = len < remainInBuffer ? len : remainInBuffer; + currentBuffer.get(b, offset, bytesToCopy); + offset += bytesToCopy; + len -= bytesToCopy; + } + } + + @Override public void close() throws IOException { + } + + @Override public long getFilePointer() { + return currentBufferIndex < 0 ? 0 : bufferStart + currentBuffer.position(); + } + + @Override public void seek(long pos) throws IOException { + if (currentBuffer == null || pos < bufferStart || pos >= bufferStart + bufferSize) { + currentBufferIndex = (int) (pos / bufferSize); + switchCurrentBuffer(false); + } + currentBuffer.position((int) (pos % bufferSize)); + } + + @Override public long length() { + return length; + } + + private void switchCurrentBuffer(boolean enforceEOF) throws IOException { + if (currentBufferIndex >= file.numberOfBuffers()) { + // end of file reached, no more buffers left + if (enforceEOF) + throw new IOException("Read past EOF"); + else { + // Force EOF if a read takes place at this position + currentBufferIndex--; + currentBuffer.position(bufferSize); + } + } else { + // we must duplicate (and make it read only while we are at it) since we need position and such to be independant + currentBuffer = file.buffer(currentBufferIndex).asReadOnlyBuffer(); + currentBuffer.position(0); + bufferStart = (long) bufferSize * (long) currentBufferIndex; + } + } + + @Override public Object clone() { + ByteBufferIndexInput cloned = (ByteBufferIndexInput) super.clone(); + cloned.currentBuffer = currentBuffer.asReadOnlyBuffer(); + return cloned; + } +} Index: src/java/org/apache/lucene/store/ByteBufferDirectory.java =================================================================== --- src/java/org/apache/lucene/store/ByteBufferDirectory.java Wed Mar 03 00:42:10 IST 2010 +++ src/java/org/apache/lucene/store/ByteBufferDirectory.java Wed Mar 03 00:42:10 IST 2010 @@ -0,0 +1,224 @@ +package org.apache.lucene.store; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import sun.nio.ch.DirectBuffer; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; + +/** + * A memory based directory that uses {@link java.nio.ByteBuffer} in order to store the directory content. + * + *

The benefit of using {@link java.nio.ByteBuffer} is the fact that it can be stored in "native" memory + * outside of the JVM heap, thus not incurring the GC overhead of large in memory index. + * + *

Each "file" is segmented into one or more byte buffers. + * + *

Since its good practice to cache byte buffers, it also provide a simple mechanism to define a cache + * of byte buffers that are reused when possible. + * + * @author kimchy (shay.banon) + */ +public class ByteBufferDirectory extends Directory { + + private final Map files = new ConcurrentHashMap(); + + private final Queue cache; + + private final int bufferSizeInBytes; + + private final int cacheSizeInBytes; + + private final boolean disableCache; + + private final boolean direct; + + /** + * Constructs a new byte buffer directory. + * + * @param bufferSizeInBytes The size of a byte buffer + * @param cacheSizeInBytes The size of the cache, set to 0 to disable caching + * @param direct Should the byte buffers be stored outside the heap (truefalse) + * @param warmCache Should the cache be warmed + */ + public ByteBufferDirectory(int bufferSizeInBytes, int cacheSizeInBytes, boolean direct, boolean warmCache) { + disableCache = cacheSizeInBytes == 0; + if (!disableCache && cacheSizeInBytes < bufferSizeInBytes) { + throw new IllegalArgumentException("Cache size [" + cacheSizeInBytes + "] is smaller than buffer size [" + bufferSizeInBytes + "]"); + } + this.bufferSizeInBytes = bufferSizeInBytes; + int numberOfCacheEntries = cacheSizeInBytes / bufferSizeInBytes; + this.cache = disableCache ? null : new ArrayBlockingQueue(numberOfCacheEntries); + this.cacheSizeInBytes = disableCache ? 0 : numberOfCacheEntries * bufferSizeInBytes; + this.direct = direct; + setLockFactory(new SingleInstanceLockFactory()); + if (!disableCache && warmCache) { + for (int i = 0; i < numberOfCacheEntries; i++) { + cache.add(createBuffer()); + } + } + } + + public int cacheSizeInBytes() { + return this.cacheSizeInBytes; + } + + public int bufferSizeInBytes() { + return bufferSizeInBytes; + } + + public boolean isDirect() { + return direct; + } + + public boolean isCacheEnabled() { + return !disableCache; + } + + @Override + public String[] listAll() throws IOException { + return files.keySet().toArray(new String[0]); + } + + @Override + public boolean fileExists(String name) throws IOException { + return files.containsKey(name); + } + + @Override + public long fileModified(String name) throws IOException { + ByteBufferFile file = files.get(name); + if (file == null) + throw new FileNotFoundException(name); + return file.lastModified(); + } + + @Override + public void touchFile(String name) throws IOException { + ByteBufferFile file = files.get(name); + if (file == null) + throw new FileNotFoundException(name); + + long ts2, ts1 = System.currentTimeMillis(); + do { + try { + Thread.sleep(0, 1); + } catch (InterruptedException ie) { + // In 3.0 we will change this to throw + // InterruptedException instead + Thread.currentThread().interrupt(); + throw new RuntimeException(ie); + } + ts2 = System.currentTimeMillis(); + } while (ts1 == ts2); + + file.lastModified(ts2); + } + + @Override + public void deleteFile(String name) throws IOException { + ByteBufferFile file = files.remove(name); + if (file == null) + throw new FileNotFoundException(name); + file.clean(); + } + + @Override + public long fileLength(String name) throws IOException { + ByteBufferFile file = files.get(name); + if (file == null) + throw new FileNotFoundException(name); + return file.length(); + } + + @Override + public IndexOutput createOutput(String name) throws IOException { + ByteBufferFile file = new ByteBufferFile(this); + ByteBufferFile existing = files.put(name, file); + if (existing != null) { + existing.clean(); + } + return new ByteBufferIndexOutput(this, file); + } + + @Override + public IndexInput openInput(String name) throws IOException { + ByteBufferFile file = files.get(name); + if (file == null) + throw new FileNotFoundException(name); + return new ByteBufferIndexInput(this, file); + } + + @Override + public void close() throws IOException { + String[] files = listAll(); + for (String file : files) { + deleteFile(file); + } + if (!disableCache) { + ByteBuffer buffer = cache.poll(); + while (buffer != null) { + closeBuffer(buffer); + buffer = cache.poll(); + } + } + } + + void releaseBuffer(ByteBuffer byteBuffer) { + if (disableCache) { + closeBuffer(byteBuffer); + return; + } + boolean success = cache.offer(byteBuffer); + if (!success) { + closeBuffer(byteBuffer); + } + } + + ByteBuffer acquireBuffer() { + if (disableCache) { + return createBuffer(); + } + ByteBuffer byteBuffer = cache.poll(); + if (byteBuffer == null) { + // everything is taken, return a new one + return createBuffer(); + } + byteBuffer.position(0); + return byteBuffer; + } + + private ByteBuffer createBuffer() { + if (isDirect()) { + return ByteBuffer.allocateDirect(bufferSizeInBytes()); + } + return ByteBuffer.allocate(bufferSizeInBytes()); + } + + private void closeBuffer(ByteBuffer byteBuffer) { + if (isDirect()) { + ((DirectBuffer) byteBuffer).cleaner().clean(); + } + } +} Index: src/java/org/apache/lucene/store/ByteBufferFile.java =================================================================== --- src/java/org/apache/lucene/store/ByteBufferFile.java Wed Mar 03 00:42:14 IST 2010 +++ src/java/org/apache/lucene/store/ByteBufferFile.java Wed Mar 03 00:42:14 IST 2010 @@ -0,0 +1,75 @@ +package org.apache.lucene.store; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.nio.ByteBuffer; + +/** + * @author kimchy (shay.banon) + */ +public class ByteBufferFile { + + private final ByteBufferDirectory dir; + + private volatile long lastModified = System.currentTimeMillis(); + + private volatile long length; + + private volatile ByteBuffer[] buffers; + + public ByteBufferFile(ByteBufferDirectory dir) { + this.dir = dir; + } + + long lastModified() { + return lastModified; + } + + void lastModified(long lastModified) { + this.lastModified = lastModified; + } + + long length() { + return length; + } + + void length(long length) { + this.length = length; + } + + ByteBuffer buffer(int i) { + return this.buffers[i]; + } + + int numberOfBuffers() { + return this.buffers.length; + } + + void buffers(ByteBuffer[] buffers) { + this.buffers = buffers; + } + + void clean() { + if (buffers != null) { + for (ByteBuffer buffer : buffers) { + dir.releaseBuffer(buffer); + } + buffers = null; + } + } +} Index: src/java/org/apache/lucene/store/ByteBufferIndexOutput.java =================================================================== --- src/java/org/apache/lucene/store/ByteBufferIndexOutput.java Wed Mar 03 00:42:41 IST 2010 +++ src/java/org/apache/lucene/store/ByteBufferIndexOutput.java Wed Mar 03 00:42:41 IST 2010 @@ -0,0 +1,124 @@ +package org.apache.lucene.store; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; + +/** + * @author kimchy (shay.banon) + */ +public class ByteBufferIndexOutput extends IndexOutput { + + private final ByteBufferDirectory dir; + private final ByteBufferFile file; + + private ByteBuffer currentBuffer; + private int currentBufferIndex; + + private long bufferStart; + private int bufferLength; + + private ArrayList buffers = new ArrayList(); + + public ByteBufferIndexOutput(ByteBufferDirectory dir, ByteBufferFile file) throws IOException { + this.dir = dir; + this.file = file; + switchCurrentBuffer(); + } + + @Override + public void writeByte(byte b) throws IOException { + if (!currentBuffer.hasRemaining()) { + currentBufferIndex++; + switchCurrentBuffer(); + } + currentBuffer.put(b); + } + + @Override + public void writeBytes(byte[] b, int offset, int len) throws IOException { + while (len > 0) { + if (!currentBuffer.hasRemaining()) { + currentBufferIndex++; + switchCurrentBuffer(); + } + + int remainInBuffer = currentBuffer.remaining(); + int bytesToCopy = len < remainInBuffer ? len : remainInBuffer; + currentBuffer.put(b, offset, bytesToCopy); + offset += bytesToCopy; + len -= bytesToCopy; + } + } + + @Override + public void flush() throws IOException { + file.lastModified(System.currentTimeMillis()); + setFileLength(); + } + + @Override + public void close() throws IOException { + flush(); + file.buffers(buffers.toArray(new ByteBuffer[buffers.size()])); + } + + @Override + public long getFilePointer() { + return currentBufferIndex < 0 ? 0 : bufferStart + currentBuffer.position(); + } + + @Override + public void seek(long pos) throws IOException { + // set the file length in case we seek back + // and flush() has not been called yet + setFileLength(); + if (pos < bufferStart || pos >= bufferStart + bufferLength) { + currentBufferIndex = (int) (pos / dir.bufferSizeInBytes()); + switchCurrentBuffer(); + } + currentBuffer.position((int) (pos % dir.bufferSizeInBytes())); + } + + @Override + public long length() throws IOException { + return file.length(); + } + + private void switchCurrentBuffer() throws IOException { + if (currentBufferIndex == buffers.size()) { + currentBuffer = dir.acquireBuffer(); + buffers.add(currentBuffer); + } else { + currentBuffer = buffers.get(currentBufferIndex); + } + currentBuffer.position(0); + bufferStart = (long) dir.bufferSizeInBytes() * (long) currentBufferIndex; + bufferLength = currentBuffer.capacity(); + } + + private void setFileLength() { + long pointer = bufferStart + currentBuffer.position(); + if (pointer > file.length()) { + file.length(pointer); + } + } +}