Index: build.xml =================================================================== --- build.xml (revision 957988) +++ build.xml (working copy) @@ -56,6 +56,12 @@ + + @@ -92,7 +98,7 @@ - + @@ -103,7 +109,7 @@ - + @@ -115,7 +121,7 @@ - @@ -253,9 +259,7 @@ - - - + @@ -301,10 +305,10 @@ - - - - + + + + @@ -519,14 +523,14 @@ - + - + - + - + Index: cassandra-handler/ivy.xml =================================================================== --- cassandra-handler/ivy.xml (revision 0) +++ cassandra-handler/ivy.xml (revision 0) @@ -0,0 +1,8 @@ + + + + + + + + Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java (revision 0) @@ -0,0 +1,150 @@ +package org.apache.hadoop.hive.cassandra; + +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.cassandra.thrift.Cassandra; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +public class CassandraStorageHandler implements HiveStorageHandler, HiveMetaHook { + + private TTransport transport; + private TProtocol proto; + + @Override + public void configureTableJobProperties(TableDesc tableDesc, Map jobProperties) { + // TODO Auto-generated method stub + + } + + @Override + public Class getInputFormatClass() { + // TODO Auto-generated method stub + return null; + } + + @Override + public HiveMetaHook getMetaHook() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Class getOutputFormatClass() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Class getSerDeClass() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Configuration getConf() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setConf(Configuration arg0) { + // TODO Auto-generated method stub + + } + + @Override + public void commitCreateTable(Table table) throws MetaException { + // TODO Auto-generated method stub + + } + + @Override + public void commitDropTable(Table table, boolean deleteData) throws MetaException { + // Cassandra 6.X Can not dynamically drop schema + + } + + private String getCassandraTableName(Table tbl) { + // Give preference to TBLPROPERTIES over SERDEPROPERTIES + // (really we should only use TBLPROPERTIES, so this is just + // for backwards compatibility with the original specs). + String tableName = tbl.getParameters().get(CassandraSerDe.CASSANDRA_KEYSPACE_NAME); + if (tableName == null) { + tableName = tbl.getSd().getSerdeInfo().getParameters().get( + CassandraSerDe.CASSANDRA_KEYSPACE_NAME); + } + if (tableName == null) { + tableName = tbl.getTableName(); + } + return tableName; + } + + @Override + public void preCreateTable(Table tbl) throws MetaException { + boolean isExternal = MetaStoreUtils.isExternalTable(tbl); + if (tbl.getSd().getLocation() != null) { + throw new MetaException("LOCATION may not be specified for HBase."); + } + Set columnFamilies = new TreeSet(); + String tblName = this.getCassandraTableName(tbl); + Map serdeParam = + tbl.getSd().getSerdeInfo().getParameters(); + String cassandraColumnStr = serdeParam.get(CassandraSerDe.CASSANDRA_COL_MAPPING); + if (cassandraColumnStr == null) { + throw new MetaException("No cassandra.columns.mapping defined in Serde."); + } + + Cassandra.Client client = new Cassandra.Client(proto); + try { + if ( ! client.describe_keyspaces().contains( + serdeParam.get(CassandraSerDe.CASSANDRA_KEYSPACE_NAME)) ){ + throw new MetaException( serdeParam.get(CassandraSerDe.CASSANDRA_KEYSPACE_NAME) +" not found"); + } + } catch (TException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + private void connect() throws TTransportException{ + transport = new TSocket(CassandraSerDe.CASSANDRA_HOST, + Integer.parseInt(CassandraSerDe.CASSANDRA_PORT)); + proto = new TBinaryProtocol(transport); + transport.open(); + } + @Override + public void preDropTable(Table table) throws MetaException { + // Nothing to do + } + + @Override + public void rollbackCreateTable(Table table) throws MetaException { + // Nothing to do + + } + + @Override + public void rollbackDropTable(Table table) throws MetaException { + // TODO Auto-generated method stub + + } + + +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraSerDe.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraSerDe.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraSerDe.java (revision 0) @@ -0,0 +1,50 @@ +package org.apache.hadoop.hive.cassandra; + +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.Writable; + +public class CassandraSerDe implements SerDe { + + public static final String CASSANDRA_KEYSPACE_NAME="cassandra.ks.name"; + public static final String CASSANDRA_CF_NAME="cassandra.cf.name"; + public static final String CASSANDRA_RANGE_BATCH_SIZE="cassandra.range.size"; + public static final String CASSANDRA_HOST="cassandra.host"; + public static final String CASSANDRA_PORT="cassandra.port"; + + public static final String CASSANDRA_COL_MAPPING = "cassandra.columns.mapping"; + @Override + public Object deserialize(Writable blob) throws SerDeException { + // TODO Auto-generated method stub + return null; + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + // TODO Auto-generated method stub + return null; + } + + @Override + public void initialize(Configuration conf, Properties tbl) throws SerDeException { + // TODO Auto-generated method stub + + } + + @Override + public Class getSerializedClass() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { + // TODO Auto-generated method stub + return null; + } + +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/HiveCassandraTableInputFormat.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/HiveCassandraTableInputFormat.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/HiveCassandraTableInputFormat.java (revision 0) @@ -0,0 +1,21 @@ + + +public class HiveCassandraTableInputFormat + + implements InputFormat, JobConfigurable { + + static final Log LOG = LogFactory.getLog(HiveCassandraTableInputFormat.class); + + private CassandraExposedTableInputFormat cassandraInputFormat; + + public HiveCassandraTableInputFormat() { + CassandraInputFormat = new CassandraExposedTableInputFormat(); + } + + public RecordReader getRecordReader{ InputSplit split. JobConf jobConf, Reporter reporter ) + throws IOException { + CassandraSplit cassandraSplit = new (CassandraSplit) split; + byte [] keyspaceNameBytes; + String cassandraTableName = job.get(CassandraSerDe.CASSANDRA_TABLE_NAME); + cassandraInputFormat.setCassandraKeyspace( + Index: cassandra-handler/build.xml =================================================================== --- cassandra-handler/build.xml (revision 0) +++ cassandra-handler/build.xml (revision 0) @@ -0,0 +1,104 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +