Index: contrib/directories/src/java/org/apache/lucene/store/CassandraDirectory.java
===================================================================
--- contrib/directories/src/java/org/apache/lucene/store/CassandraDirectory.java (revision 0)
+++ contrib/directories/src/java/org/apache/lucene/store/CassandraDirectory.java (revision 0)
@@ -0,0 +1,1951 @@
+package org.apache.lucene.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.cassandra.thrift.AuthenticationRequest;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.Deletion;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.KeyRange;
+import org.apache.cassandra.thrift.KeySlice;
+import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+
+/**
+ * The CassandraDirectory maps the concept of a Lucene directory to
+ * a column family that belongs to a certain keyspace located in a given
+ * Cassandra server. Furthermore, it stores each file under this directory as a
+ * row in that column family.
+ *
+ *
+ * In particular, files are broken down into blocks (whose sizes are capped),
+ * where each block (see {@link FileBlock}) is stored as the value of a column
+ * in the corresponding row. As per
+ * http://wiki.apache.org/cassandra/CassandraLimitations, this is the
+ * recommended approach for dealing with large objects, which Lucene files tend
+ * to be. Furthermore, a descriptor of the file (see {@link FileDescriptor})
+ * that outlines a map of blocks therein is stored as one of the columns in that
+ * row as well. Think of this descriptor as an inode for Cassandra-based files.
+ *
+ *
+ *
+ * The exhaustive mapping of a Lucene directory (file) to a Cassandra column
+ * family (row) is captured in the {@link ColumnOrientedDirectory} (
+ * {@link ColumnOrientedFile}) inner-class. Specifically, they interpret
+ * Cassandra's data model in terms of that of Lucene. More importantly, these
+ * are the only two inner-classes that have a foot in both the Lucene and
+ * Cassandra camps.
+ *
+ *
+ *
+ * All writes to a file in this directory occur through a
+ * {@link CassandraIndexOutput}, which puts the data flushed from a write-behind
+ * buffer into the fitting set of blocks. By the same token, all reads from a
+ * file in this directory occur through a {@link CassandraIndexInput}, which
+ * gets the data needed by a read-ahead buffer from the right set of blocks.
+ *
+ *
+ *
+ * The last (but not the least) inner-class, {@link CassandraClient}, acts as a
+ * facade for a Thrift-based Cassandra client. In short, it provides operations
+ * to get/put rows/columns in the column family and keyspace associated with
+ * this directory.
+ *
+ *
+ *
+ * Unlike Lucandra, which attempts to bridge the gap between Lucene and
+ * Cassandra at the document-level, the {@link CassandraDirectory} is
+ * self-sufficient in the sense that it does not require a re-write of other
+ * components in the Lucene stack. In other words, one may use the
+ * {@link CassandraDirectory} in conjunction with the {@link IndexWriter} and
+ * {@link IndexReader}, as you would any other kind of Lucene {@link Directory}.
+ * Moreover, given that the data unit transferred to and from Cassandra is a
+ * large-sized block, one may expect fewer round trips, and hence better
+ * throughput, from the {@link CassandraDirectory}.
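+ *
+ * As a brief illustrative sketch (the keyspace and column family names below
+ * are arbitrary examples, not required values), one might open such a
+ * directory and hand it to the rest of the Lucene stack as follows:
+ *
+ * <pre>
+ * Directory directory = new CassandraDirectory("lucene", "myindex");
+ * // pass the directory to an IndexWriter or IndexReader as usual, and
+ * // close it once the index has been written or read
+ * directory.close();
+ * </pre>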
+ *
+ *
+ *
+ * In conclusion, this directory attempts to marry the rich search-based query
+ * language of Lucene with the distributed fault-tolerant database that is
+ * Cassandra. By delegating the responsibilities of replication, durability and
+ * elasticity to the directory, we free the layers above from such
+ * non-functional concerns. Our hope is that users will choose to make their
+ * large-scale indices instantly scalable by seamlessly migrating them to this
+ * type of directory (using {@link Directory#copyTo(Directory)}).
+ *
+ *
+ * @author Karthick Sankarachary
+ */
+public class CassandraDirectory extends Directory {
+
+ // The default size of a block, which in turn maps to a cassandra column.
+ public static final int DEFAULT_BLOCK_SIZE = 1 * 1024 * 1024;
+
+ // The default size of the buffer, which is managed by the index output.
+ public static final int DEFAULT_BUFFER_SIZE = 1 * DEFAULT_BLOCK_SIZE;
+
+ // The default host where the cassandra server is located.
+ public static final String DEFAULT_CASSANDRA_HOST = "localhost";
+
+ // The default port where the cassandra server is listening.
+ public static final int DEFAULT_CASSANDRA_PORT = 9160;
+
+ // The default flag indicating whether the cassandra server is framed.
+ public static final boolean DEFAULT_CASSANDRA_FRAMED = false;
+
+ // The default keyspace in which to store cassandra directories.
+ public static final String DEFAULT_CASSANDRA_KEYSPACE = "lucene";
+
+ // The name of every column that holds a file block starts with this prefix.
+ protected static final String BLOCK_COLUMN_NAME_PREFIX = "BLOCK-";
+
+ // The name of the column that holds the file descriptor.
+ protected static final String descriptorColumn = "DESCRIPTOR";
+
+ // The list of meta-columns currently defined for each file (or row).
+ protected static final List<byte[]> systemColumns = new ArrayList<byte[]>();
+ static {
+ systemColumns.add(descriptorColumn.getBytes());
+ }
+
+ // The reference to the cassandra client that talks to the thrift server.
+ protected CassandraClient cassandraClient;
+
+ // The keyspace in which to read/write cassandra directories and files.
+ protected String keyspace;
+
+ // The name of the column family that maps to this cassandra directory.
+ protected String columnFamily;
+
+ // The current size of the block to write out to a column.
+ protected int blockSize;
+
+ // The current size of the buffer that (ideally) is big enough to hold one or
+ // more file blocks. In essence, the write (read) buffer acts as a
+ // write-behind (read-ahead) cache that performs a lazy write (read) only when
+ // the data is evicted (or flushed) from the cache. Given that, we should try
+ // to write and read data in block multiples.
+ protected int bufferSize;
+
+ protected ColumnOrientedDirectory columnOrientedDirectory;
+
+ /**
+ * Construct a Cassandra-based directory that maps to the given column family,
+ * which is located in the default keyspace.
+ *
+ * @param columnFamily
+ * the name of the column family that denotes this directory
+ * @throws IOException
+ */
+ public CassandraDirectory(String columnFamily) throws IOException {
+ this(DEFAULT_CASSANDRA_KEYSPACE, columnFamily);
+ }
+
+ /**
+ * Construct a Cassandra-based directory that maps to the given column family,
+ * which is located in the given keyspace. Note that it uses the default block
+ * and buffer sizes.
+ *
+ * @param keyspace
+ * the name of the keyspace in which to find the column family
+ * @param columnFamily
+ * the name of the column family that denotes this directory
+ * @throws IOException
+ */
+ public CassandraDirectory(String keyspace, String columnFamily)
+ throws IOException {
+ this(keyspace, columnFamily, DEFAULT_BLOCK_SIZE, DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Construct a Cassandra-based directory that maps to the given column family,
+ * which is located in the given keyspace. Moreover, it uses the given block
+ * size as the column size, and the given buffer size for the write-behind and
+ * read-ahead cache.
+ *
+ * @param keyspace
+ * the name of the keyspace in which to find the column family
+ * @param columnFamily
+ * the name of the column family that denotes this directory
+ * @param blockSize
+ * the size of the file block
+ * @param bufferSize
+ * the size of the read/write buffer
+ * @throws IOException
+ */
+ public CassandraDirectory(String keyspace, String columnFamily,
+ int blockSize, int bufferSize) throws IOException {
+ this(DEFAULT_CASSANDRA_HOST, DEFAULT_CASSANDRA_PORT,
+ DEFAULT_CASSANDRA_FRAMED, keyspace, columnFamily, blockSize, bufferSize);
+ }
+
+ /**
+ * Construct a Cassandra-based directory that maps to the given column family,
+ * which is located in the given keyspace. In particular, this directory talks
+ * to the cassandra server running on the given host and port.
+ *
+ * @param host
+ * the host where the cassandra server is located
+ * @param port
+ * the port where the cassandra server is listening
+ * @param keyspace
+ * the name of the keyspace in which to find the column family
+ * @param columnFamily
+ * the name of the column family that denotes this directory
+ * @throws IOException
+ */
+ public CassandraDirectory(String host, Integer port, String keyspace,
+ String columnFamily) throws IOException {
+ this(host, port, DEFAULT_CASSANDRA_FRAMED, keyspace, columnFamily,
+ DEFAULT_BLOCK_SIZE, DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Construct a Cassandra-based directory that maps to the given column family,
+ * which is located in the given keyspace. In particular, this directory talks
+ * to the cassandra server running on the given host and port.
+ *
+ * @param host
+ * the host where the cassandra server is located
+ * @param port
+ * the port where the cassandra server is listening
+ * @param framed
+ * a flag to ensure a fully read message every time by preceding
+ * messages with a 4-byte frame size
+ * @param keyspace
+ * the name of the keyspace in which to find the column family
+ * @param columnFamily
+ * the name of the column family that denotes this directory
+ * @param blockSize
+ * the size of the file block
+ * @param bufferSize
+ * the size of the read/write buffer
+ * @throws IOException
+ */
+ public CassandraDirectory(String host, Integer port, boolean framed,
+ String keyspace, String columnFamily, int blockSize, int bufferSize)
+ throws IOException {
+ this.keyspace = keyspace;
+ this.columnFamily = columnFamily;
+ this.blockSize = blockSize;
+ this.bufferSize = bufferSize;
+ this.cassandraClient = new CassandraClient(host, port, framed);
+ this.columnOrientedDirectory = new ColumnOrientedDirectory();
+ }
+
+ /**
+ * @return the name of the keyspace in which to find the column family
+ */
+ public String getKeyspace() {
+ return keyspace;
+ }
+
+ /**
+ * @return the name of the column family that represents this directory
+ */
+ public String getColumnFamily() {
+ return columnFamily;
+ }
+
+ /**
+ * @return the size of the file block
+ */
+ public long getBlockSize() {
+ return blockSize;
+ }
+
+ /**
+ * @return the size of the read/write buffer
+ */
+ public long getBufferSize() {
+ return bufferSize;
+ }
+
+ /**
+ * Creates a new, empty file in the directory with the given file name.
+ *
+ * @return a stream that writes into this file
+ */
+ @Override
+ public IndexOutput createOutput(String fileName) throws IOException {
+ ensureOpen();
+ return new CassandraIndexOutput(fileName, bufferSize);
+ }
+
+ /**
+ * Open an existing file in the directory with the given file name.
+ *
+ * @return a stream that reads from an existing file.
+ */
+ @Override
+ public IndexInput openInput(String fileName) throws IOException {
+ ensureOpen();
+ return new CassandraIndexInput(fileName, bufferSize);
+ }
+
+ /**
+ * Returns an array of strings, one for each (non-deleted) file in the
+ * directory.
+ *
+ * @throws NoSuchDirectoryException
+ * if the directory is not prepared for any write operations (such
+ * as {@link #createOutput(String)}).
+ * @throws IOException
+ * in case of other IO errors
+ */
+ @Override
+ public String[] listAll() throws IOException {
+ ensureOpen();
+ return columnOrientedDirectory.getFileNames();
+ }
+
+ /**
+ * @return true iff a file with the given name exists
+ */
+ @Override
+ public boolean fileExists(String fileName) throws IOException {
+ ensureOpen();
+ try {
+ return fileLength(fileName) >= 0;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ /**
+ * Returns the length of a file in the directory. This method obeys the
+ * following contract:
+ *
+ * - Throws {@link FileNotFoundException} if the file does not exist
+ * - Returns a value ≥ 0 if the file exists, which specifies its length
+ *
+ * @param fileName
+ * the name of the file for which to return the length.
+ * @throws FileNotFoundException
+ * if the file does not exist.
+ * @throws IOException
+ * if there was an IO error while retrieving the file's length.
+ */
+ @Override
+ public long fileLength(String fileName) throws IOException {
+ ensureOpen();
+ FileDescriptor descriptor = columnOrientedDirectory
+ .getFileDescriptor(fileName);
+ if (descriptor == null) {
+ throw new IOException("Could not find descriptor for file " + fileName);
+ }
+ return descriptor.getLength();
+ }
+
+ /**
+ * @return the time the named file was last modified
+ */
+ @Override
+ public long fileModified(String fileName) throws IOException {
+ ensureOpen();
+ FileDescriptor descriptor = columnOrientedDirectory
+ .getFileDescriptor(fileName);
+ if (descriptor == null) {
+ throw new IOException("Could not find descriptor for file " + fileName);
+ }
+ return descriptor.getLastModified();
+ }
+
+ /**
+ * Set the modified time of an existing file to now.
+ */
+ @Override
+ public void touchFile(String fileName) throws IOException {
+ ensureOpen();
+ try {
+ FileDescriptor fileDescriptor = columnOrientedDirectory
+ .getFileDescriptor(fileName);
+ fileDescriptor.setLastModified(System.currentTimeMillis());
+ columnOrientedDirectory.setFileDescriptor(fileDescriptor);
+ } catch (Exception e) {
+ throw new IOException("Could not touch file " + fileName, e);
+ }
+ }
+
+ /**
+ * Removes an existing file in the directory.
+ */
+ @Override
+ public void deleteFile(String fileName) throws IOException {
+ ensureOpen();
+ FileDescriptor fileDescriptor = columnOrientedDirectory
+ .getFileDescriptor(fileName);
+ if (fileDescriptor != null) {
+ fileDescriptor.setDeleted(true);
+ columnOrientedDirectory.setFileDescriptor(fileDescriptor);
+ }
+ }
+
+ /**
+ * Closes all the client-side resources obtained by this directory instance.
+ */
+ @Override
+ public void close() throws IOException {
+ isOpen = false;
+ }
+
+ /**
+ * The FileDescriptor captures the meta-data of the file, a la
+ * Unix inodes (index nodes). In addition to the usual tidbits such as name,
+ * length and timestamps, it carries a flag (see {@link #deleted}) that
+ * indicates whether the file has been deleted or not.
+ *
+ *
+ * The data in the file is indexed through an ordered list of
+ * {@link FileBlock}s, where each block denotes a contiguous portion of the
+ * file data. By walking through the blocks sequentially in the given order,
+ * one can retrieve the entire contents of the file. A pseudo-random access
+ * of the file can be effected by loading into memory the entire block to
+ * which the (random) file pointer maps, and then positioning the file
+ * pointer within the in-memory block.
+ *
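+ * For example, assuming the default block size of 1,048,576 bytes and an
+ * unfragmented file, a seek to file pointer 2,500,000 would land in the
+ * third block (the first two blocks cover bytes 0 through 2,097,151), at
+ * data position 402,848 within that block.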
+ */
+ public static class FileDescriptor {
+ // The name of the file.
+ private String name;
+
+ // The length of the file.
+ private long length;
+
+ // A flag indicating whether the file has been deleted. Currently, we cannot
+ // delete rows from a column family, which means that it is not physically
+ // possible to delete a file. To work around this issue, which is described
+ // in detail at https://issues.apache.org/jira/browse/CASSANDRA-293, we rely
+ // on this flag instead. Note that, as a side-effect of this design, we
+ // could potentially allow cassandra files to be "unremoved", if you will.
+ private boolean deleted;
+
+ // The timestamp at which the file was last modified.
+ private long lastModified;
+
+ // The timestamp at which the file was last accessed.
+ private long lastAccessed;
+
+ // The maximum size of the block that may be stored in a column.
+ private long blockSize;
+
+ // The ordered list of blocks in this file.
+ private LinkedList<FileBlock> blocks;
+
+ // The number to use for the next block that will be allocated. If it is
+ // uninitialized (i.e., -1), then it forces the descriptor to reset it to
+ // the highest number of any block in {@link #blocks}.
+ private int nextBlockNumber = -1;
+
+ /**
+ * Construct a file descriptor for the given file name, using the default
+ * block size.
+ *
+ * @param fileName
+ * the name of the file
+ */
+ public FileDescriptor(String fileName) {
+ this(fileName, DEFAULT_BLOCK_SIZE);
+ }
+
+ /**
+ * Construct a file descriptor for the given file name and block size.
+ *
+ * @param fileName
+ * the name of the file
+ * @param blockSize
+ * the size of the block
+ */
+ public FileDescriptor(String fileName, long blockSize) {
+ setName(fileName);
+ setLength(0);
+ Date now = new Date();
+ setLastAccessed(now.getTime());
+ setLastModified(now.getTime());
+ setBlockSize(blockSize);
+ setBlocks(new LinkedList<FileBlock>());
+ }
+
+ /**
+ * @return the name of the file
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Rename the file.
+ *
+ * @param name
+ * the name of the file
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * @return the current length of the file
+ */
+ public long getLength() {
+ return length;
+ }
+
+ /**
+ * Resize the file.
+ *
+ * @param length
+ * the new length of the file
+ */
+ public void setLength(long length) {
+ this.length = length;
+ }
+
+ /**
+ * @return true iff the file has been deleted
+ */
+ public boolean isDeleted() {
+ return deleted;
+ }
+
+ /**
+ * Mark the file as deleted (or undeleted) based on whether the given flag
+ * is true (or not).
+ *
+ * @param deleted
+ * should the file be marked as deleted?
+ */
+ public void setDeleted(boolean deleted) {
+ this.deleted = deleted;
+ }
+
+ /**
+ * @return the timestamp at which the file was last modified
+ */
+ public long getLastModified() {
+ return lastModified;
+ }
+
+ /**
+ * Set the timestamp at which the file was last modified.
+ *
+ * @param lastModified
+ * the last modified timestamp
+ */
+ public void setLastModified(long lastModified) {
+ this.lastModified = lastModified;
+ }
+
+ /**
+ * @return the timestamp at which the file was last accessed
+ */
+ public long getLastAccessed() {
+ return lastAccessed;
+ }
+
+ /**
+ * Set the timestamp at which the file was last accessed.
+ *
+ * @param lastAccessed
+ * the last accessed timestamp
+ */
+ public void setLastAccessed(long lastAccessed) {
+ this.lastAccessed = lastAccessed;
+ }
+
+ /**
+ * @return the maximum size of the block that may be stored in a column
+ */
+ public long getBlockSize() {
+ return blockSize;
+ }
+
+ /**
+ * Set the maximum size of the block that may be stored in a column.
+ *
+ * @param blockSize
+ * the block size
+ */
+ public void setBlockSize(long blockSize) {
+ this.blockSize = blockSize;
+ }
+
+ /**
+ * @return the ordered list of file blocks
+ */
+ public List<FileBlock> getBlocks() {
+ return blocks;
+ }
+
+ /**
+ * Set the ordered list of file blocks.
+ *
+ * @param blocks
+ * the ordered list of file blocks
+ */
+ public void setBlocks(List<FileBlock> blocks) {
+ if (LinkedList.class.isAssignableFrom(blocks.getClass())) {
+ this.blocks = (LinkedList<FileBlock>) blocks;
+ } else {
+ this.blocks = new LinkedList<FileBlock>(blocks);
+ }
+ }
+
+ /**
+ * @return the first block in the file
+ */
+ public FileBlock getFirstBlock() {
+ if (blocks.isEmpty()) {
+ blocks.add(createBlock());
+ }
+ return blocks.getFirst();
+ }
+
+ /**
+ * @return the last block in the file
+ */
+ public FileBlock getLastBlock() {
+ if (blocks.isEmpty()) {
+ blocks.add(createBlock());
+ }
+ return blocks.getLast();
+ }
+
+ /**
+ * @return true iff the given block is the first one in the file
+ */
+ public boolean isFirstBlock(FileBlock nextBlock) {
+ return getFirstBlock().equals(nextBlock);
+ }
+
+ /**
+ * @return true iff the given block is the last one in the file
+ */
+ public boolean isLastBlock(FileBlock nextBlock) {
+ return getLastBlock().equals(nextBlock);
+ }
+
+ /**
+ * Return the block that logically follows the given block.
+ *
+ * @param block
+ * an existing file block
+ * @return the block that logically follows the given block
+ */
+ public FileBlock getNextBlock(FileBlock block) {
+ int blockIndex = blocks.indexOf(block);
+ return (blockIndex != -1 && blockIndex < (blocks.size() - 1)) ? blocks
+ .get(blockIndex + 1) : null;
+ }
+
+ /**
+ * Add the given block as the last block in the file.
+ *
+ * @param newBlock
+ * the block to be appended to the file
+ */
+ public void addLastBlock(FileBlock newBlock) {
+ blocks.addLast(newBlock);
+ }
+
+ /**
+ * Add the given block as the first block in the file.
+ *
+ * @param newBlock
+ * the block to be prepended to the file
+ */
+ public void addFirstBlock(FileBlock newBlock) {
+ blocks.addFirst(newBlock);
+ }
+
+ /**
+ * Insert a new block either after or before an existing block, based on
+ * whether or not the insertAfter flag is true.
+ *
+ * @param existingBlock
+ * an existing file block
+ * @param newBlock
+ * a new file block
+ * @param insertAfter
+ * whether or not to insert new block after the existing block
+ */
+ public void insertBlock(FileBlock existingBlock, FileBlock newBlock,
+ boolean insertAfter) {
+ int existingIndex = blocks.indexOf(existingBlock);
+ if (existingIndex == -1) {
+ blocks.add(newBlock);
+ } else {
+ if (insertAfter) {
+ blocks.add(existingIndex + 1, newBlock);
+ } else {
+ blocks.add(existingIndex, newBlock);
+ }
+ }
+ }
+
+ /**
+ * Replace an existing block with a new block.
+ *
+ * @param existingBlock
+ * an existing file block
+ * @param newBlock
+ * a new file block
+ */
+ public void replaceBlock(FileBlock existingBlock, FileBlock newBlock) {
+ int existingIndex = blocks.indexOf(existingBlock);
+ if (existingIndex != -1) {
+ blocks.remove(existingIndex);
+ blocks.add(existingIndex, newBlock);
+ }
+ }
+
+ /**
+ * Remove an existing block.
+ *
+ * @param existingBlock
+ * an existing block
+ * @return the index of the block just removed
+ */
+ public int removeBlock(FileBlock existingBlock) {
+ int existingIndex = blocks.indexOf(existingBlock);
+ if (existingIndex != -1) {
+ blocks.remove(existingBlock);
+ }
+ return existingIndex;
+ }
+
+ /**
+ * Create a file block with no data in it. The block number assigned to new
+ * blocks is set to auto-increment.
+ *
+ * @return the newly created file block
+ */
+ public FileBlock createBlock() {
+ FileBlock newBlock = new FileBlock();
+ newBlock.setBlockName(getNextBlockNumber());
+ newBlock.setBlockSize(getBlockSize());
+ newBlock.setDataLength(0);
+ return newBlock;
+ }
+
+ /**
+ * Return the block number to use for the next block that is allocated. This
+ * number starts from 0 and will auto-increment with each block allocation.
+ *
+ * @return the next block number to use
+ */
+ public int getNextBlockNumber() {
+ if (nextBlockNumber == -1) {
+ for (FileBlock fileBlock : blocks) {
+ nextBlockNumber = Math.max(nextBlockNumber, fileBlock
+ .getBlockNumber());
+ }
+ }
+ return ++nextBlockNumber;
+ }
+
+ }
+
+ /**
+ * A FileBlock denotes a partition of the file whose size is
+ * capped by a certain limit (see {@link #blockSize}). In particular, it
+ * refers to a contiguous sequence of bytes, which is stored as the value of a
+ * column in the column family that represents this directory.
+ *
+ *
+ * Ideally, each block in the file (except for maybe the last one) should
+ * contain as many bytes as {@link #blockSize}. However, at times, blocks tend
+ * to become fragmented, in the sense that the {@link #blockSize} bytes that
+ * were supposed to go in one (ideal) block get spread across multiple blocks,
+ * each containing a portion of the ideal block. This may happen when writes
+ * occur after the file pointer has been randomly moved across the file. While
+ * there are ways to avoid, or at least mitigate, the fragmentation issue, we
+ * currently deal with it as a fact of life. In the event a block represents a
+ * fragment, it is important to know the offset of the fragment relative to
+ * the (ideal) block in which it begins. This information is captured in
+ * {@link FileBlock#dataOffset}.
+ *
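+ * As an illustration, overwriting a small range in the middle of an existing
+ * full block may split that block into a pre-fragment (the bytes before the
+ * write), the newly written fragment, and a post-fragment (the bytes after
+ * the write), each of which is then tracked as its own {@link FileBlock}.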
+ *
+ *
+ * There is some file block information that is not actually saved in
+ * Cassandra, but rather calculated on the fly. For example, the
+ * {@link FileBlock#blockOffset} keeps track of the offset of the block
+ * relative to the file. Similarly, the {@link FileBlock#dataPosition} notes
+ * where the file pointer is positioned relative to the block. Both of these
+ * fields are calculated after the file descriptor is loaded from Cassandra.
+ *
+ */
+ public static class FileBlock implements Cloneable {
+ // The name of the file block.
+ private String blockName;
+
+ // The number of the file block.
+ private int blockNumber;
+
+ // The maximum size of the file block.
+ private long blockSize;
+
+ // The offset of the first byte in this block relative to the file.
+ private long blockOffset;
+
+ // The offset within the file block range where the first data byte appears.
+ private long dataOffset;
+
+ // The length of the data starting from {@link #dataOffset}
+ private int dataLength;
+
+ // The position of the file pointer relative to this block, assuming that
+ // pointer is currently inside this block to begin with.
+ private int dataPosition;
+
+ /**
+ * Construct an empty file block.
+ */
+ public FileBlock() {}
+
+ /**
+ * @return the name of the file block
+ */
+ public String getBlockName() {
+ return blockName;
+ }
+
+ /**
+ * Set the name of the file block.
+ *
+ * @param blockName
+ * the name of the file block
+ */
+ public void setBlockName(String blockName) {
+ this.blockName = blockName;
+ }
+
+ /**
+ * Set the name of the file block based on the given block number.
+ *
+ * @param blockNumber
+ * the block number
+ */
+ public void setBlockName(int blockNumber) {
+ this.blockNumber = blockNumber;
+ this.blockName = createBlockName(blockNumber);
+ }
+
+ /**
+ * @return the number of this file block
+ */
+ public int getBlockNumber() {
+ return blockNumber;
+ }
+
+ /**
+ * Set the number of this file block.
+ *
+ * @param blockNumber
+ * the number for this file block
+ */
+ public void setBlockNumber(int blockNumber) {
+ this.blockNumber = blockNumber;
+ }
+
+ /**
+ * @return the maximum size of this file block
+ */
+ public long getBlockSize() {
+ return blockSize;
+ }
+
+ /**
+ * Set the maximum size of this file block.
+ *
+ * @param blockSize
+ * the maximum block size
+ */
+ public void setBlockSize(long blockSize) {
+ this.blockSize = blockSize;
+ }
+
+ /**
+ * @return the offset of the first byte in this block relative to the file
+ */
+ public long getBlockOffset() {
+ return blockOffset;
+ }
+
+ /**
+ * Set the offset of the first byte in this block relative to the file.
+ *
+ * @param blockOffset
+ * the new offset for this file block
+ */
+ public void setBlockOffset(long blockOffset) {
+ this.blockOffset = blockOffset;
+ }
+
+ /**
+ * @return the offset within the file block range where the first data byte
+ * appears
+ */
+ public long getDataOffset() {
+ return dataOffset;
+ }
+
+ /**
+ * Set the offset within the file block range where the first data byte
+ * appears.
+ *
+ * @param dataOffset
+ * the data offset
+ */
+ public void setDataOffset(long dataOffset) {
+ this.dataOffset = dataOffset;
+ }
+
+ /**
+ * @return the length of the data starting from {@link #dataOffset}
+ */
+ public int getDataLength() {
+ return dataLength;
+ }
+
+ /**
+ * Set the length of the data starting from {@link #dataOffset}.
+ *
+ * @param dataLength
+ * the new data length
+ */
+ public void setDataLength(int dataLength) {
+ this.dataLength = dataLength;
+ }
+
+ /**
+ * Calculate the offset of the last byte of data in this file block relative
+ * to the block.
+ *
+ * @return the last byte's offset relative to the block
+ */
+ public long getLastDataOffset() {
+ return getDataOffset() + getDataLength();
+ }
+
+ /**
+ * @return the position of the file pointer relative to this block, assuming
+ * that the pointer is currently inside this block to begin with
+ */
+ public int getDataPosition() {
+ return dataPosition;
+ }
+
+ /**
+ * Set the position of the file pointer relative to this block, assuming
+ * that the pointer is currently inside this block to begin with.
+ *
+ * @param dataPosition
+ * the new data position
+ */
+ public void setDataPosition(int dataPosition) {
+ this.dataPosition = dataPosition;
+ }
+
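+ /**
+ * @return the offset, within this block's range, of the byte on which the
+ * file pointer is currently positioned (i.e., the data offset plus
+ * the data position)
+ */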
+ public long getPositionOffset() {
+ return getDataOffset() + getDataPosition();
+ }
+
+ /**
+ * Create a readable name for the block, derived from its block number.
+ *
+ * @param blockNumber
+ * the number of the block to use in the name
+ * @return the derived block name
+ */
+ public static String createBlockName(int blockNumber) {
+ return BLOCK_COLUMN_NAME_PREFIX + blockNumber;
+ }
+
+ /**
+ * @return a shallow clone of this file block
+ */
+ @Override
+ public Object clone() {
+ try {
+ return super.clone();
+ } catch (CloneNotSupportedException e) {
+ return null;
+ }
+ }
+ }
+
+ // A comparator that orders byte arrays, first by length and then by a
+ // byte-wise comparison, so that they can serve as keys in a sorted map.
+ protected static final Comparator<byte[]> BYTE_ARRAY_COMPARATOR =
+ new Comparator<byte[]>() {
+ public int compare(byte[] o1, byte[] o2) {
+ if (o1 == null || o2 == null) {
+ // Order nulls before any non-null array.
+ return (o1 == null ? (o2 == null ? 0 : -1) : 1);
+ }
+ if (o1.length != o2.length) {
+ return o1.length - o2.length;
+ }
+ for (int i = 0; i < o1.length; i++) {
+ if (o1[i] != o2[i]) {
+ return o1[i] - o2[i];
+ }
+ }
+ return 0;
+ }
+ };
+
+ /**
+ * The BlockMap keeps track of file blocks by their names. Given
+ * that the name is a byte array, we rely on a custom comparator that knows
+ * how to compare such names.
+ */
+ protected class BlockMap extends TreeMap<byte[], byte[]> {
+ private static final long serialVersionUID = 1550200273310875675L;
+
+ /**
+ * Define a block map which is essentially a map of a block name (in the
+ * form of bytes) to the block data (again, in the form of bytes). Given
+ * that byte arrays don't lend themselves to comparison naturally, we pass
+ * it a custom comparator.
+ */
+ public BlockMap() {
+ super(BYTE_ARRAY_COMPARATOR);
+ }
+
+ /**
+ * Put a tuple in the block map, where the key is a
+ * {@link java.lang.String}.
+ *
+ * @param key
+ * a stringified key
+ * @param value
+ * a byte array value
+ * @return the previously associated value
+ */
+ public byte[] put(String key, byte[] value) {
+ return super.put(key.getBytes(), value);
+ }
+
+ /**
+ * Put a tuple in the block map, where the value is a
+ * {@link java.lang.String}.
+ *
+ * @param key
+ * a byte array key
+ * @param value
+ * a stringified value
+ * @return the previously associated value
+ */
+ public byte[] put(byte[] key, String value) {
+ return super.put(key, value.getBytes());
+ }
+
+ /**
+ * Put a tuple in the block map, where both the key and value
+ * are a {@link java.lang.String}.
+ *
+ * @param key
+ * a stringified key
+ * @param value
+ * a stringified value
+ * @return the previously associated value
+ */
+ public byte[] put(String key, String value) {
+ return super.put(key.getBytes(), value.getBytes());
+ }
+
+ /**
+ * Get the value for the given key, which is a {@link java.lang.String}.
+ *
+ * @param key
+ * a stringified key
+ * @return the currently associated value
+ */
+ public byte[] get(String key) {
+ return super.get(key.getBytes());
+ }
+ }
+
+ /**
+ * A utility for serializing (and deserializing) the file descriptor to (and
+ * from) JSON objects.
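+ *
+ * For instance, a freshly written single-block file might serialize to
+ * something like the following (the field values here are purely
+ * illustrative):
+ *
+ * <pre>
+ * {"name":"segments_1","length":71,"deleted":false,
+ *  "lastModified":1273622400000,"lastAccessed":1273622400000,
+ *  "blocks":[{"columnName":"BLOCK-0","blockNumber":0,"blockSize":1048576,
+ *  "dataOffset":0,"dataLength":71}]}
+ * </pre>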
+ */
+ public static class FileDescriptorUtils {
+ /**
+ * Convert the given file descriptor to bytes.
+ *
+ * @param fileDescriptor
+ * @return
+ * @throws IOException
+ */
+ public static byte[] toBytes(FileDescriptor fileDescriptor)
+ throws IOException {
+ return toString(fileDescriptor).getBytes();
+ }
+
+ /**
+ * Convert the given file descriptor to a String.
+ *
+ * @param fileDescriptor
+ * @return
+ * @throws IOException
+ */
+ public static String toString(FileDescriptor fileDescriptor)
+ throws IOException {
+ return toJSON(fileDescriptor).toString();
+ }
+
+ /**
+ * Convert the given file descriptor to a {@link JSONObject}
+ *
+ * @param fileDescriptor
+ * @return
+ * @throws IOException
+ */
+ public static JSONObject toJSON(FileDescriptor fileDescriptor)
+ throws IOException {
+ try {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("name", fileDescriptor.getName());
+ jsonObject.put("length", fileDescriptor.getLength());
+ jsonObject.put("deleted", fileDescriptor.isDeleted());
+ jsonObject.put("lastModified", fileDescriptor.getLastModified());
+ jsonObject.put("lastAccessed", fileDescriptor.getLastAccessed());
+ JSONArray jsonArray = new JSONArray();
+ for (FileBlock fileBlock : fileDescriptor.getBlocks()) {
+ JSONObject blockObject = new JSONObject();
+ blockObject.put("columnName", fileBlock.getBlockName());
+ blockObject.put("blockNumber", fileBlock.getBlockNumber());
+ blockObject.put("blockSize", fileBlock.getBlockSize());
+ blockObject.put("dataOffset", fileBlock.getDataOffset());
+ blockObject.put("dataLength", fileBlock.getDataLength());
+ jsonArray.put(blockObject);
+ }
+ jsonObject.put("blocks", jsonArray);
+ return jsonObject;
+ } catch (JSONException e) {
+ throw new IOException("Unable to serialize file descriptor for "
+ + fileDescriptor.getName(), e);
+ }
+ }
+
+ /**
+ * Convert the given bytes to a file descriptor.
+ *
+ * @param descriptorBytes
+ * @return
+ * @throws IOException
+ */
+ public static FileDescriptor fromBytes(byte[] descriptorBytes)
+ throws IOException {
+ try {
+ if (descriptorBytes == null) {
+ return null;
+ }
+ JSONTokener tokener = new JSONTokener(new InputStreamReader(
+ new ByteArrayInputStream(descriptorBytes)));
+ FileDescriptor fileDescriptor = FileDescriptorUtils
+ .fromJSON((JSONObject) tokener.nextValue());
+ return (!fileDescriptor.isDeleted() ? fileDescriptor : null);
+ } catch (JSONException e) {
+ throw new IOException("Could not get descriptor for file.", e);
+ }
+ }
+
+ /**
+ * Convert the given {@link JSONObject} to a file descriptor.
+ *
+ * @param jsonObject
+ * @return
+ * @throws IOException
+ */
+ public static FileDescriptor fromJSON(JSONObject jsonObject)
+ throws IOException {
+ try {
+ FileDescriptor fileDescriptor = new FileDescriptor(jsonObject
+ .getString("name"));
+ fileDescriptor.setLength(jsonObject.getLong("length"));
+ fileDescriptor.setDeleted(jsonObject.getBoolean("deleted"));
+ fileDescriptor.setLastModified(jsonObject.getLong("lastModified"));
+ fileDescriptor.setLastAccessed(jsonObject.getLong("lastAccessed"));
+ fileDescriptor.setBlocks(new LinkedList<FileBlock>());
+ JSONArray blockArray = jsonObject.getJSONArray("blocks");
+ if (blockArray != null) {
+ for (int index = 0; index < blockArray.length(); index++) {
+ JSONObject blockObject = (JSONObject) blockArray.get(index);
+ FileBlock fileBlock = new FileBlock();
+ fileBlock.setBlockName(blockObject.getString("columnName"));
+ fileBlock.setBlockNumber(blockObject.getInt("blockNumber"));
+ fileBlock.setBlockSize(blockObject.getInt("blockSize"));
+ fileBlock.setDataOffset(blockObject.getInt("dataOffset"));
+ fileBlock.setDataLength(blockObject.getInt("dataLength"));
+ fileDescriptor.addLastBlock(fileBlock);
+ }
+ }
+ return fileDescriptor;
+ } catch (JSONException e) {
+ throw new IOException("Unable to de-serialize file descriptor from "
+ + jsonObject, e);
+ }
+ }
+
+ /**
+ * Find the file block on which the given file pointer is positioned.
+ *
+ * @param descriptor
+ * the descriptor of the file
+ * @param filePointer
+ * the pointer within the file to seek to
+ * @return the block containing the file pointer, or null if no such block
+ * exists
+ */
+ public static FileBlock seekBlock(FileDescriptor descriptor,
+ long filePointer) {
+ long currentPointer = 0;
+ for (FileBlock fileBlock : descriptor.getBlocks()) {
+ long blockEnd = currentPointer + fileBlock.getDataLength();
+ // Return the block whose data range contains the file pointer; a pointer
+ // at the very end of the file maps to the end of the last block.
+ if (filePointer < blockEnd
+ || (filePointer == blockEnd && descriptor.isLastBlock(fileBlock))) {
+ fileBlock.setDataPosition((int) (filePointer - currentPointer));
+ fileBlock.setBlockOffset(currentPointer);
+ return fileBlock;
+ }
+ currentPointer = blockEnd;
+ }
+ return null;
+ }
+ }
+
+ /**
+ * The ColumnOrientedDirectory captures the mapping of the
+ * concepts of a directory to a column family in Cassandra. Specifically, it
+ * treats each row in the column family as a file underneath the directory.
+ *
+ *
+ * This class in turn relies on the {@link CassandraClient} for all low-level
+ * gets and puts to the Cassandra server. More importantly, it does not
+ * require the {@link CassandraClient} to be familiar with the notion of
+ * Lucene directories. Rather, it transparently translates those notions to
+ * column families. In so doing, it ends up hiding the Cassandra layer from
+ * its consumers.
+ *
+ */
+ public class ColumnOrientedDirectory {
+ /**
+ * @return the names of the files in this directory
+ * @throws IOException
+ */
+ public String[] getFileNames() throws IOException {
+ byte[][] keys = cassandraClient.getKeys(systemColumns);
+ List<String> fileNames = new ArrayList<String>();
+ for (byte[] key : keys) {
+ fileNames.add(new String(key));
+ }
+ return fileNames.toArray(new String[] {});
+ }
+
+ /**
+ * Return the file descriptor for the file of the given name. If the file
+ * cannot be found, then return null, instead of trying to create it.
+ *
+ * @param fileName
+ * the name of the file
+ * @return the descriptor for the given file
+ * @throws IOException
+ */
+ protected FileDescriptor getFileDescriptor(String fileName)
+ throws IOException {
+ return getFileDescriptor(fileName, false);
+ }
+
+ /**
+ * Return the file descriptor for the file of the given name.
+ *
+ * @param fileName
+ * the name of the file
+ * @param createIfNotFound
+ * if the file wasn't found, create it
+ * @return the descriptor for the given file
+ * @throws IOException
+ */
+ protected FileDescriptor getFileDescriptor(String fileName,
+ boolean createIfNotFound) throws IOException {
+ FileDescriptor fileDescriptor = FileDescriptorUtils
+ .fromBytes(cassandraClient.getColumn(fileName.getBytes(),
+ descriptorColumn.getBytes()));
+
+ if (fileDescriptor == null && createIfNotFound) {
+ fileDescriptor = new FileDescriptor(fileName, getBlockSize());
+ setFileDescriptor(fileDescriptor);
+ }
+ return fileDescriptor;
+ }
+
+ /**
+ * Save the given file descriptor.
+ *
+ * @param fileDescriptor
+ * the file descriptor being saved
+ * @throws IOException
+ */
+ public void setFileDescriptor(FileDescriptor fileDescriptor)
+ throws IOException {
+ BlockMap blockMap = new BlockMap();
+ blockMap.put(descriptorColumn, FileDescriptorUtils
+ .toString(fileDescriptor));
+ cassandraClient.setColumns(fileDescriptor.getName().getBytes(), blockMap);
+ }
+ }
+
+ /**
+ * The ColumnOrientedFile captures the mapping of the concept of
+ * a file to a row in Cassandra. Specifically, it considers each column in the
+ * row as a block in the file. Furthermore, it uses one of the columns to hold
+ * the {@link FileDescriptor} for the file, in the form of a JSON string
+ * (which serves to make the "file" readable by other, potentially disparate,
+ * clients).
+ *
+ *
+ * This class in turn relies on the {@link CassandraClient} for all low-level
+ * gets and puts to the Cassandra server. More importantly, it does not
+ * require that the {@link CassandraClient} be familiar with the notion of
+ * Lucene files. Rather, it transparently translates those notions to rows
+ * within the column family denoting the directory. In so doing, it ends up
+ * hiding the Cassandra layer from its consumers.
+ *
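+ * Schematically, the row for a two-block file might be laid out as follows
+ * (the file name and block count here are illustrative):
+ *
+ * <pre>
+ * row key "segments_1":
+ *   DESCRIPTOR -> JSON-encoded file descriptor
+ *   BLOCK-0    -> first blockSize bytes of file data
+ *   BLOCK-1    -> remaining bytes of file data
+ * </pre>
+ *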
+ */
+ public class ColumnOrientedFile {
+ /**
+ * Write the given blocks in the file referenced by the given descriptor.
+ *
+ * @param fileDescriptor
+ * the descriptor of the file being written to
+ * @param blocksToBeWritten
+ * the map of block names to values
+ * @throws IOException
+ */
+ public void writeFileBlocks(FileDescriptor fileDescriptor,
+ BlockMap blocksToBeWritten) throws IOException {
+ System.out.println("The file descriptor saved was "
+ + FileDescriptorUtils.toJSON(fileDescriptor));
+ blocksToBeWritten.put(descriptorColumn, FileDescriptorUtils
+ .toString(fileDescriptor));
+ cassandraClient.setColumns(fileDescriptor.getName().getBytes(),
+ blocksToBeWritten);
+ }
+
+ /**
+ * Read the given blocks from the file referenced by the given descriptor.
+ *
+ * @param fileDescriptor
+ * the descriptor of the file being read
+ * @param blockNames
+ * the (unique) set of block names to read from
+ * @return the map of block names to values
+ * @throws IOException
+ */
+ public BlockMap readFileBlocks(FileDescriptor fileDescriptor,
+ Set<byte[]> blockNames) throws IOException {
+ Map<byte[], byte[]> columns = cassandraClient.getColumns(fileDescriptor
+ .getName().getBytes(), blockNames);
+ BlockMap blockMap = new BlockMap();
+ blockMap.putAll(columns);
+ return blockMap;
+ }
+
+ }
+
+ /**
+ * The CassandraIndexOutput acts as an output stream for a
+ * Cassandra-based file of a given name. In essence, it allows Lucene's
+ * low-level data types to be written into that file, using a write-behind
+ * caching mechanism.
+ *
+ *
+ * Specifically, it builds upon the write-behind cache implemented by the
+ * {@link BufferedIndexOutput}, by essentially mapping one or more blocks into
+ * the underlying buffer. Ergo, it is recommended that the buffer size be an
+ * exact multiple of the block size, as that will help to reduce the number of
+ * round trips to the Cassandra server.
+ *
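+ * For instance, with the default 1 MB block size, a 4 MB write buffer would
+ * allow each flush to emit up to four complete blocks in a single batch
+ * mutation, rather than one round trip per block.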
+ */
+ public class CassandraIndexOutput extends BufferedIndexOutput {
+ // The file descriptor corresponding to this file.
+ FileDescriptor fileDescriptor;
+
+ // A map that keeps track of (partial block) fragments. While this is
+ // currently not being used, it could be used to de-fragment the file as
+ // subsequence writes occur.
+ BlockMap fragments;
+
+ // The file block where the file pointer is currently positioned.
+ FileBlock currentBlock;
+
+ // A flag indicating whether or not this stream is open.
+ private volatile boolean isOpen;
+
+ protected ColumnOrientedFile columnOrientedFile;
+
+ public CassandraIndexOutput(String fileName, int bufferSize)
+ throws IOException {
+ super(bufferSize);
+ fileDescriptor = columnOrientedDirectory
+ .getFileDescriptor(fileName, true);
+ currentBlock = fileDescriptor.getFirstBlock();
+ fragments = new BlockMap();
+ isOpen = true;
+ columnOrientedFile = new ColumnOrientedFile();
+ }
+
+ /**
+ * Only close the file if it has not been closed yet.
+ */
+ @Override
+ public void close() throws IOException {
+ if (isOpen) {
+ try {
+ super.close();
+ } finally {
+ isOpen = false;
+ fragments.clear();
+ }
+ }
+ }
+
+ /**
+ * @return the current length of the file
+ */
+ @Override
+ public long length() throws IOException {
+ return fileDescriptor.getLength();
+ }
+
+ /**
+ * Seek to the block corresponding to the given file position, and then move
+ * to the exact data position within that block.
+ */
+ @Override
+ public void seek(long position) throws IOException {
+ currentBlock = FileDescriptorUtils.seekBlock(fileDescriptor, position);
+ }
+
+ /**
+ * Map the bytes (that were in the underlying buffer) to their corresponding
+ * file blocks based on the current file pointer, and write the blocks
+ * through to the column family in the Cassandra server.
+ */
+ @Override
+ protected void flushBuffer(byte[] bytes, int offset, int length)
+ throws IOException {
+ if (length == 0) {
+ return;
+ }
+
+ BlockMap blocksToFlush = new BlockMap();
+
+ if (currentBlock.getDataPosition() > 0) {
+ FileBlock preFragment = (FileBlock) currentBlock.clone();
+ preFragment.setDataLength(currentBlock.getDataPosition());
+ fileDescriptor.insertBlock(currentBlock, preFragment, false);
+ }
+
+ int bytesLeftToWrite = length;
+ int bytesAddedByWrite = 0;
+ do {
+ maybeRepositionCurrentBlock();
+ int dataLength = (int) Math.min(currentBlock.getBlockSize()
+ - currentBlock.getPositionOffset(), bytesLeftToWrite);
+ int currentLength = currentBlock.getDataLength();
+ FileBlock nextBlock;
+ if (currentBlock.getDataPosition() == 0
+ && dataLength > currentBlock.getDataLength()) {
+ nextBlock = currentBlock;
+ nextBlock.setDataLength(dataLength);
+ } else {
+ nextBlock = fileDescriptor.createBlock();
+ nextBlock.setDataLength(dataLength);
+ nextBlock.setDataOffset(currentBlock.getPositionOffset());
+ }
+ byte[] partialBytes = new byte[dataLength];
+ System.arraycopy(bytes, offset, partialBytes, 0, dataLength);
+ blocksToFlush.put(nextBlock.getBlockName(), partialBytes);
+ nextBlock.setDataPosition(dataLength);
+ if (nextBlock != currentBlock) {
+ FileBlock blockToBeRemoved;
+ if (nextBlock.getDataPosition() > 0) {
+ blockToBeRemoved = currentBlock;
+ fileDescriptor.insertBlock(currentBlock, nextBlock, true);
+ } else {
+ blockToBeRemoved = currentBlock;
+ fileDescriptor.insertBlock(currentBlock, nextBlock, false);
+ }
+ for (; blockToBeRemoved != null
+ && blockToBeRemoved.getLastDataOffset() < nextBlock
+ .getLastDataOffset(); blockToBeRemoved = fileDescriptor
+ .getNextBlock(blockToBeRemoved)) {
+ fileDescriptor.removeBlock(blockToBeRemoved);
+ }
+ }
+ bytesLeftToWrite -= dataLength;
+ offset += dataLength;
+ if (fileDescriptor.isLastBlock(nextBlock)) {
+ if (nextBlock != currentBlock) {
+ bytesAddedByWrite += dataLength;
+ } else {
+ bytesAddedByWrite += dataLength - currentLength;
+ }
+ }
+ currentBlock = nextBlock;
+ } while (bytesLeftToWrite > 0);
+
+ if (currentBlock.getDataPosition() < currentBlock.getDataLength()) {
+ FileBlock postFragment = (FileBlock) currentBlock.clone();
+ postFragment.setDataOffset(currentBlock.getPositionOffset());
+ postFragment
+ .setDataLength((int) (currentBlock.getDataLength() - postFragment
+ .getDataOffset()));
+
+ fileDescriptor.insertBlock(currentBlock, postFragment, true);
+ currentBlock = postFragment;
+ currentBlock.setBlockOffset(currentBlock.getBlockOffset()
+ + currentBlock.getDataPosition());
+ currentBlock.setDataPosition(0);
+ }
+
+ maybeRepositionCurrentBlock();
+
+ long now = new Date().getTime();
+ if (bytesAddedByWrite > 0) {
+ fileDescriptor
+ .setLength(fileDescriptor.getLength() + bytesAddedByWrite);
+ }
+ fileDescriptor.setLastAccessed(now);
+ fileDescriptor.setLastModified(now);
+ columnOrientedFile.writeFileBlocks(fileDescriptor, blocksToFlush);
+ }
+
+ /**
+ * In the event the file pointer is currently positioned at the exact end of
+ * the data range in the current block, then reposition to the first byte in
+ * the ensuing block. Furthermore, if there is no ensuing block, then create
+ * a brand-new block, append it to the end of the file, and move into that
+ * new block.
+ */
+ protected void maybeRepositionCurrentBlock() {
+ if (currentBlock.getDataPosition() == currentBlock.getDataLength()
+ && currentBlock.getPositionOffset() == currentBlock.getBlockSize()) {
+ FileBlock nextBlock = fileDescriptor.getNextBlock(currentBlock);
+ if (nextBlock == null) {
+ nextBlock = fileDescriptor.createBlock();
+ fileDescriptor.insertBlock(currentBlock, nextBlock, true);
+ }
+ currentBlock = nextBlock;
+ }
+ }
+ }
+
+ /**
+ * The CassandraIndexInput acts as an input stream for a
+ * Cassandra-based file of a given name. In essence, it allows Lucene's
+ * low-level data types to be read from that file, using a read-ahead caching
+ * mechanism.
+ *
+ *
+ * Specifically, it builds upon the read-ahead cache implemented by the
+ * {@link BufferedIndexInput}, by essentially mapping one or more blocks into
+ * the underlying buffer. Ergo, it is recommended that the buffer size be an
+ * exact multiple of the block size, as that will help to reduce the number of
+ * round trips to the Cassandra server.
+ *
+ */
+ public class CassandraIndexInput extends BufferedIndexInput {
+ // A reference to the descriptor for the file being read from.
+ protected final FileDescriptor fileDescriptor;
+
+ // A reference to the current block being read from.
+ protected FileBlock currentBlock;
+
+ // The length of the file as determined when it was opened.
+ protected long fileLength;
+
+ protected ColumnOrientedFile columnOrientedFile;
+
+ /**
+ * Construct a type of {@link IndexInput} that understands how to read from
+ * the Cassandra-based file of the given name. It uses a read-ahead buffer
+ * under the covers whose size is specified by the given buffer size.
+ *
+ * @param fileName
+ * the name of the file to read
+ * @param bufferSize
+ * the size of the input buffer
+ * @throws IOException
+ */
+ public CassandraIndexInput(String fileName, int bufferSize)
+ throws IOException {
+ super(bufferSize);
+ fileDescriptor = columnOrientedDirectory.getFileDescriptor(fileName);
+ if (fileDescriptor == null) {
+ throw new IOException("Unable to locate file " + fileName);
+ }
+ currentBlock = fileDescriptor.getFirstBlock();
+ fileLength = fileDescriptor.getLength();
+ columnOrientedFile = new ColumnOrientedFile();
+ }
+
+ /**
+ * There's nothing to close.
+ */
+ @Override
+ public void close() throws IOException {}
+
+ /**
+ * @return the cached value of the file length
+ */
+ @Override
+ public long length() {
+ return fileLength;
+ }
+
+ /**
+ * Read the given number (i.e., length) of bytes, by first determining which
+ * blocks to retrieve, and then copying over the data from each of those
+ * blocks into the given byte array starting at the given offset, one block
+ * at a time.
+ *
+ * @param bytes
+ * the array to read bytes into
+ * @param offset
+ * the offset in the array to start storing bytes
+ * @param length
+ * the number of bytes to read
+ */
+ @Override
+ protected void readInternal(byte[] bytes, int offset, int length)
+ throws IOException {
+ Set<byte[]> blockNames = new TreeSet<byte[]>(BYTE_ARRAY_COMPARATOR);
+ List<FileBlock> blocksToBeRead = new ArrayList<FileBlock>();
+
+ int bytesToBeRead = length;
+ do {
+ byte[] columnName = currentBlock.getBlockName().getBytes();
+ if (!blockNames.contains(columnName)) {
+ blockNames.add(columnName);
+ }
+ blocksToBeRead.add(currentBlock);
+ FileBlock nextBlock = fileDescriptor.getNextBlock(currentBlock);
+ if (nextBlock == null) {
+ break;
+ }
+ bytesToBeRead -= currentBlock.getDataLength();
+ currentBlock = nextBlock;
+ } while (bytesToBeRead > 0);
+
+ BlockMap blockMap = columnOrientedFile.readFileBlocks(fileDescriptor,
+ blockNames);
+ bytesToBeRead = length;
+ for (FileBlock blockToBeRead : blocksToBeRead) {
+ for (Map.Entry<byte[], byte[]> columnEntry : blockMap.entrySet()) {
+ String columnName = new String(columnEntry.getKey());
+ byte[] columnValue = columnEntry.getValue();
+ if (columnName.equals(blockToBeRead.getBlockName())) {
+ int bytesToReadFromBlock = (int) Math.min(bytesToBeRead,
+ (blockToBeRead.getDataLength() + blockToBeRead
+ .getDataPosition()));
+ System.arraycopy(columnValue, 0, bytes, offset,
+ bytesToReadFromBlock);
+ bytesToBeRead -= bytesToReadFromBlock;
+ offset += bytesToReadFromBlock;
+ blockToBeRead.setDataPosition(blockToBeRead.getDataPosition()
+ + bytesToReadFromBlock);
+ }
+ }
+ }
+
+ if (currentBlock.getDataPosition() == currentBlock.getDataLength()) {
+ FileBlock nextBlock = fileDescriptor.getNextBlock(currentBlock);
+ if (nextBlock != null) {
+ currentBlock = nextBlock;
+ }
+ }
+ }
+
+ /**
+ * Seek to the block corresponding to the given file position, and then move
+ * to the exact data position within that block.
+ */
+ @Override
+ protected void seekInternal(long position) throws IOException {
+ currentBlock = FileDescriptorUtils.seekBlock(fileDescriptor, position);
+ }
+ }
+
+ /**
+ * The CassandraClient encapsulates the low-level interactions of
+ * the directory with the (remote) Cassandra server. In particular, it
+ * delegates the request through to a Thrift client, which may or may not be
+ * framed.
+ *
+ *
+ * Note that this class is not aware of the notions of directories and files
+ * described above. Instead, it simply provides basic operations, such as
+ * those that get/set columns/keys. This separation of the concerns of
+ * Cassandra and Lucene serves to keep the design not only simple, but also
+ * (hopefully) readable.
+ *
+ */
+ public class CassandraClient {
+ // The underlying thrift client to delegate requests to.
+ protected Cassandra.Client thriftClient;
+
+ /**
+ * Construct a Cassandra client that knows how to get/set rows/columns from
+ * the given keyspace and column family, residing in the given Cassandra
+ * server.
+ *
+ * @param host
+ * the host where the Cassandra Thrift server is located
+ * @param port
+ * the port where the Cassandra Thrift server is listening
+ * @param framed
+ * a flag indicating whether or not to use a framed transport
+ * @throws IOException
+ */
+ public CassandraClient(String host, int port, boolean framed)
+ throws IOException {
+ TSocket socket = new TSocket(host, port);
+ TTransport transport = framed ? new TFramedTransport(socket) : socket;
+
+ try {
+ transport.open();
+ thriftClient = new Cassandra.Client(new TBinaryProtocol(transport));
+ Map credentials = new HashMap();
+ Set keyspaces = thriftClient.describe_keyspaces();
+ if (!keyspaces.contains(keyspace)) {
+ List cfDefs = new ArrayList();
+ cfDefs.add(new CfDef(keyspace, columnFamily));
+ thriftClient.system_add_keyspace(new KsDef(keyspace,
+ "org.apache.cassandra.locator.RackUnawareStrategy", 1, cfDefs));
+ }
+ thriftClient.set_keyspace(keyspace);
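+ // Create the column family in case it does not already exist; if it
+ // does, the resulting InvalidRequestException is simply ignored below.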
+ try {
+ CfDef cfDef = new CfDef(keyspace, columnFamily);
+ thriftClient.system_add_column_family(cfDef);
+ } catch (InvalidRequestException e) {}
+ thriftClient.login(new AuthenticationRequest(credentials));
+ } catch (Exception e) {
+ throw new IOException("Unable to open connection to keyspace "
+ + keyspace, e);
+ }
+ }
+
+ /**
+ * Return the keys of the rows that contain the given columns.
+ *
+ * @param columnNames
+ * the names of the columns
+ * @return the rows that contain those columns
+ * @throws IOException
+ */
+ public byte[][] getKeys(List<byte[]> columnNames) throws IOException {
+ try {
+ List<KeySlice> keySlices = thriftClient.get_range_slices(
+ new ColumnParent().setColumn_family(columnFamily),
+ new SlicePredicate().setColumn_names(columnNames), new KeyRange()
+ .setStart_key("".getBytes()).setEnd_key("".getBytes()),
+ ConsistencyLevel.ONE);
+ List<byte[]> keys = new ArrayList<byte[]>();
+ for (KeySlice keySlice : keySlices) {
+ List<ColumnOrSuperColumn> coscs = keySlice.getColumns();
+ if (coscs != null && coscs.size() == 1) {
+ ColumnOrSuperColumn cosc = coscs.get(0);
+ Column column = cosc.getColumn();
+ FileDescriptor fileDescriptor = FileDescriptorUtils
+ .fromBytes(column.getValue());
+ if (fileDescriptor == null || fileDescriptor.isDeleted()) {
+ continue;
+ }
+ keys.add(keySlice.key);
+ }
+ }
+ return keys.toArray(new byte[][] {});
+ } catch (Exception e) {
+ throw new IOException("Unable to list all files in " + keyspace);
+ }
+ }
+
+ /**
+ * Get the given set of columns for the row specified by the given key.
+ *
+ * @param key
+ * the key to the row to read from
+ * @param columnNames
+ * the names of the columns to fetch
+ * @return the values for those columns in that row
+ * @throws IOException
+ */
+ public Map<byte[], byte[]> getColumns(byte[] key, Set<byte[]> columnNames)
+ throws IOException {
+ try {
+ List<byte[]> uniqueColumnNames = new ArrayList<byte[]>();
+ uniqueColumnNames.addAll(columnNames);
+ List<ColumnOrSuperColumn> coscs = thriftClient.get_slice(key,
+ new ColumnParent(columnFamily), new SlicePredicate()
+ .setColumn_names(uniqueColumnNames), ConsistencyLevel.ONE);
+ Map<byte[], byte[]> columns = new HashMap<byte[], byte[]>();
+ for (ColumnOrSuperColumn cosc : coscs) {
+ Column column = cosc.getColumn();
+ columns.put(column.getName(), column.getValue());
+ }
+ return columns;
+ } catch (Exception e) {
+ throw new IOException("Could not read from columns for file " + key, e);
+ }
+ }
+
+ /**
+ * Get the given column for the row specified by the given key.
+ *
+ * @param fileName
+ * the name of the file whose row is read from
+ * @param columnName
+ * the name of the column to fetch
+ * @return the value for that column in this row
+ * @throws IOException
+ */
+ public byte[] getColumn(byte[] fileName, byte[] columnName)
+ throws IOException {
+ try {
+ List<byte[]> columnNames = new ArrayList<byte[]>();
+ columnNames.add(columnName);
+ List<ColumnOrSuperColumn> coscs = thriftClient.get_slice(fileName,
+ new ColumnParent().setColumn_family(columnFamily),
+ new SlicePredicate().setColumn_names(columnNames),
+ ConsistencyLevel.ONE);
+ if (!coscs.isEmpty()) {
+ ColumnOrSuperColumn cosc = coscs.get(0);
+ Column column = cosc.getColumn();
+ return column.getValue();
+ }
+ return null;
+ } catch (Exception e) {
+ throw new IOException("Unable to read file descriptor for " + fileName,
+ e);
+ }
+ }
+
+ /**
+ * Set the values for the given columns in the given row.
+ *
+ * @param key
+ * the key to the row being written to
+ * @param columnValues
+ * the values for the columns being updated
+ * @throws IOException
+ */
+ protected void setColumns(byte[] key, Map<byte[], byte[]> columnValues)
+ throws IOException {
+ Map<byte[], Map<String, List<Mutation>>> mutationMap =
+ new HashMap<byte[], Map<String, List<Mutation>>>();
+
+ Map<String, List<Mutation>> cfMutation =
+ new HashMap<String, List<Mutation>>();
+ mutationMap.put(key, cfMutation);
+
+ List<Mutation> mutationList = new ArrayList<Mutation>();
+ cfMutation.put(columnFamily, mutationList);
+
+ if (columnValues == null || columnValues.size() == 0) {
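+ // An empty (or null) column map is treated as a request to delete the
+ // entire row; see the note on CASSANDRA-293 below.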
+ Mutation mutation = new Mutation();
+ Deletion deletion = new Deletion(System.currentTimeMillis());
+ /**
+ * Currently, we cannot delete rows from a column family. This issue is
+ * being tracked at https://issues.apache.org/jira/browse/CASSANDRA-293.
+ * When that issue is resolved, we may at that time choose to revive
+ * the code shown below.
+ *
+ * deletion.setPredicate(new SlicePredicate().setSlice_range(new
+ * SliceRange(new byte[] {}, new byte[] {}, false, Integer.MAX_VALUE)));
+ */
+ mutation.setDeletion(deletion);
+ mutationList.add(mutation);
+
+ } else {
+ for (Map.Entry<byte[], byte[]> columnValue : columnValues.entrySet()) {
+ Mutation mutation = new Mutation();
+ byte[] column = columnValue.getKey(), value = columnValue.getValue();
+ if (value == null) {
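+ // A null value marks this column for deletion; if the column name is
+ // also null, the deletion covers the whole row via an open slice range.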
+
+ Deletion deletion = new Deletion(System.currentTimeMillis());
+
+ if (column != null) {
+ deletion.setPredicate(new SlicePredicate().setColumn_names(Arrays
+ .asList(new byte[][] {column})));
+ } else {
+ deletion.setPredicate(new SlicePredicate()
+ .setSlice_range(new SliceRange(new byte[] {}, new byte[] {},
+ false, Integer.MAX_VALUE)));
+ }
+
+ mutation.setDeletion(deletion);
+
+ } else {
+
+ ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
+ cosc.setColumn(new Column(column, value, System.currentTimeMillis()));
+
+ mutation.setColumn_or_supercolumn(cosc);
+ }
+
+ mutationList.add(mutation);
+ }
+ }
+ try {
+ thriftClient.batch_mutate(mutationMap, ConsistencyLevel.ONE);
+ } catch (Exception e) {
+ throw new IOException("Unable to mutate columns for file " + key, e);
+ }
+ }
+ }
+}
Index: contrib/directories/src/test/org/apache/lucene/store/TestCassandraDirectory.java
===================================================================
--- contrib/directories/src/test/org/apache/lucene/store/TestCassandraDirectory.java (revision 0)
+++ contrib/directories/src/test/org/apache/lucene/store/TestCassandraDirectory.java (revision 0)
@@ -0,0 +1,324 @@
+package org.apache.lucene.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.util.LuceneTestCase;
+
+public class TestCassandraDirectory extends LuceneTestCase {
+
+ CassandraDirectory cassandraDirectory;
+
+ public void setUp() throws Exception {
+ super.setUp();
+ cassandraDirectory = new CassandraDirectory("lucene1", "index", 10, 10);
+ for (String fileName : cassandraDirectory.listAll()) {
+ cassandraDirectory.deleteFile(fileName);
+ }
+ cassandraDirectory.createOutput("sampleFile");
+ }
+
+ public void tearDown() throws Exception {
+ if (cassandraDirectory != null) {
+ cassandraDirectory.close();
+ }
+ super.tearDown();
+ }
+
+ public void testListAll() throws IOException {
+ String[] files = cassandraDirectory.listAll();
+ assertEquals(1, files.length);
+ assertEquals("sampleFile", files[0]);
+ }
+
+ public void testFileExists() throws IOException {
+ assertTrue("The sample file should've been found, but it wasn't.",
+ cassandraDirectory.fileExists("sampleFile"));
+ try {
+ assertFalse("The dummy file should not've been found, but it was.",
+ cassandraDirectory.fileExists("dummyFile"));
+ } catch (IOException e) {}
+ }
+
+ public void testFileModified() throws IOException {
+ long fileModified = cassandraDirectory.fileModified("sampleFile");
+ long currentTime = new Date().getTime();
+ long secondsSinceFileModified = (currentTime - fileModified) / 1000;
+ assertTrue(
+ "The sample file should have been modified just now, but it wasn't.",
+ secondsSinceFileModified >= 0 && secondsSinceFileModified < 1);
+ }
+
+ public void testTouchFile() throws IOException, InterruptedException {
+ long fileModified = cassandraDirectory.fileModified("sampleFile");
+ TimeUnit.SECONDS.sleep(3);
+ cassandraDirectory.touchFile("sampleFile");
+ long fileTouched = cassandraDirectory.fileModified("sampleFile");
+ long currentTime = new Date().getTime();
+ long secondsFileUnmodified = (fileTouched - fileModified) / 1000;
+ assertTrue(
+ "The sample file was not quiet for 3 seconds before it was touched.",
+ secondsFileUnmodified >= 2 && secondsFileUnmodified <= 3);
+ long secondsSinceFileTouched = (currentTime - fileTouched) / 1000;
+ assertTrue(
+ "The sample file should'be been touched just now, but it wasn't.",
+ secondsSinceFileTouched >= 0 && secondsSinceFileTouched < 1);
+
+ }
+
+ public void testDeleteFile() throws IOException {
+ cassandraDirectory.deleteFile("sampleFile");
+ assertTrue("The sample file should not've been found, but it was.",
+ !cassandraDirectory.fileExists("sampleFile"));
+ }
+
+ public void testFileLength() throws IOException {
+ assertEquals("The sample file's length should be zero.", cassandraDirectory
+ .fileLength("sampleFile"), 0);
+ }
+
+ public void testCreateFile() throws IOException {
+ for (int fileIndex = 0; fileIndex < 10; fileIndex++) {
+ String testFileName = "testFile" + fileIndex;
+ cassandraDirectory.createOutput(testFileName);
+ assertTrue("The test file should exist after creation.",
+ cassandraDirectory.fileExists(testFileName));
+ assertEquals("The test file's length should be zero.", 0,
+ cassandraDirectory.fileLength(testFileName));
+ }
+ }
+
+ public void testWriteFile() throws IOException {
+ String smallerThanABlock = "0123";
+ String exactlyABlock = "0123456789";
+ String largerThanABlock = "0123456789A";
+
+ String[] dataSample = new String[] {smallerThanABlock, exactlyABlock,
+ largerThanABlock};
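+ // Assuming the 10-byte block size configured in setUp(), these samples
+ // exercise writes that end before, exactly at, and past a block boundary.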
+
+ for (String dataPoint : dataSample) {
+ writeStrings(new String[] {dataPoint});
+ }
+
+ for (String dataPoint : dataSample) {
+ List<String> stringsToBeWritten = new ArrayList<String>();
+ stringsToBeWritten.addAll(Arrays.asList(dataSample));
+ stringsToBeWritten.remove(dataPoint);
+ writeStrings(stringsToBeWritten.toArray(new String[] {}));
+ }
+
+ PermutationGenerator kPermutation = new PermutationGenerator(
+ dataSample.length);
+ while (kPermutation.hasMore()) {
+ int[] indices = kPermutation.getNext();
+ String[] stringsToBeWritten = new String[3];
+ for (int i = 0; i < indices.length; i++) {
+ stringsToBeWritten[i] = dataSample[indices[i]];
+ }
+ writeStrings(stringsToBeWritten);
+ }
+ }
+
+ public void testReadFile() throws IOException {
+ String smallerThanABlock = "0123";
+ String exactlyABlock = "0123456789";
+ String largerThanABlock = "0123456789A";
+
+ String[] dataSample = new String[] {smallerThanABlock, exactlyABlock,
+ largerThanABlock};
+
+ for (String dataPoint : dataSample) {
+ readStrings(writeStrings(new String[] {dataPoint}));
+ }
+
+ for (String dataPoint : dataSample) {
+ List<String> stringsToBeWritten = new ArrayList<String>();
+ stringsToBeWritten.addAll(Arrays.asList(dataSample));
+ stringsToBeWritten.remove(dataPoint);
+ readStrings(writeStrings(stringsToBeWritten.toArray(new String[] {})));
+ }
+
+ PermutationGenerator kPermutation = new PermutationGenerator(
+ dataSample.length);
+ while (kPermutation.hasMore()) {
+ int[] indices = kPermutation.getNext();
+ String[] stringsToBeWritten = new String[3];
+ for (int i = 0; i < indices.length; i++) {
+ stringsToBeWritten[i] = dataSample[indices[i]];
+ }
+ readStrings(writeStrings(stringsToBeWritten));
+ }
+ }
+
+ protected String[] writeStrings(String[] dataSample) throws IOException {
+ cassandraDirectory.deleteFile("sampleFile");
+ IndexOutput indexOutput = cassandraDirectory.createOutput("sampleFile");
+ int dataLength = 0;
+ for (String dataPoint : dataSample) {
+ indexOutput.writeString(dataPoint);
+ dataLength += dataPoint.length();
+ }
+ indexOutput.flush();
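+ // writeString prepends a VInt length to each string; for these short
+ // samples that prefix is a single byte, hence the extra dataSample.length
+ // bytes in the expected file length.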
+ assertEquals("The index output's current file length is incorrect.",
+ dataLength + dataSample.length, indexOutput.length());
+ return dataSample;
+ }
+
+ protected void readStrings(String[] dataSample) throws IOException {
+ IndexInput indexInput = cassandraDirectory.openInput("sampleFile");
+ int dataLength = 0;
+ for (String expectedDataPoint : dataSample) {
+ String actualDataPoint = indexInput.readString();
+ assertEquals("The index input's next string did not match.",
+ expectedDataPoint, actualDataPoint);
+ dataLength += actualDataPoint.length();
+ }
+ assertEquals("The index output's current file length is incorrect.",
+ dataLength + dataSample.length, indexInput.length());
+ }
+
+ public static class PermutationGenerator {
+
+ private int[] a;
+ private BigInteger numLeft;
+ private BigInteger total;
+
+ // -----------------------------------------------------------
+ // Constructor. WARNING: Don't make n too large.
+ // Recall that the number of permutations is n!
+ // which can be very large, even when n is as small as 20 --
+ // 20! = 2,432,902,008,176,640,000 and
+ // 21! is too big to fit into a Java long, which is
+ // why we use BigInteger instead.
+ // ----------------------------------------------------------
+
+ public PermutationGenerator(int n) {
+ if (n < 1) {
+ throw new IllegalArgumentException("Min 1");
+ }
+ a = new int[n];
+ total = getFactorial(n);
+ reset();
+ }
+
+ // ------
+ // Reset
+ // ------
+
+ public void reset() {
+ for (int i = 0; i < a.length; i++) {
+ a[i] = i;
+ }
+ numLeft = new BigInteger(total.toString());
+ }
+
+ // ------------------------------------------------
+ // Return number of permutations not yet generated
+ // ------------------------------------------------
+
+ public BigInteger getNumLeft() {
+ return numLeft;
+ }
+
+ // ------------------------------------
+ // Return total number of permutations
+ // ------------------------------------
+
+ public BigInteger getTotal() {
+ return total;
+ }
+
+ // -----------------------------
+ // Are there more permutations?
+ // -----------------------------
+
+ public boolean hasMore() {
+ return numLeft.compareTo(BigInteger.ZERO) > 0;
+ }
+
+ // ------------------
+ // Compute factorial
+ // ------------------
+
+ private static BigInteger getFactorial(int n) {
+ BigInteger fact = BigInteger.ONE;
+ for (int i = n; i > 1; i--) {
+ fact = fact.multiply(new BigInteger(Integer.toString(i)));
+ }
+ return fact;
+ }
+
+ // --------------------------------------------------------
+ // Generate next permutation (algorithm from Rosen p. 284)
+ // --------------------------------------------------------
+
+ public int[] getNext() {
+
+ if (numLeft.equals(total)) {
+ numLeft = numLeft.subtract(BigInteger.ONE);
+ return a;
+ }
+
+ int temp;
+
+ // Find largest index j with a[j] < a[j+1]
+
+ int j = a.length - 2;
+ while (a[j] > a[j + 1]) {
+ j--;
+ }
+
+ // Find index k such that a[k] is smallest integer
+ // greater than a[j] to the right of a[j]
+
+ int k = a.length - 1;
+ while (a[j] > a[k]) {
+ k--;
+ }
+
+ // Interchange a[j] and a[k]
+
+ temp = a[k];
+ a[k] = a[j];
+ a[j] = temp;
+
+ // Put tail end of permutation after jth position in increasing order
+
+ int r = a.length - 1;
+ int s = j + 1;
+
+ while (r > s) {
+ temp = a[s];
+ a[s] = a[r];
+ a[r] = temp;
+ r--;
+ s++;
+ }
+
+ numLeft = numLeft.subtract(BigInteger.ONE);
+ return a;
+
+ }
+
+ }
+
+}