Index: build.xml
===================================================================
--- build.xml (revision 957988)
+++ build.xml (working copy)
@@ -56,6 +56,12 @@
+
+
@@ -92,7 +98,7 @@
-
+
@@ -103,7 +109,7 @@
-
+
@@ -115,7 +121,7 @@
-
@@ -253,9 +259,7 @@
-
-
-
+
@@ -301,10 +305,10 @@
-
-
-
-
+
+
+
+
@@ -519,14 +523,14 @@
-
+
-
+
-
+
-
+
Index: cassandra-handler/ivy.xml
===================================================================
--- cassandra-handler/ivy.xml (revision 0)
+++ cassandra-handler/ivy.xml (revision 0)
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java
===================================================================
--- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java (revision 0)
+++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java (revision 0)
@@ -0,0 +1,150 @@
+package org.apache.hadoop.hive.cassandra;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+public class CassandraStorageHandler implements HiveStorageHandler, HiveMetaHook {
+
+ private TTransport transport;
+ private TProtocol proto;
+
+ @Override
+ public void configureTableJobProperties(TableDesc tableDesc, Map jobProperties) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public Class extends InputFormat> getInputFormatClass() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public HiveMetaHook getMetaHook() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Class extends OutputFormat> getOutputFormatClass() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Class extends SerDe> getSerDeClass() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Configuration getConf() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void setConf(Configuration arg0) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void commitCreateTable(Table table) throws MetaException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void commitDropTable(Table table, boolean deleteData) throws MetaException {
+ // Cassandra 6.X Can not dynamically drop schema
+
+ }
+
+ private String getCassandraTableName(Table tbl) {
+ // Give preference to TBLPROPERTIES over SERDEPROPERTIES
+ // (really we should only use TBLPROPERTIES, so this is just
+ // for backwards compatibility with the original specs).
+ String tableName = tbl.getParameters().get(CassandraSerDe.CASSANDRA_KEYSPACE_NAME);
+ if (tableName == null) {
+ tableName = tbl.getSd().getSerdeInfo().getParameters().get(
+ CassandraSerDe.CASSANDRA_KEYSPACE_NAME);
+ }
+ if (tableName == null) {
+ tableName = tbl.getTableName();
+ }
+ return tableName;
+ }
+
+ @Override
+ public void preCreateTable(Table tbl) throws MetaException {
+ boolean isExternal = MetaStoreUtils.isExternalTable(tbl);
+ if (tbl.getSd().getLocation() != null) {
+ throw new MetaException("LOCATION may not be specified for HBase.");
+ }
+ Set columnFamilies = new TreeSet();
+ String tblName = this.getCassandraTableName(tbl);
+ Map serdeParam =
+ tbl.getSd().getSerdeInfo().getParameters();
+ String cassandraColumnStr = serdeParam.get(CassandraSerDe.CASSANDRA_COL_MAPPING);
+ if (cassandraColumnStr == null) {
+ throw new MetaException("No cassandra.columns.mapping defined in Serde.");
+ }
+
+ Cassandra.Client client = new Cassandra.Client(proto);
+ try {
+ if ( ! client.describe_keyspaces().contains(
+ serdeParam.get(CassandraSerDe.CASSANDRA_KEYSPACE_NAME)) ){
+ throw new MetaException( serdeParam.get(CassandraSerDe.CASSANDRA_KEYSPACE_NAME) +" not found");
+ }
+ } catch (TException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ private void connect() throws TTransportException{
+ transport = new TSocket(CassandraSerDe.CASSANDRA_HOST,
+ Integer.parseInt(CassandraSerDe.CASSANDRA_PORT));
+ proto = new TBinaryProtocol(transport);
+ transport.open();
+ }
+ @Override
+ public void preDropTable(Table table) throws MetaException {
+ // Nothing to do
+ }
+
+ @Override
+ public void rollbackCreateTable(Table table) throws MetaException {
+ // Nothing to do
+
+ }
+
+ @Override
+ public void rollbackDropTable(Table table) throws MetaException {
+ // TODO Auto-generated method stub
+
+ }
+
+
+}
Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraSerDe.java
===================================================================
--- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraSerDe.java (revision 0)
+++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraSerDe.java (revision 0)
@@ -0,0 +1,50 @@
+package org.apache.hadoop.hive.cassandra;
+
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.Writable;
+
+public class CassandraSerDe implements SerDe {
+
+ public static final String CASSANDRA_KEYSPACE_NAME="cassandra.ks.name";
+ public static final String CASSANDRA_CF_NAME="cassandra.cf.name";
+ public static final String CASSANDRA_RANGE_BATCH_SIZE="cassandra.range.size";
+ public static final String CASSANDRA_HOST="cassandra.host";
+ public static final String CASSANDRA_PORT="cassandra.port";
+
+ public static final String CASSANDRA_COL_MAPPING = "cassandra.columns.mapping";
+ @Override
+ public Object deserialize(Writable blob) throws SerDeException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ObjectInspector getObjectInspector() throws SerDeException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void initialize(Configuration conf, Properties tbl) throws SerDeException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public Class extends Writable> getSerializedClass() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/HiveCassandraTableInputFormat.java
===================================================================
--- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/HiveCassandraTableInputFormat.java (revision 0)
+++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/HiveCassandraTableInputFormat.java (revision 0)
@@ -0,0 +1,21 @@
+
+
+public class HiveCassandraTableInputFormat
+
+ implements InputFormat, JobConfigurable {
+
+ static final Log LOG = LogFactory.getLog(HiveCassandraTableInputFormat.class);
+
+ private CassandraExposedTableInputFormat cassandraInputFormat;
+
+ public HiveCassandraTableInputFormat() {
+ CassandraInputFormat = new CassandraExposedTableInputFormat();
+ }
+
+ public RecordReader getRecordReader{ InputSplit split. JobConf jobConf, Reporter reporter )
+ throws IOException {
+ CassandraSplit cassandraSplit = new (CassandraSplit) split;
+ byte [] keyspaceNameBytes;
+ String cassandraTableName = job.get(CassandraSerDe.CASSANDRA_TABLE_NAME);
+ cassandraInputFormat.setCassandraKeyspace(
+
Index: cassandra-handler/build.xml
===================================================================
--- cassandra-handler/build.xml (revision 0)
+++ cassandra-handler/build.xml (revision 0)
@@ -0,0 +1,104 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+