Index: contrib/directories/src/java/org/apache/lucene/store/CassandraDirectory.java =================================================================== --- contrib/directories/src/java/org/apache/lucene/store/CassandraDirectory.java (revision 0) +++ contrib/directories/src/java/org/apache/lucene/store/CassandraDirectory.java (revision 0) @@ -0,0 +1,1951 @@ +package org.apache.lucene.store; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.ByteArrayInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.cassandra.thrift.AuthenticationRequest; +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.CfDef; +import org.apache.cassandra.thrift.Column; +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.ColumnParent; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.Deletion; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.KeyRange; +import org.apache.cassandra.thrift.KeySlice; +import org.apache.cassandra.thrift.KsDef; +import org.apache.cassandra.thrift.Mutation; +import org.apache.cassandra.thrift.SlicePredicate; +import org.apache.cassandra.thrift.SliceRange; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.json.JSONTokener; + +/** + * The CassandraDirectory maps the concept of a Lucene directory to + * a column family that belongs to a certain keyspace located in a given + * Cassandra server. Furthermore, it stores each file under this directory as a + * row in that column family. + * + *

+ * In particular, files are broken down into blocks (whose sizes are capped), + * where each block (see {@link FileBlock}) is stored as the value of a column + * in the corresponding row. As per + * http://wiki.apache.org/cassandra/CassandraLimitations, this is the + * recommended approach for dealing with large objects, which Lucene files tend + * to be. Furthermore, a descriptor of the file (see {@link FileDescriptor}) + * that outlines a map of blocks therein is stored as one of the columns in that + * row as well. Think of this descriptor as an inode for Cassandra-based files. + *
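+ * <p>
+ * For illustration, a 2.5 MB file stored with the default 1 MB block size
+ * would map to a row shaped roughly as follows (a sketch, not actual stored
+ * bytes; the row key is a hypothetical file name):
+ * <pre>
+ * row key: "segments_2"
+ *   "DESCRIPTOR" : {"name":"segments_2","length":2621440,"blocks":[...]}
+ *   "BLOCK-0"    : bytes 0 through 1048575
+ *   "BLOCK-1"    : bytes 1048576 through 2097151
+ *   "BLOCK-2"    : bytes 2097152 through 2621439
+ * </pre>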

+ * + *

+ * The exhaustive mapping of a Lucene directory (file) to a Cassandra column
+ * family (row) is captured in the {@link ColumnOrientedDirectory}
+ * ({@link ColumnOrientedFile}) inner-classes. Specifically, they interpret
+ * Cassandra's data model in terms of Lucene's. More importantly, these are
+ * the only two inner-classes that have a foot in both the Lucene and
+ * Cassandra camps.
+ *
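+ * <p>
+ * In tabular form, the correspondence reads (a summary of the above, not an
+ * exhaustive contract):
+ * <pre>
+ * Lucene           Cassandra
+ * ------           ---------
+ * directory        column family  (ColumnOrientedDirectory)
+ * file             row            (ColumnOrientedFile)
+ * file block       column value
+ * file descriptor  the DESCRIPTOR column of the row
+ * </pre>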

+ * + *

+ * All writes to a file in this directory occur through a
+ * {@link CassandraIndexOutput}, which puts the data flushed from a
+ * write-behind buffer into the appropriate set of blocks. By the same token,
+ * all reads from a file in this directory occur through a
+ * {@link CassandraIndexInput}, which gets the data needed by a read-ahead
+ * buffer from the right set of blocks.
+ *
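+ * <p>
+ * A minimal sketch of that round trip (the file name is hypothetical, and
+ * error handling is elided):
+ * <pre>
+ * Directory dir = new CassandraDirectory("lucene", "index");
+ * IndexOutput out = dir.createOutput("greetings.txt");
+ * out.writeString("hello");            // buffered by the write-behind cache
+ * out.close();                         // flushes the blocks to Cassandra
+ * IndexInput in = dir.openInput("greetings.txt");
+ * String greeting = in.readString();   // served from the read-ahead cache
+ * in.close();
+ * </pre>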

+ * + *

+ * The last (but not the least) inner-class, {@link CassandraClient}, acts as a + * facade for a Thrift-based Cassandra client. In short, it provides operations + * to get/put rows/columns in the column family and keyspace associated with + * this directory. + *

+ * + *

+ * Unlike Lucandra, which attempts to bridge the gap between Lucene and
+ * Cassandra at the document level, the {@link CassandraDirectory} is
+ * self-sufficient in the sense that it does not require a rewrite of other
+ * components in the Lucene stack. In other words, one may use the
+ * {@link CassandraDirectory} in conjunction with the {@link IndexWriter} and
+ * {@link IndexReader}, as one would any other kind of Lucene
+ * {@link Directory}. Moreover, given that the unit of data transferred to and
+ * from Cassandra is a large block, one may expect fewer round trips, and
+ * hence better throughput, from the {@link CassandraDirectory}.
+ *

+ * + *

+ * In conclusion, this directory attempts to marry the rich search-based query + * language of Lucene with the distributed fault-tolerant database that is + * Cassandra. By delegating the responsibilities of replication, durability and + * elasticity to the directory, we free the layers above from such + * non-functional concerns. Our hope is that users will choose to make their + * large-scale indices instantly scalable by seamlessly migrating them to this + * type of directory (using {@link Directory#copyTo(Directory)}). + *
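+ * <p>
+ * For instance, an existing on-disk index might be migrated with something
+ * like the sketch below, assuming the static Directory.copy(Directory,
+ * Directory, boolean) helper from this era of Lucene (the paths and names
+ * are illustrative):
+ * <pre>
+ * Directory source = FSDirectory.open(new File("/path/to/index"));
+ * Directory target = new CassandraDirectory("lucene", "index");
+ * Directory.copy(source, target, false);  // copy every file into Cassandra
+ * </pre>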

+ * + * @author Karthick Sankarachary + */ +public class CassandraDirectory extends Directory { + + // The default size of a block, which in turn maps to a cassandra column. + public static final int DEFAULT_BLOCK_SIZE = 1 * 1024 * 1024; + + // The default size of the buffer, which is managed by the index output. + public static final int DEFAULT_BUFFER_SIZE = 1 * DEFAULT_BLOCK_SIZE; + + // The default host where the cassandra server is located. + public static final String DEFAULT_CASSANDRA_HOST = "localhost"; + + // The default port where the cassandra server is listening. + public static final int DEFAULT_CASSANDRA_PORT = 9160; + + // The default flag indicating whether the cassandra server is framed. + public static final boolean DEFAULT_CASSANDRA_FRAMED = false; + + // The default keyspace in which to store cassandra directories. + public static final String DEFAULT_CASSANDRA_KEYSPACE = "lucene"; + + // The name of every column that holds a file block starts with this prefix. + protected static final String BLOCK_COLUMN_NAME_PREFIX = "BLOCK-"; + + // The name of the column that holds the file descriptor. + protected static final String descriptorColumn = "DESCRIPTOR"; + + // The list of meta-columns currently defined for each file (or row). + protected static final List systemColumns = new ArrayList(); + static { + systemColumns.add(descriptorColumn.getBytes()); + } + + // The reference to the cassandra client that talks to the thrift server. + protected CassandraClient cassandraClient; + + // The keyspace in which to read/write cassandra directories and files. + protected String keyspace; + + // The name of the column family that maps to this cassandra directory. + protected String columnFamily; + + // The current size of the block to write out to a column. + protected int blockSize; + + // The current size of the buffer that (ideally) is big enough to hold one or + // more file blocks. In essence, the write (read) buffer acts as a + // write-behind (read-ahead) cache that performs a lazy write (read) only when + // the data is evicted (or flushed) from the cache. Given that, we should try + // to write and read data in block multiples. + protected int bufferSize; + + protected ColumnOrientedDirectory columnOrientedDirectory; + + /** + * Construct a Cassandra-based directory that maps to the given column family, + * which is located in the default keyspace. + * + * @param columnFamily + * the name of the column family that denotes this directory + * @throws IOException + */ + public CassandraDirectory(String columnFamily) throws IOException { + this(DEFAULT_CASSANDRA_KEYSPACE, columnFamily); + } + + /** + * Construct a Cassandra-based directory that maps to the given column family, + * which is located in the given keyspace. Note that it uses the default block + * and buffer sizes. + * + * @param keyspace + * the name of the keyspace in which to find the column family + * @param columnFamily + * the name of the column family that dentoes this directory + * @throws IOException + */ + public CassandraDirectory(String keyspace, String columnFamily) + throws IOException { + this(keyspace, columnFamily, DEFAULT_BLOCK_SIZE, DEFAULT_BUFFER_SIZE); + } + + /** + * Construct a Cassandra-based directory that maps to the given column family, + * which is located in the given keyspace. Moreover, it uses the given block + * size as the column size, and the given buffer size for the write-behind and + * read-ahead cache. 
+ * + * @param keyspace + * the name of the keyspace in which to find the column family + * @param columnFamily + * the name of the column family that dentoes this directory + * @param blockSize + * the size of the file block + * @param bufferSize + * the size of the read/write buffer + * @throws IOException + */ + public CassandraDirectory(String keyspace, String columnFamily, + int blockSize, int bufferSize) throws IOException { + this(DEFAULT_CASSANDRA_HOST, DEFAULT_CASSANDRA_PORT, + DEFAULT_CASSANDRA_FRAMED, keyspace, columnFamily, blockSize, bufferSize); + } + + /** + * Construct a Cassandra-based directory that maps to the given column family, + * which is located in the given keyspace. In particular, this directory talks + * to the cassandra server running on the given host and port. + * + * @param host + * the host where the cassandra server is located + * @param port + * the port where the cassandra server is listening + * @param keyspace + * the name of the keyspace in which to find the column family + * @param columnFamily + * the name of the column family that dentoes this directory + * @throws IOException + */ + public CassandraDirectory(String host, Integer port, String keyspace, + String columnFamily) throws IOException { + this(host, port, DEFAULT_CASSANDRA_FRAMED, keyspace, columnFamily, + DEFAULT_BLOCK_SIZE, DEFAULT_BUFFER_SIZE); + } + + /** + * Construct a Cassandra-based directory that maps to the given column family, + * which is located in the given keyspace. In particular, this directory talks + * to the cassandra server running on the given host and port. + * + * @param host + * the host where the cassandra server is located + * @param port + * the port where the cassandra server is listening + * @param framed + * a flag to ensure a fully read message every time by preceeding + * messages with a 4-byte frame size + * @param keyspace + * the name of the keyspace in which to find the column family + * @param columnFamily + * the name of the column family that dentoes this directory + * @param blockSize + * the size of the file block + * @param bufferSize + * the size of the read/write buffer + * @throws IOException + */ + public CassandraDirectory(String host, Integer port, boolean framed, + String keyspace, String columnFamily, int blockSize, int bufferSize) + throws IOException { + this.keyspace = keyspace; + this.columnFamily = columnFamily; + this.blockSize = blockSize; + this.bufferSize = bufferSize; + this.cassandraClient = new CassandraClient(host, port, framed); + this.columnOrientedDirectory = new ColumnOrientedDirectory(); + } + + /** + * @return the name of the keyspace in which to find the column family + */ + public String getKeyspace() { + return keyspace; + } + + /** + * @return the name of the column family that represents this directory + */ + public String getColumnFamily() { + return columnFamily; + } + + /** + * @return the size of the file block + */ + public long getBlockSize() { + return blockSize; + } + + /** + * @return the size of the read/write buffer + */ + public long getBufferSize() { + return bufferSize; + } + + /** + * Creates a new, empty file in the directory with the given file name. + * + * @return a stream that writes into this file + */ + @Override + public IndexOutput createOutput(String fileName) throws IOException { + ensureOpen(); + return new CassandraIndexOutput(fileName, bufferSize); + } + + /** + * Open an existing file in the directory with the given file name. + * + * @return a stream that reads from an existing file. 
+ */ + @Override + public IndexInput openInput(String fileName) throws IOException { + ensureOpen(); + return new CassandraIndexInput(fileName, bufferSize); + } + + /** + * Returns an array of strings, one for each (non-deleted) file in the + * directory. + * + * @throws NoSuchDirectoryException + * if the directory is not prepared for any write operations (such + * as {@link #createOutput(String)}). + * @throws IOException + * in case of other IO errors + */ + @Override + public String[] listAll() throws IOException { + ensureOpen(); + return columnOrientedDirectory.getFileNames(); + } + + /** + * @return true iff a file with the given name exists + */ + @Override + public boolean fileExists(String fileName) throws IOException { + ensureOpen(); + try { + return fileLength(fileName) >= 0; + } catch (IOException e) { + return false; + } + } + + /** + * Returns the length of a file in the directory. This method follows the + * following contract: + * + * + * @param name + * the name of the file for which to return the length. + * @throws FileNotFoundException + * if the file does not exist. + * @throws IOException + * if there was an IO error while retrieving the file's length. + */ + @Override + public long fileLength(String fileName) throws IOException { + ensureOpen(); + FileDescriptor descriptor = columnOrientedDirectory + .getFileDescriptor(fileName); + if (descriptor == null) { + throw new IOException("Could not find descriptor for file " + fileName); + } + return descriptor.getLength(); + } + + /** + * @return the time the named file was last modified + */ + @Override + public long fileModified(String fileName) throws IOException { + ensureOpen(); + FileDescriptor descriptor = columnOrientedDirectory + .getFileDescriptor(fileName); + if (descriptor == null) { + throw new IOException("Could not find descriptor for file " + fileName); + } + return descriptor.getLastModified(); + } + + /** + * Set the modified time of an existing file to now. + */ + @Override + public void touchFile(String fileName) throws IOException { + ensureOpen(); + try { + FileDescriptor fileDescriptor = columnOrientedDirectory + .getFileDescriptor(fileName); + fileDescriptor.setLastModified(System.currentTimeMillis()); + columnOrientedDirectory.setFileDescriptor(fileDescriptor); + } catch (Exception e) { + throw new IOException("Could not touch file " + fileName, e); + } + } + + /** + * Removes an existing file in the directory. + */ + @Override + public void deleteFile(String fileName) throws IOException { + ensureOpen(); + FileDescriptor fileDescriptor = columnOrientedDirectory + .getFileDescriptor(fileName); + if (fileDescriptor != null) { + fileDescriptor.setDeleted(true); + columnOrientedDirectory.setFileDescriptor(fileDescriptor); + } + } + + /** + * Closes all the client-side resources obtained by this directory instance. + */ + @Override + public void close() throws IOException { + isOpen = false; + } + + /** + * The FileDescriptor captures the meta-data of the file, a la + * Unix inodes (index nodes). In addition to the usual tidbits such as name, + * length and timestamps, it carries a flag (@see {@link #deleted}) that + * indicates whether the file has been deleted or not. + * + *

+ * The data in the file is indexed through an ordered list of
+ * {@link FileBlock}s, where each block denotes a contiguous portion of the
+ * file data. By walking through the blocks sequentially in the given order,
+ * one can retrieve the entire contents of the file. A pseudo-random access of
+ * the file can be effected by loading into memory the entire block to which
+ * the (random) file pointer maps, and then positioning the file pointer
+ * within the in-memory block.
+ *
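+ * <p>
+ * As a worked example, with the default 1 MB (1,048,576-byte) blocks and no
+ * fragmentation: a file pointer of 2,500,000 falls in block 2 (since
+ * 2,500,000 / 1,048,576 = 2, rounding down), at position
+ * 2,500,000 - 2 * 1,048,576 = 402,848 within that block's data.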

+ */ + public static class FileDescriptor { + // The name of the file. + private String name; + + // The length of the file. + private long length; + + // A flag indicating whether the file has been deleted. Currently, we cannot + // delete rows from a column family, which means that it is not physically + // possible to delete a file. To workaround this issue, which is described + // in detail at https://issues.apache.org/jira/browse/CASSANDRA-293, we rely + // on this flag instead. Note that, as a side-effect of this design, we + // could potentially allow cassandra files to be "unremoved", if you will. + private boolean deleted; + + // The timestamp at which the file was last modified. + private long lastModified; + + // The timestamp at which the file was last accessed. + private long lastAccessed; + + // The maximum size of the block that may be stored in a column. + private long blockSize; + + // The ordered list of blocks in this file. + private LinkedList blocks; + + // The number to use for the next block that will be allocated. If it is + // uninitialized (i.e., -1), then it forces the descriptor to reset it to + // the highest number of any block in {@link #blocks}. + private int nextBlockNumber = -1; + + /** + * Construct a file descriptor for the given file name, using the default + * block size. + * + * @param fileName + * the name of the file + */ + public FileDescriptor(String fileName) { + this(fileName, DEFAULT_BLOCK_SIZE); + } + + /** + * Construct a file descriptor for the given file name and block size. + * + * @param fileName + * the name of the file + * @param blockSize + * the size of the block + */ + public FileDescriptor(String fileName, long blockSize) { + setName(fileName); + setLength(0); + Date now = new Date(); + setLastAccessed(now.getTime()); + setLastModified(now.getTime()); + setBlockSize(blockSize); + setBlocks(new LinkedList()); + } + + /** + * @return the name of the file + */ + public String getName() { + return name; + } + + /** + * Rename the file. + * + * @param name + * the name of the file + */ + public void setName(String name) { + this.name = name; + } + + /** + * @return the current length of the file + */ + public long getLength() { + return length; + } + + /** + * Resize the file. + * + * @param length + * the new length of the file + */ + public void setLength(long length) { + this.length = length; + } + + /** + * @return true iff the file has been deleted + */ + public boolean isDeleted() { + return deleted; + } + + /** + * Mark the file as deleted (or undeleted) based on whether the given flag + * is true (or not). + * + * @param deleted + * should the file be marked as deleted? + */ + public void setDeleted(boolean deleted) { + this.deleted = deleted; + } + + /** + * @return the timestamp at which the file was last modified + */ + public long getLastModified() { + return lastModified; + } + + /** + * Set the timestamp at which the file was last modified. + * + * @param lastModified + * the last modified timestamp + */ + public void setLastModified(long lastModified) { + this.lastModified = lastModified; + } + + /** + * @return the timestamp at which the file was last accessed + */ + public long getLastAccessed() { + return lastAccessed; + } + + /** + * Set the timestamp at which the file was last accessed. 
+ * + * @param lastModified + * the last accessed timestamp + */ + public void setLastAccessed(long lastAccessed) { + this.lastAccessed = lastAccessed; + } + + /** + * @return the maximum size of the block that may be stored in a column + */ + public long getBlockSize() { + return blockSize; + } + + /** + * Set the maximum size of the block that may be stored in a column. + * + * @param blockSize + * the block size + */ + public void setBlockSize(long blockSize) { + this.blockSize = blockSize; + } + + /** + * @return the ordered list of file blocks + */ + public List getBlocks() { + return blocks; + } + + /** + * Set the ordered list of file blocks. + * + * @param blocks + * the ordered list of file blocks + */ + public void setBlocks(List blocks) { + if (LinkedList.class.isAssignableFrom(blocks.getClass())) { + this.blocks = (LinkedList) blocks; + } else { + this.blocks = new LinkedList(blocks); + } + } + + /** + * @return the first block in the file + */ + public FileBlock getFirstBlock() { + if (blocks.isEmpty()) { + blocks.add(createBlock()); + } + return blocks.getFirst(); + } + + /** + * @return the last block in the file + */ + public FileBlock getLastBlock() { + if (blocks.isEmpty()) { + blocks.add(createBlock()); + } + return blocks.getLast(); + } + + /** + * @return true iff the given block is the first one in the file + */ + public boolean isFirstBlock(FileBlock nextBlock) { + return getFirstBlock().equals(nextBlock); + } + + /** + * @return true iff the given block is the last one in the file + */ + public boolean isLastBlock(FileBlock nextBlock) { + return getLastBlock().equals(nextBlock); + } + + /** + * Return the block that logically follows the given block. + * + * @param block + * an existing file block + * @return the block that logically follows the given block + */ + public FileBlock getNextBlock(FileBlock block) { + int blockIndex = blocks.indexOf(block); + return (blockIndex != -1 && blockIndex < (blocks.size() - 1)) ? blocks + .get(blockIndex + 1) : null; + } + + /** + * Add the given block as the last block in the file. + * + * @param newBlock + * the block to be appended to the file + */ + public void addLastBlock(FileBlock newBlock) { + blocks.addLast(newBlock); + } + + /** + * Add the given block as the first block in the file. + * + * @param newBlock + * the block to be prepended to the file + */ + public void addFirstBlock(FileBlock newBlock) { + blocks.addFirst(newBlock); + } + + /** + * Insert a new block either after or before an existing block, based on + * whether or not the insertAfter flag is true. + * + * @param existingBlock + * an existing file block + * @param newBlock + * a new file block + * @param insertAfter + * whether or not to insert new block after the existing block + */ + public void insertBlock(FileBlock existingBlock, FileBlock newBlock, + boolean insertAfter) { + int existingIndex = blocks.indexOf(existingBlock); + if (existingIndex == -1) { + blocks.add(newBlock); + } else { + if (insertAfter) { + blocks.add(existingIndex + 1, newBlock); + } else { + blocks.add(existingIndex, newBlock); + } + } + } + + /** + * Replace an existing block with a new block. + * + * @param existingBlock + * an existing file block + * @param newBlock + * a new file block + */ + public void replaceBlock(FileBlock existingBlock, FileBlock newBlock) { + int existingIndex = blocks.indexOf(existingBlock); + if (existingIndex != -1) { + blocks.remove(existingIndex); + blocks.add(existingIndex, newBlock); + } + } + + /** + * Remove an existing block. 
+ * + * @param existingBlock + * an existing block + * @return the index of the block just removed + */ + public int removeBlock(FileBlock existingBlock) { + int existingIndex = blocks.indexOf(existingBlock); + if (existingIndex != -1) { + blocks.remove(existingBlock); + } + return existingIndex; + } + + /** + * Create a file block with no data in it. The block number assigned to new + * blocks is set to auto-increment. + * + * @return the newly created file block + */ + public FileBlock createBlock() { + FileBlock newBlock = new FileBlock(); + newBlock.setBlockName(getNextBlockNumber()); + newBlock.setBlockSize(getBlockSize()); + newBlock.setDataLength(0); + return newBlock; + } + + /** + * Return the block number to use for the next block that is allocated. This + * number starts from 0 and will auto-increment with each block allocation. + * + * @return the next block number to use + */ + public int getNextBlockNumber() { + if (nextBlockNumber == -1) { + for (FileBlock fileBlock : blocks) { + nextBlockNumber = Math.max(nextBlockNumber, fileBlock + .getBlockNumber()); + } + } + return ++nextBlockNumber; + } + + } + + /** + * A FileBlock denotes a partition of the file whose size is + * capped by a certain limit (@see {@link #blockSize}). In particular, it + * refers to a contiguous sequence of bytes, which is stored as the value of a + * column in the column family that represents this directory. + * + *

+ * Ideally, each block in the file (except for maybe the last one) should
+ * contain as many bytes as {@link #blockSize}. However, at times, blocks tend
+ * to become fragmented, in the sense that the {@link #blockSize} bytes that
+ * were supposed to go in one (ideal) block get spread across multiple blocks,
+ * each containing a portion of the ideal block. This may happen when writes
+ * occur after the file pointer is randomly moved across the file. While there
+ * are ways to avoid, or at least mitigate, the fragmentation issue, we
+ * currently deal with it as a fact of life. In the event a block represents a
+ * fragment, it is important to know the offset of the fragment relative to
+ * the start of the (ideal) block in which it begins. This information is
+ * captured in {@link FileBlock#dataOffset}.
+ *
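+ * <p>
+ * As a sketch of how fragmentation arises (using a block size of 10 for
+ * readability; the block numbers are illustrative): if bytes 0-9 occupy one
+ * full block, and the writer then seeks to offset 4 and writes 3 bytes, the
+ * ideal block may end up spread over three fragments:
+ * <pre>
+ * BLOCK-0 : dataOffset=0, dataLength=4   (bytes 0-3, the pre-fragment)
+ * BLOCK-1 : dataOffset=4, dataLength=3   (bytes 4-6, the new write)
+ * BLOCK-2 : dataOffset=7, dataLength=3   (bytes 7-9, the post-fragment)
+ * </pre>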

+ * + *

+ * There is some file block information that is not actually saved in
+ * Cassandra, but rather calculated on the fly. For example, the
+ * {@link FileBlock#blockOffset} is used to keep track of the offset of the
+ * block relative to the file. Similarly, the {@link FileBlock#dataPosition}
+ * notes where the file pointer is positioned relative to the block. Both of
+ * these fields are calculated after the file descriptor is loaded from
+ * Cassandra.
+ *
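+ * <p>
+ * Both fields are (re)computed by {@link FileDescriptorUtils#seekBlock}, as
+ * in the following sketch:
+ * <pre>
+ * FileBlock block = FileDescriptorUtils.seekBlock(descriptor, 2500000L);
+ * long startOfBlock = block.getBlockOffset();  // offset of the block in the file
+ * int posInBlock = block.getDataPosition();    // file pointer within the block
+ * </pre>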

+ */ + public static class FileBlock implements Cloneable { + // The name of the file block. + private String blockName; + + // The number of the file block. + private int blockNumber; + + // The maximum size of the file block. + private long blockSize; + + // The offset of the first byte in this block relative to the file. + private long blockOffset; + + // The offset within the file block range where the first data byte appears. + private long dataOffset; + + // The length of the data starting from {@link #dataOffset} + private int dataLength; + + // The position of the file pointer relative to this block, assuming that + // pointer is currently inside this block to begin with. + private int dataPosition; + + /** + * Construct an empty file block. + */ + public FileBlock() {} + + /** + * @return the name of the file block + */ + public String getBlockName() { + return blockName; + } + + /** + * Set the name of the file block. + * + * @param blockName + * the name of the file block + */ + public void setBlockName(String blockName) { + this.blockName = blockName; + } + + /** + * Set the name of the file block based on the given block number. + * + * @param blockNumber + * the block number + */ + public void setBlockName(int blockNumber) { + this.blockNumber = blockNumber; + this.blockName = createBlockName(blockNumber); + } + + /** + * @return the number of this file block + */ + public int getBlockNumber() { + return blockNumber; + } + + /** + * Set the number of this file block. + * + * @param blockNumber + * the number for this file block + */ + public void setBlockNumber(int blockNumber) { + this.blockNumber = blockNumber; + } + + /** + * @return the maximum size of this file block + */ + public long getBlockSize() { + return blockSize; + } + + /** + * Set the maximum size of this file block. + * + * @param blockSize + * the maximum block size + */ + public void setBlockSize(long blockSize) { + this.blockSize = blockSize; + } + + /** + * @return the offset of the first byte in this block relative to the file + */ + public long getBlockOffset() { + return blockOffset; + } + + /** + * Set the offset of the first byte in this block relative to the file. + * + * @param blockOffset + * the new offset for this file block + */ + public void setBlockOffset(long blockOffset) { + this.blockOffset = blockOffset; + } + + /** + * @return the offset within the file block range where the first data byte + * appears + */ + public long getDataOffset() { + return dataOffset; + } + + /** + * Set the offset within the file block range where the first data byte + * appears. + * + * @param dataOffset + * the data offset + */ + public void setDataOffset(long dataOffset) { + this.dataOffset = dataOffset; + } + + /** + * @return the length of the data starting from {@link #dataOffset} + */ + public int getDataLength() { + return dataLength; + } + + /** + * Set the length of the data starting from {@link #dataOffset}. + * + * @param dataLength + * the new data length + */ + public void setDataLength(int dataLength) { + this.dataLength = dataLength; + } + + /** + * Calculate the offset of the last byte of data in this file block relative + * to the block. 
+ * + * @return the last byte's offset relative to the block + */ + public long getLastDataOffset() { + return getDataOffset() + getDataLength(); + } + + /** + * @return the position of the file pointer relative to this block, assuming + * that pointer is currently inside this block to begin with + */ + public int getDataPosition() { + return dataPosition; + } + + /** + * Set the position of the file pointer relative to this block, assuming + * that pointer is currently inside this block to begin with + * + * @param dataPosition + * the new data position + */ + public void setDataPosition(int dataPosition) { + this.dataPosition = dataPosition; + } + + public long getPositionOffset() { + return getDataOffset() + getDataPosition(); + } + + /** + * Create a readable name of the block, derived from it's block number. + * + * @param blockNumber + * the number of the block to use in the name + * @return + */ + public static String createBlockName(int blockNumber) { + return BLOCK_COLUMN_NAME_PREFIX + blockNumber; + } + + /** + * @return a shallow clone of this file block + */ + @Override + public Object clone() { + try { + return super.clone(); + } catch (CloneNotSupportedException e) { + return null; + } + } + } + + // A comparator that checks if two byte arrays are the same or not. + protected static final Comparator BYTE_ARRAY_COMPARATOR = new Comparator() { + public int compare(byte[] o1, byte[] o2) { + if (o1 == null || o2 == null) { + return (o1 != null ? -1 : o2 != null ? 1 : 0); + } + if (o1.length != o2.length) { + return o1.length - o2.length; + } + for (int i = 0; i < o1.length; i++) { + if (o1[i] != o2[i]) { + return o1[i] - o2[i]; + } + } + return 0; + } + }; + + /** + * The BlockMap keeps track of file blocks by their names. Given + * that the name is a byte array, we rely on a custom comparator that knows + * how to compare such names. + */ + protected class BlockMap extends TreeMap { + private static final long serialVersionUID = 1550200273310875675L; + + /** + * Define a block map which is essentially a map of a block name (in the + * form of bytes) to the block data (again, in the form of bytes). Given + * that byte arrays don't lend themselves to comparison naturally, we pass + * it a custom comparator. + */ + public BlockMap() { + super(BYTE_ARRAY_COMPARATOR); + } + + /** + * Put a tuple in the block map, where the key is a + * {@link java.lang.String}. + * + * @param key + * a stringified key + * @param value + * a byte array value + * @return the previously associated value + */ + public byte[] put(String key, byte[] value) { + return super.put(key.getBytes(), value); + } + + /** + * Put a tuple in the block map, where the value is a + * {@link java.lang.String}. + * + * @param key + * a byte array key + * @param value + * a stringified value + * @return the previously associated value + */ + public byte[] put(byte[] key, String value) { + return super.put(key, value.getBytes()); + } + + /** + * Put a tuple in the block map, where both the key and value + * are a {@link java.lang.String}. + * + * @param key + * a stringified key + * @param value + * a stringified value + * @return the previously associated value + */ + public byte[] put(String key, String value) { + return super.put(key.getBytes(), value.getBytes()); + } + + /** + * Get the value for the given key, which is a {@link java.lang.String}. 
+ * + * @param key + * a stringified key + * @return the currently associated value + */ + public byte[] get(String key) { + return super.get(key.getBytes()); + } + } + + /** + * A utility for serializing (and deserialize) the file descriptor to (and + * from) JSON objects. + */ + public static class FileDescriptorUtils { + /** + * Convert the given file descriptor to bytes. + * + * @param fileDescriptor + * @return + * @throws IOException + */ + public static byte[] toBytes(FileDescriptor fileDescriptor) + throws IOException { + return toString(fileDescriptor).getBytes(); + } + + /** + * Convert the given file descriptor to a String. + * + * @param fileDescriptor + * @return + * @throws IOException + */ + public static String toString(FileDescriptor fileDescriptor) + throws IOException { + return toJSON(fileDescriptor).toString(); + } + + /** + * Convert the given file descriptor to a {@link JSONObject} + * + * @param fileDescriptor + * @return + * @throws IOException + */ + public static JSONObject toJSON(FileDescriptor fileDescriptor) + throws IOException { + try { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("name", fileDescriptor.getName()); + jsonObject.put("length", fileDescriptor.getLength()); + jsonObject.put("deleted", fileDescriptor.isDeleted()); + jsonObject.put("lastModified", fileDescriptor.getLastModified()); + jsonObject.put("lastAccessed", fileDescriptor.getLastAccessed()); + JSONArray jsonArray = new JSONArray(); + for (FileBlock fileBlock : fileDescriptor.getBlocks()) { + JSONObject blockObject = new JSONObject(); + blockObject.put("columnName", fileBlock.getBlockName()); + blockObject.put("blockNumber", fileBlock.getBlockNumber()); + blockObject.put("blockSize", fileBlock.getBlockSize()); + blockObject.put("dataOffset", fileBlock.getDataOffset()); + blockObject.put("dataLength", fileBlock.getDataLength()); + jsonArray.put(blockObject); + } + jsonObject.put("blocks", jsonArray); + return jsonObject; + } catch (JSONException e) { + throw new IOException("Unable to serialize file descriptor for " + + fileDescriptor.getName(), e); + } + } + + /** + * Convert the given bytes to a file descriptor. + * + * @param descriptorBytes + * @return + * @throws IOException + */ + public static FileDescriptor fromBytes(byte[] descriptorBytes) + throws IOException { + try { + if (descriptorBytes == null) { + return null; + } + JSONTokener tokener = new JSONTokener(new InputStreamReader( + new ByteArrayInputStream(descriptorBytes))); + FileDescriptor fileDescriptor = FileDescriptorUtils + .fromJSON((JSONObject) tokener.nextValue()); + return (!fileDescriptor.isDeleted() ? fileDescriptor : null); + } catch (JSONException e) { + throw new IOException("Could not get descriptor for file.", e); + } + } + + /** + * Convert the given {@link JSONObject} to a file descriptor. 
+     *
+     * @param jsonObject
+     *          the JSON object to decode
+     * @return the file descriptor decoded from the given JSON object
+     * @throws IOException
+     */
+    public static FileDescriptor fromJSON(JSONObject jsonObject)
+        throws IOException {
+      try {
+        FileDescriptor fileDescriptor = new FileDescriptor(jsonObject
+            .getString("name"));
+        fileDescriptor.setLength(jsonObject.getLong("length"));
+        fileDescriptor.setDeleted(jsonObject.getBoolean("deleted"));
+        fileDescriptor.setLastModified(jsonObject.getLong("lastModified"));
+        fileDescriptor.setLastAccessed(jsonObject.getLong("lastAccessed"));
+        fileDescriptor.setBlocks(new LinkedList<FileBlock>());
+        JSONArray blockArray = jsonObject.getJSONArray("blocks");
+        if (blockArray != null) {
+          for (int index = 0; index < blockArray.length(); index++) {
+            JSONObject blockObject = (JSONObject) blockArray.get(index);
+            FileBlock fileBlock = new FileBlock();
+            fileBlock.setBlockName(blockObject.getString("columnName"));
+            fileBlock.setBlockNumber(blockObject.getInt("blockNumber"));
+            fileBlock.setBlockSize(blockObject.getInt("blockSize"));
+            fileBlock.setDataOffset(blockObject.getInt("dataOffset"));
+            fileBlock.setDataLength(blockObject.getInt("dataLength"));
+            fileDescriptor.addLastBlock(fileBlock);
+          }
+        }
+        return fileDescriptor;
+      } catch (JSONException e) {
+        throw new IOException("Unable to de-serialize file descriptor from "
+            + jsonObject, e);
+      }
+    }
+
+    /**
+     * Seek to the file block on which the given file pointer positions
+     * itself.
+     *
+     * @param descriptor
+     *          the descriptor of the file
+     * @param filePointer
+     *          the pointer within the file to seek to
+     * @return the block containing the given file pointer, or null if the
+     *         pointer lies beyond the end of the file
+     */
+    public static FileBlock seekBlock(FileDescriptor descriptor,
+        long filePointer) {
+      long currentPointer = 0;
+      for (FileBlock fileBlock : descriptor.getBlocks()) {
+        // The pointer falls within this block if it lies at or before the end
+        // of the block's data; a pointer at the exact boundary maps to the
+        // end of this block, mirroring how the writer positions itself.
+        if (filePointer <= currentPointer + fileBlock.getDataLength()) {
+          fileBlock.setDataPosition((int) (filePointer - currentPointer));
+          fileBlock.setBlockOffset(currentPointer);
+          return fileBlock;
+        }
+        currentPointer += fileBlock.getDataLength();
+      }
+      return null;
+    }
+  }
+
+  /**
+   * The ColumnOrientedDirectory captures the mapping of the
+   * concepts of a directory to a column family in Cassandra. Specifically, it
+   * treats each row in the column family as a file underneath the directory.
+   *

+ * This class in turn relies on the {@link CassandraClient} for all low-level
+ * gets and puts to the Cassandra server. More importantly, it does not
+ * require the {@link CassandraClient} to be familiar with the notion of
+ * Lucene directories. Rather, it transparently translates those notions to
+ * column families. In so doing, it ends up hiding the Cassandra layer from
+ * its consumers.
+ *
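+ * <p>
+ * Its surface, as consumed by the directory methods above, is small (a
+ * sketch; the file name is hypothetical):
+ * <pre>
+ * String[] names = columnOrientedDirectory.getFileNames();
+ * FileDescriptor descriptor = columnOrientedDirectory.getFileDescriptor("_0.cfs");
+ * long length = descriptor.getLength();
+ * </pre>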

+ */ + public class ColumnOrientedDirectory { + /** + * @return the names of the files in this directory + * @throws IOException + */ + public String[] getFileNames() throws IOException { + byte[][] keys = cassandraClient.getKeys(systemColumns); + List fileNames = new ArrayList(); + for (byte[] key : keys) { + fileNames.add(new String(key)); + } + return fileNames.toArray(new String[] {}); + } + + /** + * Return the file descriptor for the file of the given name. If the file + * cannot be found, then return null, instead of trying to create it. + * + * @param fileName + * the name of the file + * @return the descriptor for the given file + * @throws IOException + */ + protected FileDescriptor getFileDescriptor(String fileName) + throws IOException { + return getFileDescriptor(fileName, false); + } + + /** + * Return the file descriptor for the file of the given name. + * + * @param fileName + * the name of the file + * @param createIfNotFound + * if the file wasn't found, create it + * @return the descriptor for the given file + * @throws IOException + */ + protected FileDescriptor getFileDescriptor(String fileName, + boolean createIfNotFound) throws IOException { + FileDescriptor fileDescriptor = FileDescriptorUtils + .fromBytes(cassandraClient.getColumn(fileName.getBytes(), + descriptorColumn.getBytes())); + + if (fileDescriptor == null && createIfNotFound) { + fileDescriptor = new FileDescriptor(fileName, getBlockSize()); + setFileDescriptor(fileDescriptor); + } + return fileDescriptor; + } + + /** + * Save the given file descriptor. + * + * @param fileDescriptor + * the file descriptor being saved + * @throws IOException + */ + public void setFileDescriptor(FileDescriptor fileDescriptor) + throws IOException { + BlockMap blockMap = new BlockMap(); + blockMap.put(descriptorColumn, FileDescriptorUtils + .toString(fileDescriptor)); + cassandraClient.setColumns(fileDescriptor.getName().getBytes(), blockMap); + } + } + + /** + * The ColumnOrientedFile captures the mapping of the concept of + * a file to a row in Cassandra. Specifically, it considers each column in the + * row as a block in the file. Furthermore, it uses one of the columns to hold + * the {@link FileDescriptor} for the file, in the form of a JSON string + * (which serves to make the "file" readable by other, potentially disparate, + * clients). + * + *

+ * This class in turn relies on the {@link CassandraClient} for all low-level + * gets and puts to the Cassandra server. More importantly, it does not + * require that the {@link CassandraClient} be familiar with the notion of + * Lucene files. Rather, it transparently translates those notions to rows + * within the column family denoting the directory. In so doing, it ends up + * hiding the Cassandra layer from its consumers. + *
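+ * <p>
+ * A sketch of reading two blocks of a file back as a map (the block names
+ * follow the BLOCK- prefix convention defined above):
+ * <pre>
+ * Set wanted = new TreeSet(BYTE_ARRAY_COMPARATOR);
+ * wanted.add("BLOCK-0".getBytes());
+ * wanted.add("BLOCK-1".getBytes());
+ * BlockMap blocks = columnOrientedFile.readFileBlocks(fileDescriptor, wanted);
+ * </pre>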

+ */
+  public class ColumnOrientedFile {
+    /**
+     * Write the given blocks to the file referenced by the given descriptor.
+     *
+     * @param fileDescriptor
+     *          the descriptor of the file being written to
+     * @param blocksToBeWritten
+     *          the map of block names to values
+     * @throws IOException
+     */
+    public void writeFileBlocks(FileDescriptor fileDescriptor,
+        BlockMap blocksToBeWritten) throws IOException {
+      blocksToBeWritten.put(descriptorColumn, FileDescriptorUtils
+          .toString(fileDescriptor));
+      cassandraClient.setColumns(fileDescriptor.getName().getBytes(),
+          blocksToBeWritten);
+    }
+
+    /**
+     * Read the given blocks from the file referenced by the given descriptor.
+     *
+     * @param fileDescriptor
+     *          the descriptor of the file being read
+     * @param blockNames
+     *          the (unique) set of block names to read from
+     * @return the map of block names to values
+     * @throws IOException
+     */
+    public BlockMap readFileBlocks(FileDescriptor fileDescriptor,
+        Set<byte[]> blockNames) throws IOException {
+      Map<byte[], byte[]> columns = cassandraClient.getColumns(fileDescriptor
+          .getName().getBytes(), blockNames);
+      BlockMap blockMap = new BlockMap();
+      blockMap.putAll(columns);
+      return blockMap;
+    }
+
+  }
+
+  /**
+   * The CassandraIndexOutput acts as an output stream for a
+   * Cassandra-based file of a given name. In essence, it allows Lucene's
+   * low-level data types to be written into that file, using a write-behind
+   * caching mechanism.
+   *

+ * Specifically, it builds upon the write-behind cache implemented by the
+ * {@link BufferedIndexOutput}, by essentially mapping one or more blocks into
+ * the underlying buffer. Ergo, it is recommended that the buffer size be an
+ * exact multiple of the block size, as that will help to reduce the number of
+ * round trips to the Cassandra server.
+ *
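+ * <p>
+ * For example (an illustrative configuration, not a prescription), one might
+ * align the two sizes when constructing the directory:
+ * <pre>
+ * int blockSize = 1024 * 1024;      // one column per megabyte
+ * int bufferSize = 4 * blockSize;   // every flush covers whole blocks
+ * Directory dir = new CassandraDirectory("lucene", "index", blockSize, bufferSize);
+ * </pre>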

+ */ + public class CassandraIndexOutput extends BufferedIndexOutput { + // The file descriptor corresponding to this file. + FileDescriptor fileDescriptor; + + // A map that keeps track of (partial block) fragments. While this is + // currently not being used, it could be used to de-fragment the file as + // subsequence writes occur. + BlockMap fragments; + + // The file block where the file pointer is currently positioned. + FileBlock currentBlock; + + // A flag indicating whether or not this stream is open. + private volatile boolean isOpen; + + protected ColumnOrientedFile columnOrientedFile; + + public CassandraIndexOutput(String fileName, int bufferSize) + throws IOException { + super(bufferSize); + fileDescriptor = columnOrientedDirectory + .getFileDescriptor(fileName, true); + currentBlock = fileDescriptor.getFirstBlock(); + isOpen = true; + columnOrientedFile = new ColumnOrientedFile(); + } + + /** + * Only close the file if it has not been closed yet. + */ + @Override + public void close() throws IOException { + if (isOpen) { + try { + super.close(); + } finally { + isOpen = false; + fragments.clear(); + } + } + } + + /** + * @return the current length of the file + */ + @Override + public long length() throws IOException { + return fileDescriptor.getLength(); + } + + /** + * Seek to the block corresponding to the given file position, and then move + * to the exact data position within that block. + */ + @Override + public void seek(long position) throws IOException { + currentBlock = FileDescriptorUtils.seekBlock(fileDescriptor, position); + } + + /** + * Map the bytes (that were in the underlying buffer) to their corresponding + * file blocks based on the current file pointer, and write the blocks + * through to the column family in the Cassandra server. 
+ */ + @Override + protected void flushBuffer(byte[] bytes, int offset, int length) + throws IOException { + if (length == 0) { + return; + } + + BlockMap blocksToFlush = new BlockMap(); + + if (currentBlock.getDataPosition() > 0) { + FileBlock preFragment = (FileBlock) currentBlock.clone(); + preFragment.setDataLength(currentBlock.getDataPosition()); + fileDescriptor.insertBlock(currentBlock, preFragment, false); + } + + int bytesLeftToWrite = length; + int bytesAddedByWrite = 0; + do { + maybeRepositionCurrentBlock(); + int dataLength = (int) Math.min(currentBlock.getBlockSize() + - currentBlock.getPositionOffset(), bytesLeftToWrite); + int currentLength = currentBlock.getDataLength(); + FileBlock nextBlock; + if (currentBlock.getDataPosition() == 0 + && dataLength > currentBlock.getDataLength()) { + nextBlock = currentBlock; + nextBlock.setDataLength(dataLength); + } else { + nextBlock = fileDescriptor.createBlock(); + nextBlock.setDataLength(dataLength); + nextBlock.setDataOffset(currentBlock.getPositionOffset()); + } + byte[] partialBytes = new byte[dataLength]; + System.arraycopy(bytes, offset, partialBytes, 0, dataLength); + blocksToFlush.put(nextBlock.getBlockName(), partialBytes); + nextBlock.setDataPosition(dataLength); + if (nextBlock != currentBlock) { + FileBlock blockToBeRemoved; + if (nextBlock.getDataPosition() > 0) { + blockToBeRemoved = currentBlock; + fileDescriptor.insertBlock(currentBlock, nextBlock, true); + } else { + blockToBeRemoved = currentBlock; + fileDescriptor.insertBlock(currentBlock, nextBlock, false); + } + for (; blockToBeRemoved != null + && blockToBeRemoved.getLastDataOffset() < nextBlock + .getLastDataOffset(); blockToBeRemoved = fileDescriptor + .getNextBlock(blockToBeRemoved)) { + fileDescriptor.removeBlock(blockToBeRemoved); + } + } + bytesLeftToWrite -= dataLength; + offset += dataLength; + if (fileDescriptor.isLastBlock(nextBlock)) { + if (nextBlock != currentBlock) { + bytesAddedByWrite += dataLength; + } else { + bytesAddedByWrite += dataLength - currentLength; + } + } + currentBlock = nextBlock; + } while (bytesLeftToWrite > 0); + + if (currentBlock.getDataPosition() < currentBlock.getDataLength()) { + FileBlock postFragment = (FileBlock) currentBlock.clone(); + postFragment.setDataOffset(currentBlock.getPositionOffset()); + postFragment + .setDataLength((int) (currentBlock.getDataLength() - postFragment + .getDataOffset())); + + fileDescriptor.insertBlock(currentBlock, postFragment, true); + currentBlock = postFragment; + currentBlock.setBlockOffset(currentBlock.getBlockOffset() + + currentBlock.getDataPosition()); + currentBlock.setDataPosition(0); + } + + maybeRepositionCurrentBlock(); + + long now = new Date().getTime(); + if (bytesAddedByWrite > 0) { + fileDescriptor + .setLength(fileDescriptor.getLength() + bytesAddedByWrite); + } + fileDescriptor.setLastAccessed(now); + fileDescriptor.setLastModified(now); + columnOrientedFile.writeFileBlocks(fileDescriptor, blocksToFlush); + } + + /** + * In the event the file pointer is currently positioned at the exact end of + * the data range in the current block, then reposition to the first byte in + * the ensuing block. Furthermore, if there is no ensuing block, then create + * a brand-new block, append it to the end of the file, and move into that + * new block. 
+   */
+    protected void maybeRepositionCurrentBlock() {
+      if (currentBlock.getDataPosition() == currentBlock.getDataLength()
+          && currentBlock.getPositionOffset() == currentBlock.getBlockSize()) {
+        FileBlock nextBlock = fileDescriptor.getNextBlock(currentBlock);
+        if (nextBlock == null) {
+          nextBlock = fileDescriptor.createBlock();
+          fileDescriptor.insertBlock(currentBlock, nextBlock, true);
+        }
+        currentBlock = nextBlock;
+      }
+    }
+  }
+
+  /**
+   * The CassandraIndexInput acts as an input stream for a
+   * Cassandra-based file of a given name. In essence, it allows Lucene's
+   * low-level data types to be read from that file, using a read-ahead
+   * caching mechanism.
+   *

+ * Specifically, it builds upon the read-ahead cache implemented by the
+ * {@link BufferedIndexInput}, by essentially mapping one or more blocks into
+ * the underlying buffer. Ergo, it is recommended that the buffer size be an
+ * exact multiple of the block size, as that will help to reduce the number of
+ * round trips to the Cassandra server.
+ *

+ */ + public class CassandraIndexInput extends BufferedIndexInput { + // A reference to the descriptor for the file being read from. + protected final FileDescriptor fileDescriptor; + + // A reference to the current block being read from. + protected FileBlock currentBlock; + + // The length of the file as determined when it was opened. + protected long fileLength; + + protected ColumnOrientedFile columnOrientedFile; + + /** + * Construct a type of {@link IndexInput} that understands how to read from + * the Cassandra-based file of the given name. It uses a read-ahead buffer + * under the covers whose size is specified by the given buffer size. + * + * @param fileName + * the name of the file to read + * @param bufferSize + * the size of the input buffer + * @throws IOException + */ + public CassandraIndexInput(String fileName, int bufferSize) + throws IOException { + super(bufferSize); + fileDescriptor = columnOrientedDirectory.getFileDescriptor(fileName); + if (fileDescriptor == null) { + throw new IOException("Unable to locate file " + fileName); + } + currentBlock = fileDescriptor.getFirstBlock(); + fileLength = fileDescriptor.getLength(); + columnOrientedFile = new ColumnOrientedFile(); + } + + /** + * There's nothing to close. + */ + @Override + public void close() throws IOException {} + + /** + * @return the cached value of the file length + */ + @Override + public long length() { + return fileLength; + } + + /** + * Read the given number (i.e., length) of bytes, by first determining which + * blocks to retrieve, and then copying over the data from each of those + * blocks into the given byte array starting at the gien offset, one block + * at a time. + * + * @param bytes + * the array to read bytes into + * @param offset + * the offset in the array to start storing bytes + * @param length + * the number of bytes to read + */ + @Override + protected void readInternal(byte[] bytes, int offset, int length) + throws IOException { + Set blockNames = new TreeSet(BYTE_ARRAY_COMPARATOR); + List blocksToBeRead = new ArrayList(); + + int bytesToBeRead = length; + do { + byte[] columnName = currentBlock.getBlockName().getBytes(); + if (!blockNames.contains(columnName)) { + blockNames.add(columnName); + } + blocksToBeRead.add(currentBlock); + FileBlock nextBlock = fileDescriptor.getNextBlock(currentBlock); + if (nextBlock == null) { + break; + } + bytesToBeRead -= currentBlock.getDataLength(); + currentBlock = nextBlock; + } while (bytesToBeRead > 0); + + BlockMap blockMap = columnOrientedFile.readFileBlocks(fileDescriptor, + blockNames); + bytesToBeRead = length; + for (FileBlock blockToBeRead : blocksToBeRead) { + for (Map.Entry columnEntry : blockMap.entrySet()) { + String columnName = new String(columnEntry.getKey()); + byte[] columnValue = columnEntry.getValue(); + if (columnName.equals(blockToBeRead.getBlockName())) { + int bytesToReadFromBlock = (int) Math.min(bytesToBeRead, + (blockToBeRead.getDataLength() + blockToBeRead + .getDataPosition())); + System.arraycopy(columnValue, 0, bytes, offset, + bytesToReadFromBlock); + bytesToBeRead -= bytesToReadFromBlock; + offset += bytesToReadFromBlock; + blockToBeRead.setDataPosition(blockToBeRead.getDataPosition() + + bytesToReadFromBlock); + } + } + } + + if (currentBlock.getDataPosition() == currentBlock.getDataLength()) { + FileBlock nextBlock = fileDescriptor.getNextBlock(currentBlock); + if (nextBlock != null) { + currentBlock = nextBlock; + } + } + } + + /** + * Seek to the block corresponding to the given file position, and then move + 
* to the exact data position within that block. + */ + @Override + protected void seekInternal(long position) throws IOException { + currentBlock = FileDescriptorUtils.seekBlock(fileDescriptor, position); + } + } + + /** + * The CassandraClient encapsulates the low-level interactions of + * the directory with the (remote) Cassandra server. In particular, it + * delegates the request through to a Thrift client, which may or may not be + * framed. + * + *

+ * Note that this class is not aware of the notions of directories and files
+ * described above. Instead, it simply provides basic operations, such as
+ * those that get/set columns/keys. This separation between the concepts of
+ * Cassandra and Lucene serves not only to keep the design simple, but also
+ * to keep it (hopefully) readable.
+ *
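+ * <p>
+ * A sketch of how the enclosing classes use this facade (the row key is a
+ * hypothetical file name; the column name comes from the layout described
+ * above):
+ * <pre>
+ * byte[] key = "segments_2".getBytes();                  // one row per file
+ * byte[] json = cassandraClient.getColumn(key, descriptorColumn.getBytes());
+ * FileDescriptor descriptor = FileDescriptorUtils.fromBytes(json);
+ * </pre>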

+ */ + public class CassandraClient { + // The underlying thrift client to delegate requests to. + protected Cassandra.Client thriftClient; + + /** + * Construct a Cassandra client that knows how to get/set rows/columns from + * the given keyspace and column family, residing in the given Cassandra + * server. + * + * @param host + * the host where the Cassandra Thrift server is located + * @param port + * the port where the Cassandra Thrift server is listening + * @param framed + * a flag indicating whether or not to use a framed transport + * @throws IOException + */ + public CassandraClient(String host, int port, boolean framed) + throws IOException { + TSocket socket = new TSocket(host, port); + TTransport transport = framed ? new TFramedTransport(socket) : socket; + + try { + transport.open(); + thriftClient = new Cassandra.Client(new TBinaryProtocol(transport)); + Map credentials = new HashMap(); + Set keyspaces = thriftClient.describe_keyspaces(); + if (!keyspaces.contains(keyspace)) { + List cfDefs = new ArrayList(); + cfDefs.add(new CfDef(keyspace, columnFamily)); + thriftClient.system_add_keyspace(new KsDef(keyspace, + "org.apache.cassandra.locator.RackUnawareStrategy", 1, cfDefs)); + } + thriftClient.set_keyspace(keyspace); + try { + CfDef cfDef = new CfDef(keyspace, columnFamily); + thriftClient.system_add_column_family(cfDef); + } catch (InvalidRequestException e) {} + thriftClient.login(new AuthenticationRequest(credentials)); + } catch (Exception e) { + throw new IOException("Unable to open connection to keyspace " + + keyspace, e); + } + } + + /** + * Return the keys that define the given column names. + * + * @param columnNames + * the names of the columns + * @return the rows that contain those columns + * @throws IOException + */ + public byte[][] getKeys(List columnNames) throws IOException { + try { + List keySlices = thriftClient.get_range_slices( + new ColumnParent().setColumn_family(columnFamily), + new SlicePredicate().setColumn_names(columnNames), new KeyRange() + .setStart_key("".getBytes()).setEnd_key("".getBytes()), + ConsistencyLevel.ONE); + List keys = new ArrayList(); + for (KeySlice keySlice : keySlices) { + List coscs = keySlice.getColumns(); + if (coscs != null && coscs.size() == 1) { + ColumnOrSuperColumn cosc = coscs.get(0); + Column column = cosc.getColumn(); + FileDescriptor fileDescriptor = FileDescriptorUtils + .fromBytes(column.getValue()); + if (fileDescriptor == null || fileDescriptor.isDeleted()) { + continue; + } + keys.add(keySlice.key); + } + } + return keys.toArray(new byte[][] {}); + } catch (Exception e) { + throw new IOException("Unable to list all files in " + keyspace); + } + } + + /** + * Get the given set of columns for the row specified by the given key. 
+     *
+     * @param key
+     *          the key to the row to read from
+     * @param columnNames
+     *          the names of the columns to fetch
+     * @return the values for those columns in that row
+     * @throws IOException
+     */
+    public Map<byte[], byte[]> getColumns(byte[] key, Set<byte[]> columnNames)
+        throws IOException {
+      try {
+        List<byte[]> uniqueColumnNames = new ArrayList<byte[]>();
+        uniqueColumnNames.addAll(columnNames);
+        List<ColumnOrSuperColumn> coscs = thriftClient.get_slice(key,
+            new ColumnParent(columnFamily), new SlicePredicate()
+                .setColumn_names(uniqueColumnNames), ConsistencyLevel.ONE);
+        Map<byte[], byte[]> columns = new HashMap<byte[], byte[]>();
+        for (ColumnOrSuperColumn cosc : coscs) {
+          Column column = cosc.getColumn();
+          columns.put(column.getName(), column.getValue());
+        }
+        return columns;
+      } catch (Exception e) {
+        throw new IOException("Could not read columns for file "
+            + new String(key), e);
+      }
+    }
+
+    /**
+     * Get the given column for the row specified by the given key.
+     *
+     * @param fileName
+     *          the key to the row to read from
+     * @param columnName
+     *          the name of the column to fetch
+     * @return the value for that column in this row
+     * @throws IOException
+     */
+    public byte[] getColumn(byte[] fileName, byte[] columnName)
+        throws IOException {
+      try {
+        List<byte[]> columnNames = new ArrayList<byte[]>();
+        columnNames.add(columnName);
+        List<ColumnOrSuperColumn> coscs = thriftClient.get_slice(fileName,
+            new ColumnParent().setColumn_family(columnFamily),
+            new SlicePredicate().setColumn_names(columnNames),
+            ConsistencyLevel.ONE);
+        if (!coscs.isEmpty()) {
+          ColumnOrSuperColumn cosc = coscs.get(0);
+          Column column = cosc.getColumn();
+          return column.getValue();
+        }
+        return null;
+      } catch (Exception e) {
+        throw new IOException("Unable to read file descriptor for "
+            + new String(fileName), e);
+      }
+    }
+
+    /**
+     * Set the values for the given columns in the given row.
+     *
+     * @param key
+     *          the key to the row being written to
+     * @param columnValues
+     *          the values for the columns being updated
+     * @throws IOException
+     */
+    protected void setColumns(byte[] key, Map<byte[], byte[]> columnValues)
+        throws IOException {
+      Map<byte[], Map<String, List<Mutation>>> mutationMap = new HashMap<byte[], Map<String, List<Mutation>>>();
+
+      Map<String, List<Mutation>> cfMutation = new HashMap<String, List<Mutation>>();
+      mutationMap.put(key, cfMutation);
+
+      List<Mutation> mutationList = new ArrayList<Mutation>();
+      cfMutation.put(columnFamily, mutationList);
+
+      if (columnValues == null || columnValues.size() == 0) {
+        Mutation mutation = new Mutation();
+        Deletion deletion = new Deletion(System.currentTimeMillis());
+        /**
+         * Currently, we cannot delete rows from a column family. This issue
+         * is being tracked at
+         * https://issues.apache.org/jira/browse/CASSANDRA-293. When that
+         * issue is resolved, we may choose to revive the code shown below.
+
+    /**
+     * Set the values for the given columns in the given row.
+     *
+     * @param key
+     *          the key of the row being written to
+     * @param columnValues
+     *          the values for the columns being updated
+     * @throws IOException
+     */
+    protected void setColumns(byte[] key, Map<byte[], byte[]> columnValues)
+        throws IOException {
+      Map<byte[], Map<String, List<Mutation>>> mutationMap =
+          new HashMap<byte[], Map<String, List<Mutation>>>();
+
+      Map<String, List<Mutation>> cfMutation =
+          new HashMap<String, List<Mutation>>();
+      mutationMap.put(key, cfMutation);
+
+      List<Mutation> mutationList = new ArrayList<Mutation>();
+      cfMutation.put(columnFamily, mutationList);
+
+      if (columnValues == null || columnValues.size() == 0) {
+        Mutation mutation = new Mutation();
+        Deletion deletion = new Deletion(System.currentTimeMillis());
+        /**
+         * Currently, we cannot delete rows from a column family. This issue
+         * is being tracked at
+         * https://issues.apache.org/jira/browse/CASSANDRA-293. When that
+         * issue is resolved, we may choose to revive the code shown below.
+         *
+         * deletion.setPredicate(new SlicePredicate().setSlice_range(new
+         * SliceRange(new byte[] {}, new byte[] {}, false, Integer.MAX_VALUE)));
+         */
+        mutation.setDeletion(deletion);
+        mutationList.add(mutation);
+
+      } else {
+        for (Map.Entry<byte[], byte[]> columnValue : columnValues.entrySet()) {
+          Mutation mutation = new Mutation();
+          byte[] column = columnValue.getKey(), value = columnValue.getValue();
+          if (value == null) {
+            // A null value marks the named column (or, lacking a column name,
+            // all of the row's columns) for deletion.
+            Deletion deletion = new Deletion(System.currentTimeMillis());
+
+            if (column != null) {
+              deletion.setPredicate(new SlicePredicate().setColumn_names(Arrays
+                  .asList(new byte[][] {column})));
+            } else {
+              deletion.setPredicate(new SlicePredicate()
+                  .setSlice_range(new SliceRange(new byte[] {}, new byte[] {},
+                      false, Integer.MAX_VALUE)));
+            }
+
+            mutation.setDeletion(deletion);
+
+          } else {
+            ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
+            cosc.setColumn(new Column(column, value,
+                System.currentTimeMillis()));
+
+            mutation.setColumn_or_supercolumn(cosc);
+          }
+
+          mutationList.add(mutation);
+        }
+      }
+      try {
+        thriftClient.batch_mutate(mutationMap, ConsistencyLevel.ONE);
+      } catch (Exception e) {
+        throw new IOException("Unable to mutate columns for file "
+            + new String(key), e);
+      }
+    }
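+
+    /*
+     * Illustrative sketch (an assumption, not part of the original patch):
+     * mapping a column name to null tombstones that single column, while a
+     * null or empty map routes through the row-level Deletion branch above,
+     * subject to the CASSANDRA-293 caveat. The block column name below is
+     * hypothetical.
+     *
+     *   Map<byte[], byte[]> edits = new HashMap<byte[], byte[]>();
+     *   edits.put("BLOCK-7".getBytes(), null);  // delete one block column
+     *   client.setColumns("segments_2".getBytes(), edits);
+     */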
+  }
+}
Index: contrib/directories/src/test/org/apache/lucene/store/TestCassandraDirectory.java
===================================================================
--- contrib/directories/src/test/org/apache/lucene/store/TestCassandraDirectory.java (revision 0)
+++ contrib/directories/src/test/org/apache/lucene/store/TestCassandraDirectory.java (revision 0)
@@ -0,0 +1,324 @@
+package org.apache.lucene.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.util.LuceneTestCase;
+
+public class TestCassandraDirectory extends LuceneTestCase {
+
+  CassandraDirectory cassandraDirectory;
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    cassandraDirectory = new CassandraDirectory("lucene1", "index", 10, 10);
+    for (String fileName : cassandraDirectory.listAll()) {
+      cassandraDirectory.deleteFile(fileName);
+    }
+    cassandraDirectory.createOutput("sampleFile");
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    if (cassandraDirectory != null) {
+      cassandraDirectory.close();
+    }
+    super.tearDown();
+  }
+
+  public void testListAll() throws IOException {
+    String[] files = cassandraDirectory.listAll();
+    assertEquals(1, files.length);
+    assertEquals("sampleFile", files[0]);
+  }
+
+  public void testFileExists() throws IOException {
+    assertTrue("The sample file should've been found, but it wasn't.",
+        cassandraDirectory.fileExists("sampleFile"));
+    try {
+      cassandraDirectory.fileExists("dummyFile");
+      fail("The dummy file should not have been found, but it was.");
+    } catch (IOException e) {
+      // Expected: the dummy file does not exist.
+    }
+  }
+
+  public void testFileModified() throws IOException {
+    long fileModified = cassandraDirectory.fileModified("sampleFile");
+    long currentTime = new Date().getTime();
+    long secondsSinceFileModified = (currentTime - fileModified) / 1000;
+    assertTrue(
+        "The sample file should have been modified just now, but it wasn't.",
+        secondsSinceFileModified >= 0 && secondsSinceFileModified < 1);
+  }
+
+  public void testTouchFile() throws IOException, InterruptedException {
+    long fileModified = cassandraDirectory.fileModified("sampleFile");
+    TimeUnit.SECONDS.sleep(3);
+    cassandraDirectory.touchFile("sampleFile");
+    long fileTouched = cassandraDirectory.fileModified("sampleFile");
+    long currentTime = new Date().getTime();
+    long secondsFileUnmodified = (fileTouched - fileModified) / 1000;
+    assertTrue(
+        "The sample file was not left unmodified for 3 seconds before it was touched.",
+        secondsFileUnmodified >= 2 && secondsFileUnmodified <= 3);
+    long secondsSinceFileTouched = (currentTime - fileTouched) / 1000;
+    assertTrue(
+        "The sample file should've been touched just now, but it wasn't.",
+        secondsSinceFileTouched >= 0 && secondsSinceFileTouched < 1);
+  }
+
+  public void testDeleteFile() throws IOException {
+    cassandraDirectory.deleteFile("sampleFile");
+    assertFalse("The sample file should not have been found, but it was.",
+        cassandraDirectory.fileExists("sampleFile"));
+  }
+
+  public void testFileLength() throws IOException {
+    assertEquals("The sample file's length should be zero.", 0,
+        cassandraDirectory.fileLength("sampleFile"));
+  }
+
+  public void testCreateFile() throws IOException {
+    for (int fileIndex = 0; fileIndex < 10; fileIndex++) {
+      String testFileName = "testFile" + fileIndex;
+      cassandraDirectory.createOutput(testFileName);
+      assertTrue("The test file should've been found, but it wasn't.",
+          cassandraDirectory.fileExists(testFileName));
+      assertEquals("The test file's length should be zero.", 0,
+          cassandraDirectory.fileLength(testFileName));
+    }
+  }
+
+  public void testWriteFile() throws IOException {
+    String smallerThanABlock = "0123";
+    String exactlyABlock = "0123456789";
+    String largerThanABlock = "0123456789A";
+
+    String[] dataSample = new String[] {smallerThanABlock, exactlyABlock,
+        largerThanABlock};
+
+    for (String dataPoint : dataSample) {
+      writeStrings(new String[] {dataPoint});
+    }
+
+    for (String dataPoint : dataSample) {
+      List<String> stringsToBeWritten = new ArrayList<String>();
+      stringsToBeWritten.addAll(Arrays.asList(dataSample));
+      stringsToBeWritten.remove(dataPoint);
+      writeStrings(stringsToBeWritten.toArray(new String[] {}));
+    }
+
+    PermutationGenerator kPermutation = new PermutationGenerator(
+        dataSample.length);
+    while (kPermutation.hasMore()) {
+      int[] indices = kPermutation.getNext();
+      String[] stringsToBeWritten = new String[indices.length];
+      for (int i = 0; i < indices.length; i++) {
+        stringsToBeWritten[i] = dataSample[indices[i]];
+      }
+      writeStrings(stringsToBeWritten);
+    }
+  }
+
+  public void testReadFile() throws IOException {
+    String smallerThanABlock = "0123";
+    String exactlyABlock = "0123456789";
+    String largerThanABlock = "0123456789A";
+
+    String[] dataSample = new String[] {smallerThanABlock, exactlyABlock,
+        largerThanABlock};
+
+    for (String dataPoint : dataSample) {
+      readStrings(writeStrings(new String[] {dataPoint}));
+    }
+
+    for (String dataPoint : dataSample) {
+      List<String> stringsToBeWritten = new ArrayList<String>();
+      stringsToBeWritten.addAll(Arrays.asList(dataSample));
+      stringsToBeWritten.remove(dataPoint);
+      readStrings(writeStrings(stringsToBeWritten.toArray(new String[] {})));
+    }
+
+    PermutationGenerator kPermutation = new PermutationGenerator(
+        dataSample.length);
+    while (kPermutation.hasMore()) {
+      int[] indices = kPermutation.getNext();
+      String[] stringsToBeWritten = new String[indices.length];
+      for (int i = 0; i < indices.length; i++) {
+        stringsToBeWritten[i] = dataSample[indices[i]];
+      }
+      readStrings(writeStrings(stringsToBeWritten));
+    }
+  }
+
+  protected String[] writeStrings(String[] dataSample) throws IOException {
+    cassandraDirectory.deleteFile("sampleFile");
+    IndexOutput indexOutput = cassandraDirectory.createOutput("sampleFile");
+    int dataLength = 0;
+    for (String dataPoint : dataSample) {
+      indexOutput.writeString(dataPoint);
+      dataLength += dataPoint.length();
+    }
+    indexOutput.flush();
+    assertEquals("The index output's current file length is incorrect.",
+        dataLength + dataSample.length, indexOutput.length());
+    return dataSample;
+  }
+
+  protected void readStrings(String[] dataSample) throws IOException {
+    IndexInput indexInput = cassandraDirectory.openInput("sampleFile");
+    int dataLength = 0;
+    for (String expectedDataPoint : dataSample) {
+      String actualDataPoint = indexInput.readString();
+      assertEquals("The index input's next string did not match.",
+          expectedDataPoint, actualDataPoint);
+      dataLength += actualDataPoint.length();
+    }
+    assertEquals("The index input's file length is incorrect.",
+        dataLength + dataSample.length, indexInput.length());
+  }
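+
+  /*
+   * A note on the length assertions above (added commentary): in this
+   * version of Lucene, IndexOutput.writeString writes the string's length as
+   * a VInt and then the characters themselves. For the short ASCII samples
+   * used here the VInt occupies a single byte, so each string contributes
+   * (dataPoint.length() + 1) bytes -- hence the expected value of
+   * "dataLength + dataSample.length".
+   */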
+
+  public static class PermutationGenerator {
+
+    private int[] a;
+    private BigInteger numLeft;
+    private BigInteger total;
+
+    // -----------------------------------------------------------
+    // Constructor. WARNING: Don't make n too large.
+    // Recall that the number of permutations is n!,
+    // which can be very large even when n is as small as 20:
+    // 20! = 2,432,902,008,176,640,000, and
+    // 21! is too big to fit into a Java long, which is
+    // why we use BigInteger instead.
+    // -----------------------------------------------------------
+
+    public PermutationGenerator(int n) {
+      if (n < 1) {
+        throw new IllegalArgumentException("Min 1");
+      }
+      a = new int[n];
+      total = getFactorial(n);
+      reset();
+    }
+
+    // ------
+    // Reset
+    // ------
+
+    public void reset() {
+      for (int i = 0; i < a.length; i++) {
+        a[i] = i;
+      }
+      numLeft = new BigInteger(total.toString());
+    }
+
+    // ------------------------------------------------
+    // Return number of permutations not yet generated
+    // ------------------------------------------------
+
+    public BigInteger getNumLeft() {
+      return numLeft;
+    }
+
+    // ------------------------------------
+    // Return total number of permutations
+    // ------------------------------------
+
+    public BigInteger getTotal() {
+      return total;
+    }
+
+    // -----------------------------
+    // Are there more permutations?
+    // -----------------------------
+
+    public boolean hasMore() {
+      return numLeft.compareTo(BigInteger.ZERO) > 0;
+    }
+
+    // ------------------
+    // Compute factorial
+    // ------------------
+
+    private static BigInteger getFactorial(int n) {
+      BigInteger fact = BigInteger.ONE;
+      for (int i = n; i > 1; i--) {
+        fact = fact.multiply(new BigInteger(Integer.toString(i)));
+      }
+      return fact;
+    }
+
+    // --------------------------------------------------------
+    // Generate next permutation (algorithm from Rosen p. 284)
+    // --------------------------------------------------------
+
+    public int[] getNext() {
+      if (numLeft.equals(total)) {
+        numLeft = numLeft.subtract(BigInteger.ONE);
+        return a;
+      }
+
+      int temp;
+
+      // Find the largest index j with a[j] < a[j + 1].
+      int j = a.length - 2;
+      while (a[j] > a[j + 1]) {
+        j--;
+      }
+
+      // Find the index k such that a[k] is the smallest integer
+      // greater than a[j] to the right of a[j].
+      int k = a.length - 1;
+      while (a[j] > a[k]) {
+        k--;
+      }
+
+      // Interchange a[j] and a[k].
+      temp = a[k];
+      a[k] = a[j];
+      a[j] = temp;
+
+      // Put the tail end of the permutation after the jth position
+      // in increasing order.
+      int r = a.length - 1;
+      int s = j + 1;
+
+      while (r > s) {
+        temp = a[s];
+        a[s] = a[r];
+        a[r] = temp;
+        r--;
+        s++;
+      }
+
+      numLeft = numLeft.subtract(BigInteger.ONE);
+      return a;
+    }
+  }
+}