Index: build.xml
===================================================================
--- build.xml (revision 981263)
+++ build.xml (working copy)
@@ -53,11 +53,20 @@
+
+
+
+
+
+
+
+
+
@@ -94,7 +103,7 @@
-
+
@@ -105,7 +114,7 @@
-
+
@@ -303,6 +312,7 @@
+
Index: cassandra-handler/lib (new binary jars)
===================================================================
Cannot display: files marked as a binary type.
Property changes (Added: svn:mime-type = application/octet-stream) on each of the following new jars:
	cassandra-handler/lib/clhm-production.jar
	cassandra-handler/lib/high-scale-lib.jar
	cassandra-handler/lib/slf4j-log4j12-1.5.8.jar
	cassandra-handler/lib/libthrift-r917130.jar
	cassandra-handler/lib/commons-cli-1.1.jar
	cassandra-handler/lib/avro-1.2.0-dev.jar
	cassandra-handler/lib/apache-cassandra-0.6.3.jar
	cassandra-handler/lib/antlr-3.1.3.jar
	cassandra-handler/lib/commons-codec-1.2.jar
	cassandra-handler/lib/google-collections-1.0.jar
	cassandra-handler/lib/jackson-mapper-asl-1.4.0.jar
	cassandra-handler/lib/commons-collections-3.2.1.jar
	cassandra-handler/lib/jackson-core-asl-1.4.0.jar
	cassandra-handler/lib/commons-lang-2.4.jar
	cassandra-handler/lib/json-simple-1.1.jar
	cassandra-handler/lib/jline-0.9.94.jar
	cassandra-handler/lib/slf4j-api-1.5.8.jar
Index: cassandra-handler/lib/log4j-1.2.14.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream Property changes on: cassandra-handler/lib/log4j-1.2.14.jar ___________________________________________________________________ Added: svn:mime-type + application/octet-stream Index: cassandra-handler/lib/ivy-2.1.0.jar =================================================================== Cannot display: file marked as a binary type. svn:mime-type = application/octet-stream Property changes on: cassandra-handler/lib/ivy-2.1.0.jar ___________________________________________________________________ Added: svn:mime-type + application/octet-stream Index: cassandra-handler/ivy.xml =================================================================== --- cassandra-handler/ivy.xml (revision 0) +++ cassandra-handler/ivy.xml (revision 0) @@ -0,0 +1,8 @@ + + + + + + + + Index: cassandra-handler/src/test/results/cassandra_queries.q.out =================================================================== --- cassandra-handler/src/test/results/cassandra_queries.q.out (revision 0) +++ cassandra-handler/src/test/results/cassandra_queries.q.out (revision 0) @@ -0,0 +1,193 @@ +PREHOOK: query: CREATE EXTERNAL TABLE IF NOT EXISTS +cassandra_keyspace1_standard1(key int, value string) +STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' +WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" , +"cassandra.cf.name" = "Standard1" , "cassandra.host" = "localhost" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1" +) +TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1") +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE EXTERNAL TABLE IF NOT EXISTS +cassandra_keyspace1_standard1(key int, value string) +STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' +WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" , +"cassandra.cf.name" = "Standard1" , "cassandra.host" = "localhost" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1" +) +TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@cassandra_keyspace1_standard1 +PREHOOK: query: describe cassandra_keyspace1_standard1 +PREHOOK: type: DESCTABLE +POSTHOOK: query: describe cassandra_keyspace1_standard1 +POSTHOOK: type: DESCTABLE +key int from deserializer +value string from deserializer +PREHOOK: query: EXPLAIN FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%2)=0 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%2)=0 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB cassandra_keyspace1_standard1)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (= (% (TOK_TABLE_OR_COL key) 2) 0)))) + +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Filter Operator + predicate: + expr: ((key % 2) = 0) + type: boolean + Filter Operator + predicate: + expr: ((key % 2) = 0) + type: boolean + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: UDFToInteger(_col0) + type: int + expr: _col1 + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 1 + table: + input format: org.apache.hadoop.hive.cassandra.input.HiveCassandraTableInputFormat + output format: 
org.apache.hadoop.hive.cassandra.output.HiveCassandraOutputFormat + serde: org.apache.hadoop.hive.cassandra.CassandraSerDe + name: cassandra_keyspace1_standard1 + + +PREHOOK: query: FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%2)=0 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@cassandra_keyspace1_standard1 +POSTHOOK: query: FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%2)=0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@cassandra_keyspace1_standard1 +PREHOOK: query: SELECT * FROM cassandra_keyspace1_standard1 +PREHOOK: type: QUERY +PREHOOK: Input: default@cassandra_keyspace1_standard1 +PREHOOK: Output: file:/tmp/edward/hive_2010-08-05_08-07-16_191_666986318680807068/-mr-10000 +POSTHOOK: query: SELECT * FROM cassandra_keyspace1_standard1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@cassandra_keyspace1_standard1 +POSTHOOK: Output: file:/tmp/edward/hive_2010-08-05_08-07-16_191_666986318680807068/-mr-10000 +168 val_168 +348 val_348 +138 val_138 +134 val_134 +252 val_252 +76 val_76 +180 val_180 +164 val_164 +384 val_384 +498 val_498 +158 val_158 +126 val_126 +310 val_310 +396 val_396 +100 val_100 +278 val_278 +196 val_196 +482 val_482 +266 val_266 +430 val_430 +392 val_392 +256 val_256 +44 val_44 +208 val_208 +80 val_80 +472 val_472 +98 val_98 +224 val_224 +484 val_484 +170 val_170 +64 val_64 +438 val_438 +218 val_218 +286 val_286 +292 val_292 +466 val_466 +360 val_360 +400 val_400 +446 val_446 +242 val_242 +454 val_454 +494 val_494 +34 val_34 +156 val_156 +24 val_24 +172 val_172 +436 val_436 +432 val_432 +120 val_120 +470 val_470 +96 val_96 +274 val_274 +298 val_298 +394 val_394 +10 val_10 +296 val_296 +418 val_418 +458 val_458 +478 val_478 +190 val_190 +0 val_0 +368 val_368 +66 val_66 +72 val_72 +28 val_28 +30 val_30 +214 val_214 +78 val_78 +8 val_8 +104 val_104 +200 val_200 +262 val_262 +152 val_152 +2 val_2 +176 val_176 +386 val_386 +116 val_116 +490 val_490 +362 val_362 +424 val_424 +12 val_12 +332 val_332 +316 val_316 +174 val_174 +136 val_136 +318 val_318 +222 val_222 +364 val_364 +216 val_216 +284 val_284 +288 val_288 +160 val_160 +496 val_496 +344 val_344 +306 val_306 +26 val_26 +404 val_404 +382 val_382 +258 val_258 +462 val_462 +374 val_374 +74 val_74 +238 val_238 +444 val_444 +492 val_492 Index: cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java =================================================================== --- cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java (revision 0) +++ cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java (revision 0) @@ -0,0 +1,82 @@ +package org.apache.cassandra.contrib.utils.service; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.util.FileUtils; + +/** + * A cleanup utility that wipes the cassandra data directories. + * + * @author Ran Tavory (rantav@gmail.com) + * + */ +public class CassandraServiceDataCleaner { + + /** + * Creates all data dir if they don't exist and cleans them + * @throws IOException + */ + public void prepare() throws IOException { + makeDirsIfNotExist(); + cleanupDataDirectories(); + } + + /** + * Deletes all data from cassandra data directories, including the commit log. 
+   * @throws IOException in case of permissions error etc.
+   */
+  public void cleanupDataDirectories() throws IOException {
+    for (String s: getDataDirs()) {
+      cleanDir(s);
+    }
+  }
+  /**
+   * Creates the data directories, if they don't exist.
+   * @throws IOException if directories cannot be created (permissions etc).
+   */
+  public void makeDirsIfNotExist() throws IOException {
+    for (String s: getDataDirs()) {
+      mkdir(s);
+    }
+  }
+
+  /**
+   * Collects all data dirs and returns a set of String paths on the file system.
+   *
+   * @return the set of data directory paths
+   */
+  private Set<String> getDataDirs() {
+    Set<String> dirs = new HashSet<String>();
+    for (String s : DatabaseDescriptor.getAllDataFileLocations()) {
+      dirs.add(s);
+    }
+    dirs.add(DatabaseDescriptor.getLogFileLocation());
+    return dirs;
+  }
+  /**
+   * Creates a directory.
+   *
+   * @param dir
+   * @throws IOException
+   */
+  private void mkdir(String dir) throws IOException {
+    FileUtils.createDirectory(dir);
+  }
+
+  /**
+   * Removes all directory content from the file system.
+   *
+   * @param dir
+   * @throws IOException
+   */
+  private void cleanDir(String dir) throws IOException {
+    File dirFile = new File(dir);
+    if (dirFile.exists() && dirFile.isDirectory()) {
+      FileUtils.delete(dirFile.listFiles());
+    }
+  }
+}
\ No newline at end of file
Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java
===================================================================
--- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java (revision 0)
+++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java (revision 0)
@@ -0,0 +1,59 @@
+package org.apache.hadoop.hive.cassandra;
+
+import java.io.IOException;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+
+import org.apache.cassandra.contrib.utils.service.CassandraServiceDataCleaner;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.transport.TTransportException;
+
+public class CassandraTestSetup extends TestSetup {
+
+  static final Log LOG = LogFactory.getLog(CassandraTestSetup.class);
+
+  private EmbeddedCassandraService cassandra;
+
+  public CassandraTestSetup(Test test) {
+    super(test);
+  }
+
+  @SuppressWarnings("deprecation")
+  void preTest(HiveConf conf) throws IOException, TTransportException {
+    if (cassandra == null) {
+      //System.setProperty("storage-config", System.getProperty("user.dir")+"/src/test/resources" );
+      LOG.debug("storage config" + System.getProperty("cassandra.resource.dir"));
+      System.setProperty("storage-config", System.getProperty("cassandra.resource.dir"));
+
+      CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner();
+      cleaner.prepare();
+      cassandra = new EmbeddedCassandraService();
+      cassandra.init();
+      Thread t = new Thread(cassandra);
+      t.setDaemon(true);
+      t.start();
+    }
+
+    String auxJars = conf.getAuxJars();
+    auxJars = ((auxJars == null) ? "" : (auxJars + ",")) + "file://"
+        + new JobConf(conf, Cassandra.Client.class).getJar();
+    auxJars += ",file://" + new JobConf(conf, CassandraSerDe.class).getJar();
+    auxJars += ",file:///home/edward/cassandra-handler/cassandra-handler/lib/libthrift-r917130.jar";
+    conf.setAuxJars(auxJars);
+
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    //do we need this?
+ CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner(); + cleaner.prepare(); + } + +} Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestCassandraPut.java =================================================================== --- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestCassandraPut.java (revision 0) +++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestCassandraPut.java (revision 0) @@ -0,0 +1,56 @@ +package org.apache.hadoop.hive.cassandra; + +import java.io.IOException; + +import junit.framework.TestCase; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.cassandra.output.CassandraColumn; +import org.apache.hadoop.hive.cassandra.output.CassandraPut; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; + +public class TestCassandraPut extends TestCase{ + + private JobConf jobConf; + private FileSystem fs; + private String tmpRoot; + private Path tmpRootPath; + + public void testCassandraPut() throws IOException{ + jobConf= new JobConf(TestCassandraPut.class); + fs = FileSystem.get(jobConf); + tmpRoot = "/tmp/" + System.getProperty("user.name") + "/"+System.currentTimeMillis(); + tmpRootPath = new Path(tmpRoot); + if (fs.exists(tmpRootPath)){ + fs.delete(tmpRootPath, true); + } + fs.mkdirs( new Path(tmpRoot) ); + Path sequenceFile = new Path(tmpRootPath,"afile"); + CassandraPut cp = new CassandraPut("mykey"); + CassandraColumn cc = new CassandraColumn(); + cc.setColumnFamily("Standard1"); + cc.setColumn("firstname".getBytes()); + cc.setTimeStamp(System.currentTimeMillis()); + cc.setValue("edward".getBytes()); + cp.getColumns().add( cc ); + + SequenceFile.Writer writer = SequenceFile.createWriter + (fs, jobConf, sequenceFile, Text.class, CassandraPut.class , + SequenceFile.CompressionType.BLOCK ,new org.apache.hadoop.io.compress.DefaultCodec()); + + writer.append(new Text("edward"),cp ); + writer.close(); + + SequenceFile.Reader reader = new SequenceFile.Reader(fs, sequenceFile, jobConf); + Text key = new Text(); + CassandraPut value= new CassandraPut(); + reader.next(key, value); + assertEquals(new Text("edward"),key); + + assertEquals( new String( cp.getColumns().get(0).getColumnFamily() ), + new String( "Standard1" ) ); + } +} Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestColumnFamilyInputFormat.java =================================================================== --- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestColumnFamilyInputFormat.java (revision 0) +++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestColumnFamilyInputFormat.java (revision 0) @@ -0,0 +1,129 @@ +package org.apache.hadoop.hive.cassandra; + +import java.io.IOException; +import java.util.GregorianCalendar; +import java.util.Map; +import java.util.TreeMap; + +import junit.framework.TestCase; + +import org.apache.cassandra.contrib.utils.service.CassandraServiceDataCleaner; +import org.apache.cassandra.service.EmbeddedCassandraService; +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.ColumnPath; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.cassandra.input.HiveIColumn; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.MapWritable; +import 
org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +public class TestColumnFamilyInputFormat extends TestCase { + + private JobConf jobConf; + private FileSystem fs; + private String tmpRoot; + private Path tmpRootPath; + + private static TreeMap> mrResults = new TreeMap>(); + + @SuppressWarnings("deprecation") + public void testIf() throws Exception { + + EmbeddedCassandraService cassandra; + System.setProperty("storage-config", "/home/edward/encrypt/cassandra-cleanup/trunk/src/test/resources/"); + + CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner(); + cleaner.prepare(); + cassandra = new EmbeddedCassandraService(); + cassandra.init(); + Thread t = new Thread(cassandra); + t.setDaemon(true); + t.start(); + + GregorianCalendar gc = new GregorianCalendar(); + long timestamp = gc.getTimeInMillis(); + Cassandra.Client client = getClient(); + ColumnPath cp = new ColumnPath("Standard1"); + cp.setColumn("first".getBytes("utf-8")); + + client.insert("Keyspace1", "ecapriolo", cp, "Edward".getBytes("UTF-8"), + timestamp, ConsistencyLevel.ONE); + + client.insert("Keyspace1", "2", cp, "2".getBytes("UTF-8"), + timestamp, ConsistencyLevel.ONE); + + ColumnOrSuperColumn theColumn = client.get("Keyspace1", "ecapriolo", cp, ConsistencyLevel.ONE); + assertEquals("Edward", new String(theColumn.column.value,"UTF-8")); + + jobConf= new JobConf(TestColumnFamilyInputFormat.class); + jobConf.set(CassandraSerDe.CASSANDRA_KEYSPACE_NAME, "Keyspace1"); + jobConf.set(CassandraSerDe.CASSANDRA_CF_NAME, "Standard1"); + jobConf.set(CassandraSerDe.CASSANDRA_HOST, "localhost"); + jobConf.set(CassandraSerDe.CASSANDRA_PORT,"9170"); + jobConf.set(CassandraSerDe.CASSANDRA_COL_MAPPING, "Standard1:astring" ); + + fs = FileSystem.get(jobConf); + tmpRoot = "/tmp/" + System.getProperty("user.name") + "/"+System.currentTimeMillis(); + tmpRootPath = new Path(tmpRoot); + if (fs.exists(tmpRootPath)){ + fs.delete(tmpRootPath, true); + } + fs.mkdirs( new Path(tmpRoot) ); + Path aFolder = new Path(tmpRootPath,"afolder"); + fs.mkdirs( aFolder ); + fs.mkdirs( new Path(tmpRootPath,"bfolder")); + + jobConf.setMapperClass(MapClass.class); + jobConf.setNumReduceTasks(0); + jobConf.setInputFormat(org.apache.hadoop.hive.cassandra.input.HiveCassandraTableInputFormat.class); + jobConf.setOutputFormat(org.apache.hadoop.mapred.lib.NullOutputFormat.class ); + JobClient.runJob(jobConf); + + assert(mrResults.containsKey("ecapriolo")); + assert(mrResults.containsKey("2")); + TreeMap aRow = new TreeMap(); + aRow.put("first", "Edward"); + assertEquals( aRow, mrResults.get("ecapriolo")); + } + + static class MapClass extends MapReduceBase implements Mapper{ + + @SuppressWarnings("unchecked") + @Override + public void map(Text key, MapWritable value, OutputCollector out, Reporter rep) + throws IOException { + for (Map.Entry entry: value.entrySet()){ + String eKey= new String( ((BytesWritable) entry.getKey()).getBytes() ); + String eVal = new String( ((HiveIColumn) entry.getValue()).getValue() ); + TreeMap row = new TreeMap(); + row.put(eKey,eVal); + 
mrResults.put(key.toString(), row);
+      }
+    }
+
+  }
+
+
+  private Cassandra.Client getClient() throws TTransportException {
+    TTransport tr = new TSocket("localhost", 9170);
+    TProtocol proto = new TBinaryProtocol(tr);
+    Cassandra.Client client = new Cassandra.Client(proto);
+    tr.open();
+    return client;
+  }
+
+}
Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java
===================================================================
--- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java (revision 0)
+++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java (revision 0)
@@ -0,0 +1,23 @@
+package org.apache.hadoop.hive.cassandra;
+
+import org.apache.hadoop.hive.ql.QTestUtil;
+
+/**
+ * CassandraQTestUtil initializes Cassandra-specific test fixtures.
+ */
+public class CassandraQTestUtil extends QTestUtil {
+  public CassandraQTestUtil(
+    String outDir, String logDir, boolean miniMr, CassandraTestSetup setup)
+    throws Exception {
+
+    super(outDir, logDir, miniMr, null);
+    setup.preTest(conf);
+    super.init();
+  }
+
+  @Override
+  public void init() throws Exception {
+    // defer
+  }
+}
+
Index: cassandra-handler/src/test/resources/storage-conf.xml
===================================================================
--- cassandra-handler/src/test/resources/storage-conf.xml (revision 0)
+++ cassandra-handler/src/test/resources/storage-conf.xml (revision 0)
@@ -0,0 +1,380 @@
+[380-line storage-conf.xml for the embedded test Cassandra; the element markup is not recoverable here. Surviving values, in order: Test Cluster, false, org.apache.cassandra.locator.RackUnawareStrategy, 1, org.apache.cassandra.locator.EndPointSnitch, org.apache.cassandra.auth.AllowAllAuthenticator, org.apache.cassandra.dht.RandomPartitioner, /tmp/commitlog, /tmp/data, 127.0.0.1, 10000, 128, localhost, 7001, 0.0.0.0, 9170, false, auto, 512, 64, 32, 8, 64, 64, 256, 0.3, 60, 8, 32, periodic, 10000, 864000]
Index: cassandra-handler/src/test/resources/log4j-tools.properties
===================================================================
--- cassandra-handler/src/test/resources/log4j-tools.properties (revision 0)
+++ cassandra-handler/src/test/resources/log4j-tools.properties (revision 0)
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# for production, you should probably set the root to INFO
+# and the pattern to %c instead of %l. (%l is slower.)
+ +# output messages into a rolling log file as well as stdout +log4j.rootLogger=WARN,stderr + +# stderr +log4j.appender.stderr=org.apache.log4j.ConsoleAppender +log4j.appender.stderr.target=System.err +log4j.appender.stderr.layout=org.apache.log4j.PatternLayout +log4j.appender.stderr.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n Index: cassandra-handler/src/test/resources/log4j.properties =================================================================== --- cassandra-handler/src/test/resources/log4j.properties (revision 0) +++ cassandra-handler/src/test/resources/log4j.properties (revision 0) @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# for production, you should probably set the root to INFO +# and the pattern to %c instead of %l. (%l is slower.) + +# output messages into a rolling log file as well as stdout +log4j.rootLogger=INFO,stdout,R + +# stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n + +# rolling log file +log4j.appender.R=org.apache.log4j.RollingFileAppender +log4j.appender.file.maxFileSize=20MB +log4j.appender.file.maxBackupIndex=50 +log4j.appender.R.layout=org.apache.log4j.PatternLayout +log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n +# Edit the next line to point to your logs directory +log4j.appender.R.File=/tmp/system.log + +# Application logging options +#log4j.logger.com.facebook=DEBUG +#log4j.logger.com.facebook.infrastructure.gms=DEBUG +#log4j.logger.com.facebook.infrastructure.db=DEBUG Index: cassandra-handler/src/test/resources/passwd.properties =================================================================== --- cassandra-handler/src/test/resources/passwd.properties (revision 0) +++ cassandra-handler/src/test/resources/passwd.properties (revision 0) @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This is a sample password file for SimpleAuthenticator. 
The format of +# this file is username=password. If -Dpasswd.mode=MD5 then the password +# is represented as an md5 digest, otherwise it is cleartext (keep this +# in mind when setting file mode and ownership). +jsmith=havebadpass +Elvis\ Presley=graceland4evar +dilbert=nomoovertime Index: cassandra-handler/src/test/resources/access.properties =================================================================== --- cassandra-handler/src/test/resources/access.properties (revision 0) +++ cassandra-handler/src/test/resources/access.properties (revision 0) @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This is a sample access file for SimpleAuthenticator. The format of +# this file is keyspace=users, where users is a comma delimited list of +# authenticatable users from passwd.properties. This file contains +# potentially sensitive information, keep this in mind when setting its +# mode and ownership. +Keyspace1=jsmith,Elvis Presley,dilbert Index: cassandra-handler/src/test/queries/cassandra_queries.q =================================================================== --- cassandra-handler/src/test/queries/cassandra_queries.q (revision 0) +++ cassandra-handler/src/test/queries/cassandra_queries.q (revision 0) @@ -0,0 +1,17 @@ + + + +CREATE EXTERNAL TABLE IF NOT EXISTS +cassandra_keyspace1_standard1(key int, value string) +STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' +WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" , +"cassandra.cf.name" = "Standard1" , "cassandra.host" = "localhost" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1" +) +TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1"); + +describe cassandra_keyspace1_standard1; + +EXPLAIN FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%2)=0; +FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%2)=0; + +SELECT * FROM cassandra_keyspace1_standard1; Index: cassandra-handler/src/test/templates/TestCassandraCliDriver.vm =================================================================== --- cassandra-handler/src/test/templates/TestCassandraCliDriver.vm (revision 0) +++ cassandra-handler/src/test/templates/TestCassandraCliDriver.vm (revision 0) @@ -0,0 +1,125 @@ +package org.apache.hadoop.hive.cli; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +import java.io.*; +import java.util.*; + +import org.apache.hadoop.hive.cassandra.CassandraQTestUtil; +import org.apache.hadoop.hive.cassandra.CassandraTestSetup; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.history.HiveHistoryViewer; +import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo; +import 
org.apache.hadoop.hive.ql.history.HiveHistory.Keys; +import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo; +import org.apache.hadoop.hive.ql.session.SessionState; + +import org.antlr.runtime.*; +import org.antlr.runtime.tree.*; + +public class $className extends TestCase { + + private CassandraQTestUtil qt; + private CassandraTestSetup setup; + + public $className(String name, CassandraTestSetup setup) { + super(name); + qt = null; + this.setup = setup; + } + + @Override + protected void setUp() { + try { + boolean miniMR = false; + if ("$clusterMode".equals("miniMR")) { + miniMR = true; + } + + qt = new CassandraQTestUtil( + "$resultsDir.getCanonicalPath()", + "$logDir.getCanonicalPath()", miniMR, setup); + +#foreach ($qf in $qfiles) + qt.addFile("$qf.getCanonicalPath()"); +#end + } catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in setup"); + } + } + + @Override + protected void tearDown() { + try { + qt.shutdown(); + } + catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in tearDown"); + } + } + + public static Test suite() { + TestSuite suite = new TestSuite(); + CassandraTestSetup setup = new CassandraTestSetup(suite); +#foreach ($qf in $qfiles) + #set ($fname = $qf.getName()) + #set ($eidx = $fname.length() - 2) + #set ($tname = $fname.substring(0, $eidx)) + suite.addTest(new $className("testCliDriver_$tname", setup)); +#end + return setup; + } + + #foreach ($qf in $qfiles) + #set ($fname = $qf.getName()) + #set ($eidx = $fname.length() - 2) + #set ($tname = $fname.substring(0, $eidx)) + public void testCliDriver_$tname() throws Exception { + try { + System.out.println("Begin query: " + "$fname"); + qt.cliInit("$fname"); + int ecode = qt.executeClient("$fname"); + if (ecode != 0) { + fail("Client Execution failed with error code = " + ecode); + } + if (SessionState.get() != null) { + HiveHistoryViewer hv = new HiveHistoryViewer(SessionState.get() + .getHiveHistory().getHistFileName()); + Map jobInfoMap = hv.getJobInfoMap(); + Map taskInfoMap = hv.getTaskInfoMap(); + + if (jobInfoMap.size() != 0) { + String cmd = (String)jobInfoMap.keySet().toArray()[0]; + QueryInfo ji = jobInfoMap.get(cmd); + + if (!ji.hm.get(Keys.QUERY_RET_CODE.name()).equals("0")) { + fail("Wrong return code in hive history"); + } + } + } + + ecode = qt.checkCliDriverResults("$fname"); + if (ecode != 0) { + fail("Client execution results failed with error code = " + ecode); + } + } catch (Throwable e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception"); + } + + System.out.println("Done query: " + "$fname"); + assertTrue("Test passed", true); + } + +#end +} \ No newline at end of file Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveIColumn.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveIColumn.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveIColumn.java (revision 0) @@ -0,0 +1,165 @@ +package org.apache.hadoop.hive.cassandra.input; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.security.MessageDigest; +import java.util.Collection; + +import org.apache.cassandra.db.IColumn; +import 
org.apache.cassandra.db.marshal.AbstractType; +import org.apache.hadoop.io.Writable; + +public class HiveIColumn implements IColumn, Writable { + + private byte [] name; + private byte [] value; + private long timestamp; + + public HiveIColumn(){ + + } + + @Override + public byte[] name() { + return name; + } + + @Override + public long timestamp() { + return timestamp; + } + + @Override + public byte[] value() { + return value; + } + + @Override + public void readFields(DataInput in) throws IOException { + name = new byte[in.readInt()]; + in.readFully(name); + + value= new byte[in.readInt()]; + in.readFully(value); + + timestamp = in.readLong(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(name.length); + out.write(name); + + out.writeInt(value.length); + out.write(value); + + out.writeLong(timestamp); + } + + //bean patterns + + public byte[] getName() { + return name; + } + + public void setName(byte[] name) { + this.name = name; + } + + public byte[] getValue() { + return value; + } + + public void setValue(byte[] value) { + this.value = value; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + //not needed for current integration + + @Override + public int size() { + throw new UnsupportedOperationException(); + } + + @Override + public void addColumn(IColumn arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public IColumn diff(IColumn arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public int getLocalDeletionTime() { + throw new UnsupportedOperationException(); + } + + @Override + public long getMarkedForDeleteAt() { + throw new UnsupportedOperationException(); + } + + @Override + public int getObjectCount() { + throw new UnsupportedOperationException(); + } + + @Override + public String getString(AbstractType arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public IColumn getSubColumn(byte[] arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public Collection getSubColumns() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isMarkedForDelete() { + throw new UnsupportedOperationException(); + } + + @Override + public long mostRecentLiveChangeAt() { + throw new UnsupportedOperationException(); + } + + + + @Override + public int serializedSize() { + throw new UnsupportedOperationException(); + } + + @Override + public void updateDigest(MessageDigest arg0) { + throw new UnsupportedOperationException(); + } + + + @Override + public String toString(){ + StringBuffer sb = new StringBuffer(); + sb.append("HiveIColumn["); + sb.append("name "+new String(this.name)+" "); + sb.append("value "+new String(this.value)+" "); + sb.append("timestamp "+ this.timestamp+" "); + return sb.toString(); + } +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/CassandraRowResult.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/CassandraRowResult.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/CassandraRowResult.java (revision 0) @@ -0,0 +1,63 @@ +package org.apache.hadoop.hive.cassandra.input; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import 
org.apache.hadoop.io.Writable; + + + +public class CassandraRowResult implements Writable { + + private Text key; + private MapWritable value; + + public CassandraRowResult() { + } + + @Override + public void readFields(DataInput in) throws IOException { + key =new Text(); + key.readFields(in); + value = new MapWritable(); + value.readFields(in); + } + + @Override + public void write(DataOutput out) throws IOException { + key.write(out); + value.write(out); + } + + public Text getKey() { + return key; + } + + public void setKey(Text key) { + this.key = key; + } + + public MapWritable getValue() { + return value; + } + + public void setValue(MapWritable value) { + this.value = value; + } + + + @Override + public String toString(){ + StringBuffer sb = new StringBuffer(); + sb.append("RowResult key:"+key ); + for (Map.Entry entry : value.entrySet()){ + sb.append( "entry key:"+entry.getKey()+" " ); + sb.append( "entry value:"+entry.getValue()+" " ); + } + return sb.toString(); + } +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraRow.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraRow.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraRow.java (revision 0) @@ -0,0 +1,129 @@ +package org.apache.hadoop.hive.cassandra.input; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hive.cassandra.CassandraSerDe; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.lazy.LazyStruct; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; + +public class LazyCassandraRow extends LazyStruct{ + private List cassandraColumns; + private CassandraRowResult rowResult; + private List cassandraColumnBytes; + private ArrayList cachedList; + + public LazyCassandraRow(LazySimpleStructObjectInspector oi) { + super(oi); + } + + public void init(CassandraRowResult crr, List cassandraColumns, List cassandraColumnsBytes) { + this.rowResult = crr; + this.cassandraColumns = cassandraColumns; + this.cassandraColumnBytes = cassandraColumnsBytes; + setParsed(false); + } + + private void parse(){ + if (getFields() == null) { + List fieldRefs = + ((StructObjectInspector)getInspector()).getAllStructFieldRefs(); + setFields(new LazyObject[fieldRefs.size()]); + for (int i = 0; i < getFields().length; i++) { + String cassandraColumn = this.cassandraColumns.get(i); + if (cassandraColumn.endsWith(":")) { + // a column family + getFields()[i] = new LazyCassandraCellMap( (LazyMapObjectInspector) + fieldRefs.get(i).getFieldObjectInspector()); + continue; + } + + getFields()[i] = LazyFactory.createLazyObject( + fieldRefs.get(i).getFieldObjectInspector()); + } + setFieldInited(new boolean[getFields().length]); + } + Arrays.fill(getFieldInited(), false); + setParsed(true); + } + + @Override + public Object getField(int fieldID) { + if (!getParsed()) { + parse(); + } + return 
uncheckedGetField(fieldID); + } + + private Object uncheckedGetField(int fieldID) { + if (!getFieldInited()[fieldID]) { + getFieldInited()[fieldID] = true; + ByteArrayRef ref = null; + String columnName = cassandraColumns.get(fieldID); + String cfName=columnName.split(":")[0]; + String colName=columnName.split(":")[1]; + if (columnName.equals(CassandraSerDe.CASSANDRA_KEY_COLUMN)) { + ref = new ByteArrayRef(); + ref.setData( rowResult.getKey().getBytes() ); + } else { + if (columnName.endsWith(":")) { + return null; + //System.out.println("congratulations its a map"); + //((LazyCassandraCellMap) getFields()[fieldID]).init(rowResult, cfName); + //we can use this for super columns + } else { + Writable res = rowResult.getValue().get( new BytesWritable(colName.getBytes()) ) ; + HiveIColumn hic = (HiveIColumn) res; + if (res == null) { + return null; + } else + if ( hic != null ) { + ref = new ByteArrayRef(); + ref.setData( hic.value() ); + } else { + return null; + } + } + } + if (ref != null) { + getFields()[fieldID].init(ref, 0, ref.getData().length); + } + } + return getFields()[fieldID].getObject(); + } + + /** + * Get the values of the fields as an ArrayList. + * @return The values of the fields as an ArrayList. + */ + @Override + public ArrayList getFieldsAsList() { + if (!getParsed()) { + parse(); + } + if (cachedList == null) { + cachedList = new ArrayList(); + } else { + cachedList.clear(); + } + for (int i = 0; i < getFields().length; i++) { + cachedList.add(uncheckedGetField(i)); + } + return cachedList; + } + + @Override + public Object getObject() { + return this; + } + +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/CassandraSplit.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/CassandraSplit.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/CassandraSplit.java (revision 0) @@ -0,0 +1,70 @@ +package org.apache.hadoop.hive.cassandra.input; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.cassandra.hadoop.ColumnFamilySplit; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +/* This class wraps the cassandra ColumnFamilySplit and adds the columnMapping + * That hive will use to 'tablize' the data inside it. 
+ * */ +@SuppressWarnings("deprecation") +public class CassandraSplit extends FileSplit implements InputSplit{ + + private final ColumnFamilySplit split; + private String columnMapping; + + public CassandraSplit() { + super((Path) null, 0, 0, (String[]) null); + columnMapping = ""; + split = new ColumnFamilySplit(null,null,null); + } + + public CassandraSplit(ColumnFamilySplit split, String columnsMapping, Path dummyPath) { + super(dummyPath, 0, 0, (String[]) null); + this.split = split; + columnMapping = columnsMapping; + } + + /* bean patterns */ + public String getColumnMapping(){ + return columnMapping; + } + + public ColumnFamilySplit getSplit(){ + return this.split; + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + columnMapping = in.readUTF(); + split.readFields(in); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeUTF(columnMapping); + split.write(out); + } + + @Override + public long getLength() { + return split.getLength(); + } + + @Override + public String[] getLocations() throws IOException { + return split.getLocations(); + } + + @Override + public String toString() { + return "ColumnFamilySplit " + split + " : " + columnMapping; + } + +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraTableInputFormat.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraTableInputFormat.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraTableInputFormat.java (revision 0) @@ -0,0 +1,175 @@ +package org.apache.hadoop.hive.cassandra.input; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; + +import org.apache.cassandra.db.IColumn; +import org.apache.cassandra.hadoop.ColumnFamilyInputFormat; +import org.apache.cassandra.hadoop.ColumnFamilySplit; +import org.apache.cassandra.hadoop.ConfigHelper; +import org.apache.cassandra.thrift.SlicePredicate; +import org.apache.cassandra.thrift.SliceRange; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.cassandra.CassandraSerDe; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; + +public class HiveCassandraTableInputFormat extends +ColumnFamilyInputFormat implements InputFormat { //map is byte[],HiveIColumn + static final Log LOG = LogFactory.getLog(HiveCassandraTableInputFormat.class); + + public HiveCassandraTableInputFormat() { + super(); + } + + @Override + public org.apache.hadoop.mapreduce.RecordReader> createRecordReader( + org.apache.hadoop.mapreduce.InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException{ + org.apache.hadoop.mapreduce.RecordReader> result = null; + 
try { + result = super.createRecordReader(inputSplit, taskAttemptContext); + result.initialize(((CassandraSplit)inputSplit).getSplit(), taskAttemptContext); + } catch (InterruptedException e) { + throw new IOException(e); + } + return result; + } + + public RecordReader getRecordReader + (InputSplit split,JobConf jobConf,final Reporter reporter) throws IOException { + CassandraSplit cassandraSplit = (CassandraSplit) split; + String cassandraTableName = jobConf.get(CassandraSerDe.CASSANDRA_CF_NAME); + String cassandraColumnsMapping = jobConf.get(CassandraSerDe.CASSANDRA_COL_MAPPING); + List columns = CassandraSerDe.parseColumnMapping(cassandraColumnsMapping); + List readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf); + + if (columns.size() < readColIDs.size()) { + throw new IOException("Cannot read more columns than the given table contains."); + } + + org.apache.cassandra.hadoop.ColumnFamilySplit cfSplit = cassandraSplit.getSplit(); + + Job job= new Job(jobConf); + TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()) { + @Override + public void progress() { + reporter.progress(); + } + }; + + final org.apache.hadoop.mapreduce.RecordReader> + recordReader = createRecordReader(cassandraSplit, tac); + + return new RecordReader(){ + + public void close() throws IOException{ + recordReader.close(); + } + public Text createKey(){ + return new Text(); + } + public CassandraRowResult createValue(){ + //return new TreeMap(); + return new CassandraRowResult(); + } + public long getPos() throws IOException { + return 0l; + } + public float getProgress() throws IOException { + float progress = 0.0F; + try { + progress = recordReader.getProgress(); + } catch (InterruptedException e) { + throw new IOException(e); + } + return progress; + } + public boolean next(Text rowKey,CassandraRowResult value) throws IOException{ + boolean next = false; + try { + next = recordReader.nextKeyValue(); + if (next){ + rowKey.set( recordReader.getCurrentKey() ); + MapWritable theMap = new MapWritable(); + for (Map.Entry entry : recordReader.getCurrentValue().entrySet()){ + HiveIColumn hic = new HiveIColumn(); + hic.setName(entry.getValue().name() ); + hic.setValue(entry.getValue().value()); + hic.setTimestamp(entry.getValue().timestamp()); + theMap.put(new BytesWritable(entry.getKey()),hic); + } + value.setKey(rowKey); + value.setValue(theMap); + } + } catch (InterruptedException e) { + throw new IOException(e); + } + return next; + } + }; + + } + + public InputSplit[] getSplits( JobConf jobConf, int numSplits ) throws IOException { + + String ks = jobConf.get(CassandraSerDe.CASSANDRA_KEYSPACE_NAME); + String cf = jobConf.get(CassandraSerDe.CASSANDRA_CF_NAME); + int slicePredicateSize = jobConf.getInt(CassandraSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, 1000); + int sliceRangeSize = jobConf.getInt(CassandraSerDe.CASSANDRA_RANGE_BATCH_SIZE, 100); + + SlicePredicate predicate = new SlicePredicate(); + predicate.setSlice_range( new SliceRange(new byte[0], new byte[0], false, slicePredicateSize) ); + ConfigHelper.setSlicePredicate(jobConf, predicate); + ConfigHelper.setColumnFamily(jobConf, ks, cf); + ConfigHelper.setRangeBatchSize(jobConf, sliceRangeSize); + + //column mapping + String cassandraColumnMapping = jobConf.get(CassandraSerDe.CASSANDRA_COL_MAPPING); + if (cassandraColumnMapping == null){ + throw new IOException("cassandra.columns.mapping required for Cassandra Table."); + } + /* + List columns = CassandraSerDe.parseColumnMapping(cassandraColumnMapping); + List 
inputColumns = new ArrayList(); + for (String column : columns) { + if (CassandraSerDe.isSpecialColumn(column)) { + continue; + } + inputColumns.add(column.getBytes("UTF-8")); + } + */ + Job job = new Job(jobConf); + JobContext jobContext = new JobContext(job.getConfiguration(), job.getJobID()); + + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, new Path("/")); + FileInputFormat.addInputPath(job, new Path("/")); + //Path [] tablePaths = FileInputFormat.getInputPaths(jobContext); + Path [] tablesPaths = new Path [] { new Path("/")}; + List splits = getSplits(jobContext); + InputSplit [] results = new InputSplit[splits.size()]; + + for (int i = 0; i < splits.size(); i++) { + results[i] = new CassandraSplit((ColumnFamilySplit) splits.get(i), + cassandraColumnMapping, tablesPaths[0]); + } + return results; + } + +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraCellMap.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraCellMap.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraCellMap.java (revision 0) @@ -0,0 +1,89 @@ +package org.apache.hadoop.hive.cassandra.input; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + +import org.apache.hadoop.hive.serde2.lazy.LazyMap; +import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector; + +public class LazyCassandraCellMap extends LazyMap{ + + private CassandraRowResult rowResult; + private String cassandraColumnFamily; + + protected LazyCassandraCellMap(LazyMapObjectInspector oi) { + super(oi); + // TODO Auto-generated constructor stub + } + + public void init(CassandraRowResult rr, String columnFamily) { + rowResult = rr; + cassandraColumnFamily = columnFamily; + setParsed(false); + } + + private void parse() { + if (cachedMap == null) { + cachedMap = new LinkedHashMap(); + } else { + cachedMap.clear(); + } + + NavigableMap familyMap = new TreeMap(); + + + + } + + /** + * Get the value in the map for the given key. + * + * @param key + * @return + */ + + @Override + public Object getMapValueElement(Object key) { + if (!getParsed()) { + parse(); + } + + for (Map.Entry entry : cachedMap.entrySet()) { + LazyPrimitive lazyKeyI = (LazyPrimitive) entry.getKey(); + // getWritableObject() will convert LazyPrimitive to actual primitive + // writable objects. + Object keyI = lazyKeyI.getWritableObject(); + if (keyI == null) { + continue; + } + if (keyI.equals(key)) { + // Got a match, return the value + LazyObject v = (LazyObject) entry.getValue(); + return v == null ? 
v : v.getObject(); + } + } + + return null; + } + + @Override + public Map getMap() { + if (!getParsed()) { + parse(); + } + return cachedMap; + } + + @Override + public int getMapSize() { + if (!getParsed()) { + parse(); + } + return cachedMap.size(); + } + +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/HiveCassandraOutputFormat.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/HiveCassandraOutputFormat.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/HiveCassandraOutputFormat.java (revision 0) @@ -0,0 +1,124 @@ +package org.apache.hadoop.hive.cassandra.output; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.ColumnPath; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.TimedOutException; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.cassandra.CassandraSerDe; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.util.Progressable; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + + + +public class HiveCassandraOutputFormat + implements HiveOutputFormat, + OutputFormat{ + + static final Log LOG = LogFactory.getLog(HiveCassandraOutputFormat.class); + + @Override + public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, + Class valueClass, boolean isCompressed, Properties tableProperties, + Progressable progress) throws IOException { + + final String cassandraKeySpace = jc.get(CassandraSerDe.CASSANDRA_KEYSPACE_NAME); + final String cassandraColumnFamily = jc.get(CassandraSerDe.CASSANDRA_CF_NAME); + final String cassandraHost = jc.get(CassandraSerDe.CASSANDRA_HOST); + final int cassandraPort = Integer.parseInt ( jc.get(CassandraSerDe.CASSANDRA_PORT) ); + final String consistancyLevel = jc.get(CassandraSerDe.CASSANDRA_CONSISTANCY_LEVEL, "ONE"); + ConsistencyLevel level = null; + if (consistancyLevel.equalsIgnoreCase("ONE")){ + level = ConsistencyLevel.ONE; + } else if ( consistancyLevel.equalsIgnoreCase("QUORUM") ){ + level = ConsistencyLevel.QUORUM; + } else { + level = ConsistencyLevel.ONE; + } + final ConsistencyLevel fLevel= level; + return new RecordWriter() { + + private Cassandra.Client client; + private TTransport transport; + private TProtocol proto; + @Override + public void close(boolean abort) throws IOException { + //no going back + } + + @Override + public void write(Writable w) throws IOException { + CassandraPut put = (CassandraPut) w; + ensureConnection(cassandraHost,cassandraPort,5000); + + for (CassandraColumn c : put.getColumns() ){ + + ColumnPath cp = new ColumnPath(); + cp.setColumn(c.getColumn() ); + 
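+          // Descriptive note: each CassandraColumn carried by the CassandraPut is sent as an
+          // individual thrift insert() against the configured keyspace, using the column family
+          // and column name stored on the CassandraColumn and the consistency level resolved
+          // above (ONE unless QUORUM was requested).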
cp.setColumn_family(c.getColumnFamily() ); + + try { + client.insert(cassandraKeySpace, put.getKey(), cp, c.getValue(), c.getTimeStamp(), fLevel); + } catch (InvalidRequestException e) { + throw new IOException(e); + } catch (UnavailableException e) { + throw new IOException(e); + } catch (TimedOutException e) { + throw new IOException(e); + } catch (TException e) { + throw new IOException(e); + } + } + + } + + private void ensureConnection(String host,int port, int timeout){ + if (transport == null || (transport!=null && !transport.isOpen())){ + transport = new TSocket(host, port,timeout); + proto = new TBinaryProtocol(transport); + + try { + transport.open(); + this.client = new Cassandra.Client(proto); + } catch (TTransportException e) { + throw new RuntimeException (e); + } + } + } + }; + } + + @Override + public void checkOutputSpecs(FileSystem arg0, JobConf jc) throws IOException { + + } + + @Override + public org.apache.hadoop.mapred.RecordWriter getRecordWriter(FileSystem arg0, + JobConf arg1, String arg2, Progressable arg3) throws IOException { + throw new RuntimeException("Error: Hive should not invoke this method."); + + } + + + +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraPut.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraPut.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraPut.java (revision 0) @@ -0,0 +1,87 @@ +package org.apache.hadoop.hive.cassandra.output; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.io.Writable; + +public class CassandraPut implements Writable{ + + private String key; + //private ConsistencyLevel level; + private List columns; + + public CassandraPut(){ + columns = new ArrayList(); + } + + public CassandraPut(String key){ + this(); + setKey(key); + } + + @Override + public void readFields(DataInput in) throws IOException { + key = in.readUTF(); + int ilevel = in.readInt(); +// level = ConsistencyLevel.QUORUM; + int cols = in.readInt(); + for (int i =0;i getColumns() { + return columns; + } + + public void setColumns(List columns) { + this.columns = columns; + } + + @Override + public String toString(){ + StringBuilder sb = new StringBuilder(); + sb.append("key: "); + sb.append(this.key); + for (CassandraColumn col:this.columns){ + sb.append( "column : [" ); + sb.append( col.toString() ); + sb.append( "]" ); + } + return sb.toString(); + } +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraColumn.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraColumn.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraColumn.java (revision 0) @@ -0,0 +1,79 @@ +package org.apache.hadoop.hive.cassandra.output; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +public class CassandraColumn implements Writable{ + + private String columnFamily; + private long timeStamp; + private byte [] column; + private byte [] value; + + + @Override + public void readFields(DataInput din) throws IOException { + columnFamily = din.readUTF(); + timeStamp = din.readLong(); + int clength= din.readInt(); + column = new 
byte[clength]; + din.readFully(column, 0, clength); + int vlength = din.readInt(); + value = new byte [vlength ]; + din.readFully( value, 0 , vlength); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(columnFamily); + out.writeLong(timeStamp); + out.writeInt( column.length ); + out.write(column); + out.writeInt( value.length ); + out.write(value); + } + + public String getColumnFamily() { + return columnFamily; + } + + public void setColumnFamily(String columnFamily) { + this.columnFamily = columnFamily; + } + + public byte[] getColumn() { + return column; + } + + public void setColumn(byte[] column) { + this.column = column; + } + + public byte[] getValue() { + return value; + } + + public void setValue(byte[] value) { + this.value = value; + } + + public long getTimeStamp() { + return timeStamp; + } + + public void setTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } + + @Override + public String toString(){ + StringBuilder sb = new StringBuilder(); + sb.append( "cf:"+ this.columnFamily); + sb.append( "column:"+ new String (this.column)); + sb.append( "value:"+ new String (this.value)); + return sb.toString(); + } +} \ No newline at end of file Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraSerDe.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraSerDe.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraSerDe.java (revision 0) @@ -0,0 +1,469 @@ +package org.apache.hadoop.hive.cassandra; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.cassandra.input.CassandraRowResult; +import org.apache.hadoop.hive.cassandra.input.LazyCassandraRow; +import org.apache.hadoop.hive.cassandra.output.CassandraColumn; +import org.apache.hadoop.hive.cassandra.output.CassandraPut; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazy.LazyUtils; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import 
org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.io.Writable; + +public class CassandraSerDe implements SerDe { + + public static final String CASSANDRA_KEYSPACE_NAME="cassandra.ks.name"; + public static final String CASSANDRA_CF_NAME="cassandra.cf.name"; + public static final String CASSANDRA_RANGE_BATCH_SIZE="cassandra.range.size"; + public static final String CASSANDRA_SLICE_PREDICATE_SIZE="cassandra.slice.predicate.size"; + public static final String CASSANDRA_HOST="cassandra.host"; + public static final String CASSANDRA_PORT="cassandra.port"; + public static final String CASSANDRA_COL_MAPPING = "cassandra.columns.mapping"; + public static final String CASSANDRA_KEY_COLUMN=":key"; + public static final String CASSANDRA_CONSISTANCY_LEVEL="ONE"; + + private List cassandraColumnNames; + private ObjectInspector cachedObjectInspector; + private SerDeParameters serdeParams; + private LazyCassandraRow cachedCassandraRow; + private List cassandraColumnNamesBytes; + private final ByteStream.Output serializeStream = new ByteStream.Output(); + private boolean useJSONSerialize; + private int iKey; + + private byte [] separators; // the separators array + private boolean escaped; // whether we need to escape the data when writing out + private byte escapeChar; // which char to use as the escape char, e.g. '\\' + private boolean [] needsEscape; // which chars need to be escaped. This array should have size + // of 128. Negative byte values (or byte values >= 128) are + // never escaped. + + public static final Log LOG = LogFactory.getLog( CassandraSerDe.class.getName()); + + @Override + public String toString() { + return getClass().toString() + + "[" + + cassandraColumnNames + + ":" + + ((StructTypeInfo) serdeParams.getRowTypeInfo()) + .getAllStructFieldNames() + + ":" + + ((StructTypeInfo) serdeParams.getRowTypeInfo()) + .getAllStructFieldTypeInfos() + "]"; + } + + public CassandraSerDe () throws SerDeException { + + } + + + public void initialize(Configuration conf, Properties tbl) + throws SerDeException { + + initCassandraSerDeParameters(conf, tbl, getClass().getName()); + + cachedObjectInspector = LazyFactory.createLazyStructInspector( + serdeParams.getColumnNames(), + serdeParams.getColumnTypes(), + serdeParams.getSeparators(), + serdeParams.getNullSequence(), + serdeParams.isLastColumnTakesRest(), + serdeParams.isEscaped(), + serdeParams.getEscapeChar()); + + cachedCassandraRow = new LazyCassandraRow( + (LazySimpleStructObjectInspector) cachedObjectInspector); + + //cassandraColumnNames is an array how will it print out? 
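+    // Illustrative example (names made up, not from this patch): a mapping such as
+    //   ":key,info:name,info:"
+    // yields three Hive columns: the row key (string), the single Cassandra column
+    // info:name (string), and the whole "info" column family as a map<string,string>.
+    // parseColumnMapping() prepends ":key" when it is not listed explicitly.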
+ if (LOG.isDebugEnabled()) { + LOG.debug("CassandraSerDe initialized with : columnNames = " + + serdeParams.getColumnNames() + + " columnTypes = " + + serdeParams.getColumnTypes() + + " cassandraColumnMapping = " + + cassandraColumnNames); + } + } + + private void initCassandraSerDeParameters( + Configuration job, Properties tbl, String serdeName) + throws SerDeException { + String cassandraColumnNameProperty = + tbl.getProperty(CassandraSerDe.CASSANDRA_COL_MAPPING); + String columnTypeProperty = + tbl.getProperty(Constants.LIST_COLUMN_TYPES); + + cassandraColumnNames = parseColumnMapping(cassandraColumnNameProperty); + iKey = cassandraColumnNames.indexOf(CassandraSerDe.CASSANDRA_KEY_COLUMN); + + cassandraColumnNamesBytes = initColumnNamesBytes(cassandraColumnNames); + + // Build the type property string if not supplied + if (columnTypeProperty == null) { + StringBuilder sb = new StringBuilder(); + + for (int i = 0; i < cassandraColumnNames.size(); i++) { + if (sb.length() > 0) { + sb.append(":"); + } + String colName = cassandraColumnNames.get(i); + if (isSpecialColumn(colName)) { + // a special column becomes a STRING + sb.append(Constants.STRING_TYPE_NAME); + } else if (colName.endsWith(":")) { + // a column family become a MAP + sb.append( + Constants.MAP_TYPE_NAME + "<" + + Constants.STRING_TYPE_NAME + + "," + Constants.STRING_TYPE_NAME + ">"); + } else { + // an individual column becomes a STRING + sb.append(Constants.STRING_TYPE_NAME); + } + } + tbl.setProperty(Constants.LIST_COLUMN_TYPES, sb.toString()); + } + + serdeParams = LazySimpleSerDe.initSerdeParams( + job, tbl, serdeName); + + if (cassandraColumnNames.size() != serdeParams.getColumnNames().size()) { + throw new SerDeException(serdeName + ": columns has " + + serdeParams.getColumnNames().size() + + " elements while hbase.columns.mapping has " + + cassandraColumnNames.size() + " elements" + + " (counting the key if implicit)"); + } + + separators = serdeParams.getSeparators(); + escaped = serdeParams.isEscaped(); + escapeChar = serdeParams.getEscapeChar(); + needsEscape = serdeParams.getNeedsEscape(); + + // we just can make sure that "columnfamily:" is mapped to MAP + for (int i = 0; i < cassandraColumnNames.size(); i++) { + String hbaseColName = cassandraColumnNames.get(i); + if (hbaseColName.endsWith(":")) { + TypeInfo typeInfo = serdeParams.getColumnTypes().get(i); + if ((typeInfo.getCategory() != Category.MAP) || + (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getTypeName() + != Constants.STRING_TYPE_NAME)) { + + throw new SerDeException( + serdeName + ": Cassandra column family '" + + hbaseColName + + "' should be mapped to map but is mapped to " + + typeInfo.getTypeName()); + } + } + } + + } + + + @Override + public Object deserialize(Writable w) throws SerDeException { + if (!(w instanceof CassandraRowResult)){ + throw new SerDeException(getClass().getName() + ": expects Cassandra Row Result"); + } + CassandraRowResult crr = (CassandraRowResult) w; + this.cachedCassandraRow.init(crr, this.cassandraColumnNames,cassandraColumnNamesBytes); + return cachedCassandraRow; + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + return this.cachedObjectInspector; + } + + @Override + public Class getSerializedClass() { + return CassandraPut.class; + } + + + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { + + if (objInspector.getCategory() != Category.STRUCT) { + throw new SerDeException(getClass().toString() + + " can only serialize 
struct types, but we got: " + + objInspector.getTypeName()); + } + + // Prepare the field ObjectInspectors + StructObjectInspector soi = (StructObjectInspector) objInspector; + List fields = soi.getAllStructFieldRefs(); + List list = soi.getStructFieldsDataAsList(obj); + List declaredFields = + (serdeParams.getRowTypeInfo() != null && + ((StructTypeInfo) serdeParams.getRowTypeInfo()) + .getAllStructFieldNames().size() > 0) ? + ((StructObjectInspector)getObjectInspector()).getAllStructFieldRefs() + : null; + + CassandraPut put = null; + + try { + String key = serializeField(iKey, null, fields, list, declaredFields); + + if (key == null) { + throw new SerDeException("Cassandra row key cannot be NULL"); + } + + put = new CassandraPut(key); + + // Serialize each field except key (done already) + for (int i = 0; i < fields.size(); i++) { + if (i == iKey) { + continue; + } + serializeField(i, put, fields, list, declaredFields); + } + } catch (IOException e) { + throw new SerDeException(e); + } + + return put; + + } + + + private String serializeField( + int i, + CassandraPut put, + List fields, + List list, + List declaredFields) throws IOException { + + // column name + String cassandraColumn = cassandraColumnNames.get(i); + + // Get the field objectInspector and the field object. + ObjectInspector foi = fields.get(i).getFieldObjectInspector(); + Object f = (list == null ? null : list.get(i)); + + if (f == null) { + return null; + } + + // If the field corresponds to a column family in cassandra + if (cassandraColumn.endsWith(":")) { + MapObjectInspector moi = (MapObjectInspector)foi; + ObjectInspector koi = moi.getMapKeyObjectInspector(); + ObjectInspector voi = moi.getMapValueObjectInspector(); + + Map map = moi.getMap(f); + if (map == null) { + return null; + } else { + for (Map.Entry entry: map.entrySet()) { + // Get the Key + serializeStream.reset(); + serialize(entry.getKey(), koi, 3); + + // Get the column-qualifier + byte [] columnQualifier = new byte[serializeStream.getCount()]; + System.arraycopy(serializeStream.getData(), 0, columnQualifier, 0, serializeStream.getCount()); + + // Get the Value + serializeStream.reset(); + + boolean isNotNull = serialize(entry.getValue(), voi, 3); + if (!isNotNull) { + continue; + } + byte [] value = new byte[serializeStream.getCount()]; + System.arraycopy(serializeStream.getData(), 0, value, 0, serializeStream.getCount()); + + CassandraColumn cc = new CassandraColumn(); + cc.setTimeStamp(System.currentTimeMillis()); + cc.setColumnFamily(cassandraColumn); + cc.setColumn(columnQualifier); + cc.setValue(value); + put.getColumns().add(cc); + + } + } + } else { + + // If the field that is passed in is NOT a primitive, and either the + // field is not declared (no schema was given at initialization), or + // the field is declared as a primitive in initialization, serialize + // the data to JSON string. Otherwise serialize the data in the + // delimited way. 
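+      // For a plain "family:qualifier" mapping the serialized bytes become the cell value;
+      // the column family and qualifier are recovered below by splitting the mapping entry
+      // on ':'. The row-key column (i == iKey) is returned to the caller as a String instead
+      // of being added to the put.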
+ + serializeStream.reset(); + boolean isNotNull; + if (!foi.getCategory().equals(Category.PRIMITIVE) + && (declaredFields == null || + declaredFields.get(i).getFieldObjectInspector().getCategory() + .equals(Category.PRIMITIVE) || useJSONSerialize)) { + isNotNull = serialize( + SerDeUtils.getJSONString(f, foi), + PrimitiveObjectInspectorFactory.javaStringObjectInspector, + 1); + } else { + isNotNull = serialize(f, foi, 1); + } + if (!isNotNull) { + return null; + } + byte [] key = new byte[serializeStream.getCount()]; + System.arraycopy(serializeStream.getData(), 0, key, 0, serializeStream.getCount()); + if (i == iKey) { + return new String(key); + } + CassandraColumn cc = new CassandraColumn(); + cc.setTimeStamp(System.currentTimeMillis()); + cc.setColumnFamily(this.cassandraColumnNames.get(i).split(":")[0]); + cc.setColumn(this.cassandraColumnNames.get(i).split(":")[1].getBytes()); + cc.setValue(key); + put.getColumns().add(cc); + } + + return null; + + } + + + private boolean serialize(Object obj, ObjectInspector objInspector, int level) + throws IOException { + + switch (objInspector.getCategory()) { + case PRIMITIVE: { + LazyUtils.writePrimitiveUTF8( + serializeStream, obj, + (PrimitiveObjectInspector) objInspector, + escaped, escapeChar, needsEscape); + return true; + } + case LIST: { + char separator = (char) separators[level]; + ListObjectInspector loi = (ListObjectInspector)objInspector; + List list = loi.getList(obj); + ObjectInspector eoi = loi.getListElementObjectInspector(); + if (list == null) { + return false; + } else { + for (int i = 0; i < list.size(); i++) { + if (i > 0) { + serializeStream.write(separator); + } + serialize(list.get(i), eoi, level + 1); + } + } + return true; + } + case MAP: { + char separator = (char) separators[level]; + char keyValueSeparator = (char) separators[level+1]; + MapObjectInspector moi = (MapObjectInspector) objInspector; + ObjectInspector koi = moi.getMapKeyObjectInspector(); + ObjectInspector voi = moi.getMapValueObjectInspector(); + + Map map = moi.getMap(obj); + if (map == null) { + return false; + } else { + boolean first = true; + for (Map.Entry entry: map.entrySet()) { + if (first) { + first = false; + } else { + serializeStream.write(separator); + } + serialize(entry.getKey(), koi, level+2); + serializeStream.write(keyValueSeparator); + serialize(entry.getValue(), voi, level+2); + } + } + return true; + } + case STRUCT: { + char separator = (char)separators[level]; + StructObjectInspector soi = (StructObjectInspector)objInspector; + List fields = soi.getAllStructFieldRefs(); + List list = soi.getStructFieldsDataAsList(obj); + if (list == null) { + return false; + } else { + for (int i = 0; i < list.size(); i++) { + if (i > 0) { + serializeStream.write(separator); + } + serialize(list.get(i), fields.get(i).getFieldObjectInspector(), level + 1); + } + } + return true; + } + } + throw new RuntimeException("Unknown category type: " + objInspector.getCategory()); + } + + public static List parseColumnMapping(String columnMapping) { + String [] columnArray = columnMapping.split(","); + List columnList = Arrays.asList(columnArray); + int iKey = columnList.indexOf(CASSANDRA_KEY_COLUMN); + if (iKey == -1) { + columnList = new ArrayList(columnList); + columnList.add(0, CASSANDRA_KEY_COLUMN); + } + return columnList; + } + + public static List initColumnNamesBytes(List columnNames) { + List columnBytes = new ArrayList(); + String column = null; + + for (int i = 0; i < columnNames.size(); i++) { + column = columnNames.get(i); + + try { + if 
(column.endsWith(":")) { + columnBytes.add((column.split(":")[0]).getBytes("UTF-8")); + } else { + columnBytes.add((column).getBytes("UTF-8")); + } + } catch (UnsupportedEncodingException ex){ + throw new RuntimeException(ex); + } + } + + return columnBytes; + } + + public static boolean isSpecialColumn(String columnName) { + return columnName.equals(CASSANDRA_KEY_COLUMN); + } + + } Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java (revision 0) @@ -0,0 +1,167 @@ +package org.apache.hadoop.hive.cassandra; + +import java.util.Map; +import java.util.Properties; + +import org.apache.cassandra.thrift.Cassandra; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.cassandra.input.HiveCassandraTableInputFormat; +import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.Constants; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +public class CassandraStorageHandler implements HiveStorageHandler, HiveMetaHook{ + + private TTransport transport; + private TProtocol proto; + private Cassandra.Client client; + + private Configuration configuration; + + @Override + public void configureTableJobProperties(TableDesc tableDesc, Map jobProperties) { + + Properties tableProperties = tableDesc.getProperties(); + String keyspaceName = + tableProperties.getProperty(CassandraSerDe.CASSANDRA_KEYSPACE_NAME); + if (keyspaceName == null) { + keyspaceName = tableProperties.getProperty(Constants.META_TABLE_NAME); + } + jobProperties.put(CassandraSerDe.CASSANDRA_KEYSPACE_NAME, keyspaceName); + jobProperties.put( CassandraSerDe.CASSANDRA_COL_MAPPING, + tableProperties.getProperty(CassandraSerDe.CASSANDRA_COL_MAPPING)); + jobProperties.put( CassandraSerDe.CASSANDRA_CF_NAME, + tableProperties.getProperty(CassandraSerDe.CASSANDRA_CF_NAME)); + jobProperties.put( CassandraSerDe.CASSANDRA_HOST, + tableProperties.getProperty(CassandraSerDe.CASSANDRA_HOST)); + jobProperties.put( CassandraSerDe.CASSANDRA_PORT, + tableProperties.getProperty(CassandraSerDe.CASSANDRA_PORT)); + } + + + @Override + public Class getInputFormatClass() { + return HiveCassandraTableInputFormat.class; + } + + @Override + public HiveMetaHook getMetaHook() { + return this; + } + + @Override + public Class getOutputFormatClass() { + return org.apache.hadoop.hive.cassandra.output.HiveCassandraOutputFormat.class; + } + + @Override + public Class getSerDeClass() { + return CassandraSerDe.class; + } + + + @Override + public Configuration getConf() { + return this.configuration; + } + + @Override + public void setConf(Configuration arg0) { + 
this.configuration=arg0; + } + + @Override + public void commitCreateTable(Table table) throws MetaException { + //No work needed + } + + @Override + public void commitDropTable(Table table, boolean deleteData) throws MetaException { + //No work needed + } + + @Override + public void preCreateTable(Table tbl) throws MetaException { + boolean isExternal = MetaStoreUtils.isExternalTable(tbl); + if (!isExternal) { + throw new MetaException("Cassandra tables must be external."); + } + if (tbl.getSd().getLocation() != null) { + throw new MetaException("LOCATION may not be specified for Cassandra."); + } + + String keyspaceName = this.getCassandraKeyspaceName(tbl); + Map serdeParam = tbl.getSd().getSerdeInfo().getParameters(); + String cassandraColumnStr = serdeParam.get(CassandraSerDe.CASSANDRA_COL_MAPPING); + if (cassandraColumnStr == null) { + throw new MetaException("No cassandra.columns.mapping defined in Serde."); + } + String cassandraHost = serdeParam.get(CassandraSerDe.CASSANDRA_HOST); + int cassandraPort = Integer.parseInt(serdeParam.get( CassandraSerDe.CASSANDRA_PORT) ); + this.ensureConnection(cassandraHost, cassandraPort, 5000); + + try { + if ( ! client.describe_keyspaces().contains(keyspaceName) ){ + throw new MetaException( keyspaceName +" not found"); + } + } catch (TException e) { + throw new RuntimeException(e); + } + } + + + @Override + public void preDropTable(Table table) throws MetaException { + // nothing to do + } + + @Override + public void rollbackCreateTable(Table table) throws MetaException { + // nothing to do + } + + @Override + public void rollbackDropTable(Table table) throws MetaException { + // nothing to do + + } + + private String getCassandraKeyspaceName(Table tbl) { + String tableName = tbl.getParameters().get(CassandraSerDe.CASSANDRA_KEYSPACE_NAME); + if (tableName == null) { + tableName = tbl.getSd().getSerdeInfo().getParameters().get( + CassandraSerDe.CASSANDRA_KEYSPACE_NAME); + } + if (tableName == null) { + tableName = tbl.getTableName(); + } + return tableName; + } + + private void ensureConnection(String host,int port, int timeout){ + if (transport == null || (transport!=null && !transport.isOpen())){ + transport = new TSocket(host, port,timeout); + proto = new TBinaryProtocol(transport); + try { + transport.open(); + this.client = new Cassandra.Client(proto); + } catch (TTransportException e) { + throw new RuntimeException (e); + } + } + } + +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/udf/SetCassandraColumn.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/udf/SetCassandraColumn.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/udf/SetCassandraColumn.java (revision 0) @@ -0,0 +1,75 @@ +package org.apache.hadoop.hive.cassandra.udf; + +import java.io.UnsupportedEncodingException; + +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.ColumnPath; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.TimedOutException; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +public class 
SetCassandraColumn extends org.apache.hadoop.hive.ql.exec.UDF { + + private Cassandra.Client client; + private TTransport transport; + private TProtocol proto; + + public int evaluate(String host, int port, int timeout, String keyspace, + String columnFamily, String column, String key, String value){ + + ensureConnection(host,port,timeout); + ColumnPath cp = new ColumnPath(); + + try { + cp.setColumn(column.getBytes("UTF-8") ); + } catch (UnsupportedEncodingException e1) { + throw new RuntimeException (e1); + } + cp.setColumn_family(columnFamily); + + try { + this.client.insert(keyspace, key, cp, value.getBytes("UTF-8"), System.currentTimeMillis(), ConsistencyLevel.QUORUM); + } catch (InvalidRequestException e) { + throw new RuntimeException (e); + } catch (UnavailableException e) { + throw new RuntimeException (e); + } catch (TimedOutException e) { + throw new RuntimeException (e); + } catch (TException e) { + throw new RuntimeException (e); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException (e); + } + return 0; + } + + + private void ensureConnection(String host,int port, int timeout){ + if (transport == null || (transport!=null && !transport.isOpen())){ + transport = new TSocket(host, port,timeout); + proto = new TBinaryProtocol(transport); + try { + transport.open(); + this.client = new Cassandra.Client(proto); + } catch (TTransportException e) { + throw new RuntimeException (e); + } + } + } + + @Override + public void finalize(){ + if (transport != null){ + if (transport.isOpen()){ + transport.close(); + } + } + } +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/udf/GetCassandraColumn.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/udf/GetCassandraColumn.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/udf/GetCassandraColumn.java (revision 0) @@ -0,0 +1,76 @@ +package org.apache.hadoop.hive.cassandra.udf; + +import java.io.UnsupportedEncodingException; + +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.ColumnPath; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.NotFoundException; +import org.apache.cassandra.thrift.TimedOutException; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +public class GetCassandraColumn extends UDF { + private Cassandra.Client client; + private TTransport transport; + private TProtocol proto; + + public String evaluate(String host, int port, int timeout, String keyspace, + String columnFamily, String column, String key){ + + ensureConnection(host,port,timeout); + ColumnPath cp = new ColumnPath(); + + try { + cp.setColumn(column.getBytes("UTF-8") ); + } catch (UnsupportedEncodingException e1) { + throw new RuntimeException (e1); + } + cp.setColumn_family(columnFamily); + ColumnOrSuperColumn result = null; + try { + result = this.client.get(keyspace, key ,cp, ConsistencyLevel.QUORUM); + } catch (InvalidRequestException e) { + throw new RuntimeException (e); + } catch 
(NotFoundException e) {
+      throw new RuntimeException(e);
+    } catch (UnavailableException e) {
+      throw new RuntimeException(e);
+    } catch (TimedOutException e) {
+      throw new RuntimeException(e);
+    } catch (TException e) {
+      throw new RuntimeException(e);
+    }
+    return new String(result.column.value);
+  }
+
+  private void ensureConnection(String host, int port, int timeout) {
+    if (transport == null || (transport != null && !transport.isOpen())) {
+      transport = new TSocket(host, port, timeout);
+      proto = new TBinaryProtocol(transport);
+      try {
+        transport.open();
+        this.client = new Cassandra.Client(proto);
+      } catch (TTransportException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Override
+  public void finalize() {
+    if (transport != null) {
+      if (transport.isOpen()) {
+        transport.close();
+      }
+    }
+  }
+}
Index: cassandra-handler/build.xml
===================================================================
--- cassandra-handler/build.xml	(revision 0)
+++ cassandra-handler/build.xml	(revision 0)
@@ -0,0 +1,160 @@
[160 new lines of Ant build XML for cassandra-handler; the markup was not preserved in this copy of the patch. Only the test-failure message "Tests failed!" from the test target is still legible.]
Index: build-common.xml
===================================================================
--- build-common.xml	(revision 981263)
+++ build-common.xml	(working copy)
@@ -429,6 +429,7 @@
+
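For reference, a minimal sketch of how a row is assembled into the output writables added by this patch. It assumes the cassandra-handler classes above are on the classpath; the key, column family, and column names are made up for illustration and are not taken from the patch.

import org.apache.hadoop.hive.cassandra.output.CassandraColumn;
import org.apache.hadoop.hive.cassandra.output.CassandraPut;

public class CassandraPutExample {
  public static void main(String[] args) {
    // Row key "user42" with one cell users:email, mirroring what
    // CassandraSerDe.serializeField() builds before HiveCassandraOutputFormat
    // writes it out with per-column thrift insert() calls.
    CassandraPut put = new CassandraPut("user42");

    CassandraColumn cell = new CassandraColumn();
    cell.setColumnFamily("users");               // hypothetical column family
    cell.setColumn("email".getBytes());          // column qualifier
    cell.setValue("foo@example.com".getBytes()); // cell value
    cell.setTimeStamp(System.currentTimeMillis());
    put.getColumns().add(cell);

    System.out.println(put); // CassandraPut.toString() prints the key and each column
  }
}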