Index: build.xml
===================================================================
--- build.xml (revision 981263)
+++ build.xml (working copy)
@@ -53,11 +53,20 @@
+
+
+
+
+
+
+
+
+
@@ -94,7 +103,7 @@
-
+
@@ -105,7 +114,7 @@
-
+
@@ -303,6 +312,7 @@
+
Index: cassandra-handler/lib/clhm-production.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/clhm-production.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/high-scale-lib.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/high-scale-lib.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/slf4j-log4j12-1.5.8.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/slf4j-log4j12-1.5.8.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/libthrift-r917130.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/libthrift-r917130.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/commons-cli-1.1.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/commons-cli-1.1.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/avro-1.2.0-dev.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/avro-1.2.0-dev.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/apache-cassandra-0.6.3.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/apache-cassandra-0.6.3.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/antlr-3.1.3.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/antlr-3.1.3.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/commons-codec-1.2.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/commons-codec-1.2.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/google-collections-1.0.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/google-collections-1.0.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/jackson-mapper-asl-1.4.0.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/jackson-mapper-asl-1.4.0.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/commons-collections-3.2.1.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/commons-collections-3.2.1.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/jackson-core-asl-1.4.0.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/jackson-core-asl-1.4.0.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/commons-lang-2.4.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/commons-lang-2.4.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/json-simple-1.1.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/json-simple-1.1.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/jline-0.9.94.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/jline-0.9.94.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/slf4j-api-1.5.8.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/slf4j-api-1.5.8.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/log4j-1.2.14.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/log4j-1.2.14.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/ivy-2.1.0.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/ivy-2.1.0.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/ivy.xml
===================================================================
--- cassandra-handler/ivy.xml (revision 0)
+++ cassandra-handler/ivy.xml (revision 0)
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
Index: cassandra-handler/src/test/results/cassandra_queries.q.out
===================================================================
--- cassandra-handler/src/test/results/cassandra_queries.q.out (revision 0)
+++ cassandra-handler/src/test/results/cassandra_queries.q.out (revision 0)
@@ -0,0 +1,193 @@
+PREHOOK: query: CREATE EXTERNAL TABLE IF NOT EXISTS
+cassandra_keyspace1_standard1(key int, value string)
+STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
+WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" ,
+"cassandra.cf.name" = "Standard1" , "cassandra.host" = "localhost" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1"
+)
+TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1")
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE EXTERNAL TABLE IF NOT EXISTS
+cassandra_keyspace1_standard1(key int, value string)
+STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
+WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" ,
+"cassandra.cf.name" = "Standard1" , "cassandra.host" = "localhost" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1"
+)
+TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@cassandra_keyspace1_standard1
+PREHOOK: query: describe cassandra_keyspace1_standard1
+PREHOOK: type: DESCTABLE
+POSTHOOK: query: describe cassandra_keyspace1_standard1
+POSTHOOK: type: DESCTABLE
+key int from deserializer
+value string from deserializer
+PREHOOK: query: EXPLAIN FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%2)=0
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%2)=0
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB cassandra_keyspace1_standard1)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (= (% (TOK_TABLE_OR_COL key) 2) 0))))
+
+STAGE DEPENDENCIES:
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-0
+ Map Reduce
+ Alias -> Map Operator Tree:
+ src
+ TableScan
+ alias: src
+ Filter Operator
+ predicate:
+ expr: ((key % 2) = 0)
+ type: boolean
+ Filter Operator
+ predicate:
+ expr: ((key % 2) = 0)
+ type: boolean
+ Select Operator
+ expressions:
+ expr: key
+ type: string
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ Select Operator
+ expressions:
+ expr: UDFToInteger(_col0)
+ type: int
+ expr: _col1
+ type: string
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 1
+ table:
+ input format: org.apache.hadoop.hive.cassandra.input.HiveCassandraTableInputFormat
+ output format: org.apache.hadoop.hive.cassandra.output.HiveCassandraOutputFormat
+ serde: org.apache.hadoop.hive.cassandra.CassandraSerDe
+ name: cassandra_keyspace1_standard1
+
+
+PREHOOK: query: FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%2)=0
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@cassandra_keyspace1_standard1
+POSTHOOK: query: FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%2)=0
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@cassandra_keyspace1_standard1
+PREHOOK: query: SELECT * FROM cassandra_keyspace1_standard1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@cassandra_keyspace1_standard1
+PREHOOK: Output: file:/tmp/edward/hive_2010-08-05_08-07-16_191_666986318680807068/-mr-10000
+POSTHOOK: query: SELECT * FROM cassandra_keyspace1_standard1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@cassandra_keyspace1_standard1
+POSTHOOK: Output: file:/tmp/edward/hive_2010-08-05_08-07-16_191_666986318680807068/-mr-10000
+168 val_168
+348 val_348
+138 val_138
+134 val_134
+252 val_252
+76 val_76
+180 val_180
+164 val_164
+384 val_384
+498 val_498
+158 val_158
+126 val_126
+310 val_310
+396 val_396
+100 val_100
+278 val_278
+196 val_196
+482 val_482
+266 val_266
+430 val_430
+392 val_392
+256 val_256
+44 val_44
+208 val_208
+80 val_80
+472 val_472
+98 val_98
+224 val_224
+484 val_484
+170 val_170
+64 val_64
+438 val_438
+218 val_218
+286 val_286
+292 val_292
+466 val_466
+360 val_360
+400 val_400
+446 val_446
+242 val_242
+454 val_454
+494 val_494
+34 val_34
+156 val_156
+24 val_24
+172 val_172
+436 val_436
+432 val_432
+120 val_120
+470 val_470
+96 val_96
+274 val_274
+298 val_298
+394 val_394
+10 val_10
+296 val_296
+418 val_418
+458 val_458
+478 val_478
+190 val_190
+0 val_0
+368 val_368
+66 val_66
+72 val_72
+28 val_28
+30 val_30
+214 val_214
+78 val_78
+8 val_8
+104 val_104
+200 val_200
+262 val_262
+152 val_152
+2 val_2
+176 val_176
+386 val_386
+116 val_116
+490 val_490
+362 val_362
+424 val_424
+12 val_12
+332 val_332
+316 val_316
+174 val_174
+136 val_136
+318 val_318
+222 val_222
+364 val_364
+216 val_216
+284 val_284
+288 val_288
+160 val_160
+496 val_496
+344 val_344
+306 val_306
+26 val_26
+404 val_404
+382 val_382
+258 val_258
+462 val_462
+374 val_374
+74 val_74
+238 val_238
+444 val_444
+492 val_492
Index: cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java
===================================================================
--- cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java (revision 0)
+++ cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java (revision 0)
@@ -0,0 +1,82 @@
+package org.apache.cassandra.contrib.utils.service;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * A cleanup utility that wipes the cassandra data directories.
+ *
+ * @author Ran Tavory (rantav@gmail.com)
+ *
+ */
+public class CassandraServiceDataCleaner {
+
+ /**
+ * Creates all data dirs if they don't exist and cleans them
+ * @throws IOException
+ */
+ public void prepare() throws IOException {
+ makeDirsIfNotExist();
+ cleanupDataDirectories();
+ }
+
+ /**
+ * Deletes all data from cassandra data directories, including the commit log.
+ * @throws IOException in case of permissions error etc.
+ */
+ public void cleanupDataDirectories() throws IOException {
+ for (String s: getDataDirs()) {
+ cleanDir(s);
+ }
+ }
+ /**
+ * Creates the data directories, if they didn't exist.
+ * @throws IOException if directories cannot be created (permissions etc).
+ */
+ public void makeDirsIfNotExist() throws IOException {
+ for (String s: getDataDirs()) {
+ mkdir(s);
+ }
+ }
+
+ /**
+ * Collects all data dirs and returns a set of String paths on the file system.
+ *
+ * @return a set of String paths for all data directories, including the commit log location
+ */
+ private Set getDataDirs() {
+ Set dirs = new HashSet();
+ for (String s : DatabaseDescriptor.getAllDataFileLocations()) {
+ dirs.add(s);
+ }
+ dirs.add(DatabaseDescriptor.getLogFileLocation());
+ return dirs;
+ }
+ /**
+ * Creates a directory
+ *
+ * @param dir
+ * @throws IOException
+ */
+ private void mkdir(String dir) throws IOException {
+ FileUtils.createDirectory(dir);
+ }
+
+ /**
+ * Removes all directory content from the file system
+ *
+ * @param dir
+ * @throws IOException
+ */
+ private void cleanDir(String dir) throws IOException {
+ File dirFile = new File(dir);
+ if (dirFile.exists() && dirFile.isDirectory()) {
+ FileUtils.delete(dirFile.listFiles());
+ }
+ }
+}
\ No newline at end of file
Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java
===================================================================
--- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java (revision 0)
+++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java (revision 0)
@@ -0,0 +1,59 @@
+package org.apache.hadoop.hive.cassandra;
+
+import java.io.IOException;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+
+import org.apache.cassandra.contrib.utils.service.CassandraServiceDataCleaner;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.transport.TTransportException;
+
+public class CassandraTestSetup extends TestSetup{
+
+ static final Log LOG = LogFactory.getLog(CassandraTestSetup.class);
+
+ private EmbeddedCassandraService cassandra;
+
+ public CassandraTestSetup(Test test){
+ super(test);
+ }
+
+ @SuppressWarnings("deprecation")
+ void preTest(HiveConf conf) throws IOException, TTransportException{
+ if (cassandra ==null){
+ //System.setProperty("storage-config", System.getProperty("user.dir")+"/src/test/resources" );
+ LOG.debug("storage config"+System.getProperty("cassandra.resource.dir"));
+ System.setProperty("storage-config", System.getProperty("cassandra.resource.dir") );
+
+ CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner();
+ cleaner.prepare();
+ cassandra = new EmbeddedCassandraService();
+ cassandra.init();
+ Thread t = new Thread(cassandra);
+ t.setDaemon(true);
+ t.start();
+ }
+
+ String auxJars = conf.getAuxJars();
+ auxJars = ((auxJars == null) ? "" : (auxJars + ",")) + "file://"
+ + new JobConf(conf, Cassandra.Client.class).getJar();
+ auxJars += ",file://" + new JobConf(conf, CassandraSerDe.class).getJar();
+ auxJars += ",file:///home/edward/cassandra-handler/cassandra-handler/lib/libthrift-r917130.jar";
+ conf.setAuxJars(auxJars);
+
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ // NOTE(review): wipes Cassandra data dirs on teardown; confirm this cleanup is actually required here
+ CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner();
+ cleaner.prepare();
+ }
+
+}
Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestCassandraPut.java
===================================================================
--- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestCassandraPut.java (revision 0)
+++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestCassandraPut.java (revision 0)
@@ -0,0 +1,56 @@
+package org.apache.hadoop.hive.cassandra;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cassandra.output.CassandraColumn;
+import org.apache.hadoop.hive.cassandra.output.CassandraPut;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+
+public class TestCassandraPut extends TestCase{
+
+ private JobConf jobConf;
+ private FileSystem fs;
+ private String tmpRoot;
+ private Path tmpRootPath;
+
+ public void testCassandraPut() throws IOException{
+ jobConf= new JobConf(TestCassandraPut.class);
+ fs = FileSystem.get(jobConf);
+ tmpRoot = "/tmp/" + System.getProperty("user.name") + "/"+System.currentTimeMillis();
+ tmpRootPath = new Path(tmpRoot);
+ if (fs.exists(tmpRootPath)){
+ fs.delete(tmpRootPath, true);
+ }
+ fs.mkdirs( new Path(tmpRoot) );
+ Path sequenceFile = new Path(tmpRootPath,"afile");
+ CassandraPut cp = new CassandraPut("mykey");
+ CassandraColumn cc = new CassandraColumn();
+ cc.setColumnFamily("Standard1");
+ cc.setColumn("firstname".getBytes());
+ cc.setTimeStamp(System.currentTimeMillis());
+ cc.setValue("edward".getBytes());
+ cp.getColumns().add( cc );
+
+ SequenceFile.Writer writer = SequenceFile.createWriter
+ (fs, jobConf, sequenceFile, Text.class, CassandraPut.class ,
+ SequenceFile.CompressionType.BLOCK ,new org.apache.hadoop.io.compress.DefaultCodec());
+
+ writer.append(new Text("edward"),cp );
+ writer.close();
+
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
+ Text key = new Text();
+ CassandraPut value= new CassandraPut();
+ reader.next(key, value);
+ assertEquals(new Text("edward"),key);
+
+ assertEquals( new String( cp.getColumns().get(0).getColumnFamily() ),
+ new String( "Standard1" ) );
+ }
+}
Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestColumnFamilyInputFormat.java
===================================================================
--- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestColumnFamilyInputFormat.java (revision 0)
+++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestColumnFamilyInputFormat.java (revision 0)
@@ -0,0 +1,129 @@
+package org.apache.hadoop.hive.cassandra;
+
+import java.io.IOException;
+import java.util.GregorianCalendar;
+import java.util.Map;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import org.apache.cassandra.contrib.utils.service.CassandraServiceDataCleaner;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnPath;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cassandra.input.HiveIColumn;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+public class TestColumnFamilyInputFormat extends TestCase {
+
+ private JobConf jobConf;
+ private FileSystem fs;
+ private String tmpRoot;
+ private Path tmpRootPath;
+
+ private static TreeMap> mrResults = new TreeMap>();
+
+ @SuppressWarnings("deprecation")
+ public void testIf() throws Exception {
+
+ EmbeddedCassandraService cassandra;
+ System.setProperty("storage-config", "/home/edward/encrypt/cassandra-cleanup/trunk/src/test/resources/");
+
+ CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner();
+ cleaner.prepare();
+ cassandra = new EmbeddedCassandraService();
+ cassandra.init();
+ Thread t = new Thread(cassandra);
+ t.setDaemon(true);
+ t.start();
+
+ GregorianCalendar gc = new GregorianCalendar();
+ long timestamp = gc.getTimeInMillis();
+ Cassandra.Client client = getClient();
+ ColumnPath cp = new ColumnPath("Standard1");
+ cp.setColumn("first".getBytes("utf-8"));
+
+ client.insert("Keyspace1", "ecapriolo", cp, "Edward".getBytes("UTF-8"),
+ timestamp, ConsistencyLevel.ONE);
+
+ client.insert("Keyspace1", "2", cp, "2".getBytes("UTF-8"),
+ timestamp, ConsistencyLevel.ONE);
+
+ ColumnOrSuperColumn theColumn = client.get("Keyspace1", "ecapriolo", cp, ConsistencyLevel.ONE);
+ assertEquals("Edward", new String(theColumn.column.value,"UTF-8"));
+
+ jobConf= new JobConf(TestColumnFamilyInputFormat.class);
+ jobConf.set(CassandraSerDe.CASSANDRA_KEYSPACE_NAME, "Keyspace1");
+ jobConf.set(CassandraSerDe.CASSANDRA_CF_NAME, "Standard1");
+ jobConf.set(CassandraSerDe.CASSANDRA_HOST, "localhost");
+ jobConf.set(CassandraSerDe.CASSANDRA_PORT,"9170");
+ jobConf.set(CassandraSerDe.CASSANDRA_COL_MAPPING, "Standard1:astring" );
+
+ fs = FileSystem.get(jobConf);
+ tmpRoot = "/tmp/" + System.getProperty("user.name") + "/"+System.currentTimeMillis();
+ tmpRootPath = new Path(tmpRoot);
+ if (fs.exists(tmpRootPath)){
+ fs.delete(tmpRootPath, true);
+ }
+ fs.mkdirs( new Path(tmpRoot) );
+ Path aFolder = new Path(tmpRootPath,"afolder");
+ fs.mkdirs( aFolder );
+ fs.mkdirs( new Path(tmpRootPath,"bfolder"));
+
+ jobConf.setMapperClass(MapClass.class);
+ jobConf.setNumReduceTasks(0);
+ jobConf.setInputFormat(org.apache.hadoop.hive.cassandra.input.HiveCassandraTableInputFormat.class);
+ jobConf.setOutputFormat(org.apache.hadoop.mapred.lib.NullOutputFormat.class );
+ JobClient.runJob(jobConf);
+
+ assert(mrResults.containsKey("ecapriolo"));
+ assert(mrResults.containsKey("2"));
+ TreeMap aRow = new TreeMap();
+ aRow.put("first", "Edward");
+ assertEquals( aRow, mrResults.get("ecapriolo"));
+ }
+
+ static class MapClass extends MapReduceBase implements Mapper{
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void map(Text key, MapWritable value, OutputCollector out, Reporter rep)
+ throws IOException {
+ for (Map.Entry entry: value.entrySet()){
+ String eKey= new String( ((BytesWritable) entry.getKey()).getBytes() );
+ String eVal = new String( ((HiveIColumn) entry.getValue()).getValue() );
+ TreeMap row = new TreeMap();
+ row.put(eKey,eVal);
+ mrResults.put(key.toString(), row);
+ }
+ }
+
+ }
+
+
+ private Cassandra.Client getClient() throws TTransportException {
+ TTransport tr = new TSocket("localhost", 9170);
+ TProtocol proto = new TBinaryProtocol(tr);
+ Cassandra.Client client = new Cassandra.Client(proto);
+ tr.open();
+ return client;
+ }
+
+}
Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java
===================================================================
--- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java (revision 0)
+++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java (revision 0)
@@ -0,0 +1,23 @@
+package org.apache.hadoop.hive.cassandra;
+
+import org.apache.hadoop.hive.ql.QTestUtil;
+
+/**
+ * CassandraQTestUtil initializes Cassandra-specific test fixtures.
+ */
+public class CassandraQTestUtil extends QTestUtil {
+ public CassandraQTestUtil(
+ String outDir, String logDir, boolean miniMr, CassandraTestSetup setup)
+ throws Exception {
+
+ super(outDir, logDir, miniMr, null);
+ setup.preTest(conf);
+ super.init();
+ }
+
+ @Override
+ public void init() throws Exception {
+ // defer
+ }
+}
+
Index: cassandra-handler/src/test/resources/storage-conf.xml
===================================================================
--- cassandra-handler/src/test/resources/storage-conf.xml (revision 0)
+++ cassandra-handler/src/test/resources/storage-conf.xml (revision 0)
@@ -0,0 +1,380 @@
+
+
+
+
+
+
+
+ Test Cluster
+
+
+ false
+
+
+
+
+
+
+
+
+
+
+
+
+ org.apache.cassandra.locator.RackUnawareStrategy
+
+
+ 1
+
+
+ org.apache.cassandra.locator.EndPointSnitch
+
+
+
+
+ org.apache.cassandra.auth.AllowAllAuthenticator
+
+
+ org.apache.cassandra.dht.RandomPartitioner
+
+
+
+
+
+ /tmp/commitlog
+
+ /tmp/data
+
+
+
+
+
+ 127.0.0.1
+
+
+
+
+
+
+ 10000
+
+ 128
+
+
+
+
+
+ localhost
+
+ 7001
+
+
+ 0.0.0.0
+
+ 9170
+
+ false
+
+
+
+
+
+
+
+ auto
+
+
+ 512
+
+
+ 64
+
+
+ 32
+ 8
+
+
+ 64
+
+
+ 64
+
+ 256
+
+ 0.3
+
+ 60
+
+
+ 8
+ 32
+
+
+ periodic
+
+ 10000
+
+
+
+
+ 864000
+
Index: cassandra-handler/src/test/resources/log4j-tools.properties
===================================================================
--- cassandra-handler/src/test/resources/log4j-tools.properties (revision 0)
+++ cassandra-handler/src/test/resources/log4j-tools.properties (revision 0)
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# for production, you should probably set the root to INFO
+# and the pattern to %c instead of %l. (%l is slower.)
+
+# output messages into a rolling log file as well as stdout
+log4j.rootLogger=WARN,stderr
+
+# stderr
+log4j.appender.stderr=org.apache.log4j.ConsoleAppender
+log4j.appender.stderr.target=System.err
+log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
+log4j.appender.stderr.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n
Index: cassandra-handler/src/test/resources/log4j.properties
===================================================================
--- cassandra-handler/src/test/resources/log4j.properties (revision 0)
+++ cassandra-handler/src/test/resources/log4j.properties (revision 0)
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# for production, you should probably set the root to INFO
+# and the pattern to %c instead of %l. (%l is slower.)
+
+# output messages into a rolling log file as well as stdout
+log4j.rootLogger=INFO,stdout,R
+
+# stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n
+
+# rolling log file
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.file.maxFileSize=20MB
+log4j.appender.file.maxBackupIndex=50
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
+# Edit the next line to point to your logs directory
+log4j.appender.R.File=/tmp/system.log
+
+# Application logging options
+#log4j.logger.com.facebook=DEBUG
+#log4j.logger.com.facebook.infrastructure.gms=DEBUG
+#log4j.logger.com.facebook.infrastructure.db=DEBUG
Index: cassandra-handler/src/test/resources/passwd.properties
===================================================================
--- cassandra-handler/src/test/resources/passwd.properties (revision 0)
+++ cassandra-handler/src/test/resources/passwd.properties (revision 0)
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This is a sample password file for SimpleAuthenticator. The format of
+# this file is username=password. If -Dpasswd.mode=MD5 then the password
+# is represented as an md5 digest, otherwise it is cleartext (keep this
+# in mind when setting file mode and ownership).
+jsmith=havebadpass
+Elvis\ Presley=graceland4evar
+dilbert=nomoovertime
Index: cassandra-handler/src/test/resources/access.properties
===================================================================
--- cassandra-handler/src/test/resources/access.properties (revision 0)
+++ cassandra-handler/src/test/resources/access.properties (revision 0)
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This is a sample access file for SimpleAuthenticator. The format of
+# this file is keyspace=users, where users is a comma delimited list of
+# authenticatable users from passwd.properties. This file contains
+# potentially sensitive information, keep this in mind when setting its
+# mode and ownership.
+Keyspace1=jsmith,Elvis Presley,dilbert
Index: cassandra-handler/src/test/queries/cassandra_queries.q
===================================================================
--- cassandra-handler/src/test/queries/cassandra_queries.q (revision 0)
+++ cassandra-handler/src/test/queries/cassandra_queries.q (revision 0)
@@ -0,0 +1,17 @@
+
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS
+cassandra_keyspace1_standard1(key int, value string)
+STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
+WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" ,
+"cassandra.cf.name" = "Standard1" , "cassandra.host" = "localhost" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1"
+)
+TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1");
+
+describe cassandra_keyspace1_standard1;
+
+EXPLAIN FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%2)=0;
+FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%2)=0;
+
+SELECT * FROM cassandra_keyspace1_standard1;
Index: cassandra-handler/src/test/templates/TestCassandraCliDriver.vm
===================================================================
--- cassandra-handler/src/test/templates/TestCassandraCliDriver.vm (revision 0)
+++ cassandra-handler/src/test/templates/TestCassandraCliDriver.vm (revision 0)
@@ -0,0 +1,125 @@
+package org.apache.hadoop.hive.cli;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.hadoop.hive.cassandra.CassandraQTestUtil;
+import org.apache.hadoop.hive.cassandra.CassandraTestSetup;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.history.HiveHistoryViewer;
+import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.antlr.runtime.*;
+import org.antlr.runtime.tree.*;
+
+public class $className extends TestCase {
+
+ private CassandraQTestUtil qt;
+ private CassandraTestSetup setup;
+
+ public $className(String name, CassandraTestSetup setup) {
+ super(name);
+ qt = null;
+ this.setup = setup;
+ }
+
+ @Override
+ protected void setUp() {
+ try {
+ boolean miniMR = false;
+ if ("$clusterMode".equals("miniMR")) {
+ miniMR = true;
+ }
+
+ qt = new CassandraQTestUtil(
+ "$resultsDir.getCanonicalPath()",
+ "$logDir.getCanonicalPath()", miniMR, setup);
+
+#foreach ($qf in $qfiles)
+ qt.addFile("$qf.getCanonicalPath()");
+#end
+ } catch (Exception e) {
+ System.out.println("Exception: " + e.getMessage());
+ e.printStackTrace();
+ System.out.flush();
+ fail("Unexpected exception in setup");
+ }
+ }
+
+ @Override
+ protected void tearDown() {
+ try {
+ qt.shutdown();
+ }
+ catch (Exception e) {
+ System.out.println("Exception: " + e.getMessage());
+ e.printStackTrace();
+ System.out.flush();
+ fail("Unexpected exception in tearDown");
+ }
+ }
+
+ public static Test suite() {
+ TestSuite suite = new TestSuite();
+ CassandraTestSetup setup = new CassandraTestSetup(suite);
+#foreach ($qf in $qfiles)
+ #set ($fname = $qf.getName())
+ #set ($eidx = $fname.length() - 2)
+ #set ($tname = $fname.substring(0, $eidx))
+ suite.addTest(new $className("testCliDriver_$tname", setup));
+#end
+ return setup;
+ }
+
+ #foreach ($qf in $qfiles)
+ #set ($fname = $qf.getName())
+ #set ($eidx = $fname.length() - 2)
+ #set ($tname = $fname.substring(0, $eidx))
+ public void testCliDriver_$tname() throws Exception {
+ try {
+ System.out.println("Begin query: " + "$fname");
+ qt.cliInit("$fname");
+ int ecode = qt.executeClient("$fname");
+ if (ecode != 0) {
+ fail("Client Execution failed with error code = " + ecode);
+ }
+ if (SessionState.get() != null) {
+ HiveHistoryViewer hv = new HiveHistoryViewer(SessionState.get()
+ .getHiveHistory().getHistFileName());
+ Map jobInfoMap = hv.getJobInfoMap();
+ Map taskInfoMap = hv.getTaskInfoMap();
+
+ if (jobInfoMap.size() != 0) {
+ String cmd = (String)jobInfoMap.keySet().toArray()[0];
+ QueryInfo ji = (QueryInfo) jobInfoMap.get(cmd);
+
+ if (!ji.hm.get(Keys.QUERY_RET_CODE.name()).equals("0")) {
+ fail("Wrong return code in hive history");
+ }
+ }
+ }
+
+ ecode = qt.checkCliDriverResults("$fname");
+ if (ecode != 0) {
+ fail("Client execution results failed with error code = " + ecode);
+ }
+ } catch (Throwable e) {
+ System.out.println("Exception: " + e.getMessage());
+ e.printStackTrace();
+ System.out.flush();
+ fail("Unexpected exception");
+ }
+
+ System.out.println("Done query: " + "$fname");
+ assertTrue("Test passed", true);
+ }
+
+#end
+}
\ No newline at end of file
Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveIColumn.java
===================================================================
--- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveIColumn.java (revision 0)
+++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveIColumn.java (revision 0)
@@ -0,0 +1,165 @@
+package org.apache.hadoop.hive.cassandra.input;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.util.Collection;
+
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.hadoop.io.Writable;
+
+public class HiveIColumn implements IColumn, Writable {
+
+ private byte [] name;
+ private byte [] value;
+ private long timestamp;
+
+ public HiveIColumn(){
+
+ }
+
+ @Override
+ public byte[] name() {
+ return name;
+ }
+
+ @Override
+ public long timestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public byte[] value() {
+ return value;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ name = new byte[in.readInt()];
+ in.readFully(name);
+
+ value= new byte[in.readInt()];
+ in.readFully(value);
+
+ timestamp = in.readLong();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(name.length);
+ out.write(name);
+
+ out.writeInt(value.length);
+ out.write(value);
+
+ out.writeLong(timestamp);
+ }
+
+ //bean patterns
+
+ public byte[] getName() {
+ return name;
+ }
+
+ public void setName(byte[] name) {
+ this.name = name;
+ }
+
+ public byte[] getValue() {
+ return value;
+ }
+
+ public void setValue(byte[] value) {
+ this.value = value;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ //not needed for current integration
+
+ @Override
+ public int size() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void addColumn(IColumn arg0) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IColumn diff(IColumn arg0) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getLocalDeletionTime() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getMarkedForDeleteAt() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getObjectCount() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getString(AbstractType arg0) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IColumn getSubColumn(byte[] arg0) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Collection getSubColumns() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isMarkedForDelete() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long mostRecentLiveChangeAt() {
+ throw new UnsupportedOperationException();
+ }
+
+
+
+ @Override
+ public int serializedSize() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void updateDigest(MessageDigest arg0) {
+ throw new UnsupportedOperationException();
+ }
+
+
+ @Override
+ public String toString(){
+ StringBuffer sb = new StringBuffer();
+ sb.append("HiveIColumn[");
+ sb.append("name "+new String(this.name)+" ");
+ sb.append("value "+new String(this.value)+" ");
+ sb.append("timestamp "+ this.timestamp+" ");
+ return sb.toString();
+ }
+}
Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/CassandraRowResult.java
===================================================================
--- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/CassandraRowResult.java (revision 0)
+++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/CassandraRowResult.java (revision 0)
@@ -0,0 +1,63 @@
+package org.apache.hadoop.hive.cassandra.input;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+
+
+public class CassandraRowResult implements Writable {
+
+ private Text key;
+ private MapWritable value;
+
+ public CassandraRowResult() {
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ key =new Text();
+ key.readFields(in);
+ value = new MapWritable();
+ value.readFields(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ key.write(out);
+ value.write(out);
+ }
+
+ public Text getKey() {
+ return key;
+ }
+
+ public void setKey(Text key) {
+ this.key = key;
+ }
+
+ public MapWritable getValue() {
+ return value;
+ }
+
+ public void setValue(MapWritable value) {
+ this.value = value;
+ }
+
+
+ @Override
+ public String toString(){
+ StringBuffer sb = new StringBuffer();
+ sb.append("RowResult key:"+key );
+ for (Map.Entry entry : value.entrySet()){
+ sb.append( "entry key:"+entry.getKey()+" " );
+ sb.append( "entry value:"+entry.getValue()+" " );
+ }
+ return sb.toString();
+ }
+}
Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraRow.java
===================================================================
--- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraRow.java (revision 0)
+++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraRow.java (revision 0)
@@ -0,0 +1,129 @@
+package org.apache.hadoop.hive.cassandra.input;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hive.cassandra.CassandraSerDe;
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyObject;
+import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+
+public class LazyCassandraRow extends LazyStruct{
+ private List cassandraColumns;
+ private CassandraRowResult rowResult;
+ private List cassandraColumnBytes;
+ private ArrayList