Index: build.xml
===================================================================
--- build.xml (revision 981263)
+++ build.xml (working copy)
@@ -53,11 +53,20 @@
+
+
+
+
+
+
+
+
+
@@ -94,7 +103,7 @@
-
+
@@ -105,7 +114,7 @@
-
+
@@ -303,6 +312,7 @@
+
Index: cassandra-handler/lib/clhm-production.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/clhm-production.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/high-scale-lib.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/high-scale-lib.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/slf4j-log4j12-1.5.8.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/slf4j-log4j12-1.5.8.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/libthrift-r917130.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/libthrift-r917130.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/commons-cli-1.1.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/commons-cli-1.1.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/avro-1.2.0-dev.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/avro-1.2.0-dev.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/apache-cassandra-0.6.3.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/apache-cassandra-0.6.3.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/antlr-3.1.3.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/antlr-3.1.3.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/commons-codec-1.2.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/commons-codec-1.2.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/google-collections-1.0.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/google-collections-1.0.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/jackson-mapper-asl-1.4.0.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/jackson-mapper-asl-1.4.0.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/commons-collections-3.2.1.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/commons-collections-3.2.1.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/jackson-core-asl-1.4.0.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/jackson-core-asl-1.4.0.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/commons-lang-2.4.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/commons-lang-2.4.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/json-simple-1.1.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/json-simple-1.1.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/jline-0.9.94.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/jline-0.9.94.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/slf4j-api-1.5.8.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/slf4j-api-1.5.8.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/log4j-1.2.14.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/log4j-1.2.14.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/lib/ivy-2.1.0.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: cassandra-handler/lib/ivy-2.1.0.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: cassandra-handler/ivy.xml
===================================================================
--- cassandra-handler/ivy.xml (revision 0)
+++ cassandra-handler/ivy.xml (revision 0)
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
Index: cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java
===================================================================
--- cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java (revision 0)
+++ cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java (revision 0)
@@ -0,0 +1,82 @@
+package org.apache.cassandra.contrib.utils.service;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * A cleanup utility that wipes the cassandra data directories.
+ *
+ * @author Ran Tavory (rantav@gmail.com)
+ *
+ */
+public class CassandraServiceDataCleaner {
+
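+ // Intended to run before an EmbeddedCassandraService is started (see
+ // CassandraTestSetup.preTest) so every test begins with empty data directories.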
+ /**
+ * Creates all data dirs if they don't exist and cleans them.
+ * @throws IOException
+ */
+ public void prepare() throws IOException {
+ makeDirsIfNotExist();
+ cleanupDataDirectories();
+ }
+
+ /**
+ * Deletes all data from cassandra data directories, including the commit log.
+ * @throws IOException in case of permissions error etc.
+ */
+ public void cleanupDataDirectories() throws IOException {
+ for (String s: getDataDirs()) {
+ cleanDir(s);
+ }
+ }
+ /**
+ * Creates the data directories if they don't exist.
+ * @throws IOException if directories cannot be created (permissions etc).
+ */
+ public void makeDirsIfNotExist() throws IOException {
+ for (String s: getDataDirs()) {
+ mkdir(s);
+ }
+ }
+
+ /**
+ * Collects all data dirs and returns a set of String paths on the file system.
+ *
+ * @return the set of data directory paths, including the commit log location
+ */
+ private Set<String> getDataDirs() {
+ Set<String> dirs = new HashSet<String>();
+ for (String s : DatabaseDescriptor.getAllDataFileLocations()) {
+ dirs.add(s);
+ }
+ dirs.add(DatabaseDescriptor.getLogFileLocation());
+ return dirs;
+ }
+ /**
+ * Creates a directory
+ *
+ * @param dir
+ * @throws IOException
+ */
+ private void mkdir(String dir) throws IOException {
+ FileUtils.createDirectory(dir);
+ }
+
+ /**
+ * Removes all directory content from the file system.
+ *
+ * @param dir
+ * @throws IOException
+ */
+ private void cleanDir(String dir) throws IOException {
+ File dirFile = new File(dir);
+ if (dirFile.exists() && dirFile.isDirectory()) {
+ FileUtils.delete(dirFile.listFiles());
+ }
+ }
+}
\ No newline at end of file
Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java
===================================================================
--- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java (revision 0)
+++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java (revision 0)
@@ -0,0 +1,58 @@
+package org.apache.hadoop.hive.cassandra;
+
+import java.io.IOException;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+
+import org.apache.cassandra.contrib.utils.service.CassandraServiceDataCleaner;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.transport.TTransportException;
+
+public class CassandraTestSetup extends TestSetup{
+
+ static final Log LOG = LogFactory.getLog(CassandraTestSetup.class);
+
+ private EmbeddedCassandraService cassandra;
+
+ public CassandraTestSetup(Test test){
+ super(test);
+ }
+
+ @SuppressWarnings("deprecation")
+ void preTest(HiveConf conf) throws IOException, TTransportException {
+ if (cassandra == null) {
+ //System.setProperty("storage-config", System.getProperty("user.dir")+"/src/test/resources" );
+ LOG.debug("storage config: " + System.getProperty("cassandra.resource.dir"));
+ System.setProperty("storage-config", System.getProperty("cassandra.resource.dir"));
+
+ CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner();
+ cleaner.prepare();
+ cassandra = new EmbeddedCassandraService();
+ cassandra.init();
+ Thread t = new Thread(cassandra);
+ t.setDaemon(true);
+ t.start();
+ }
+
+ String auxJars = conf.getAuxJars();
+ auxJars = ((auxJars == null) ? "" : (auxJars + ",")) + "file://"
+ + new JobConf(conf, Cassandra.Client.class).getJar();
+ auxJars += ",file://" + new JobConf(conf, CassandraSerDe.class).getJar();
+ conf.setAuxJars(auxJars);
+
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ //do we need this?
+ CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner();
+ cleaner.prepare();
+ }
+
+}
Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestCassandraPut.java
===================================================================
--- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestCassandraPut.java (revision 0)
+++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestCassandraPut.java (revision 0)
@@ -0,0 +1,56 @@
+package org.apache.hadoop.hive.cassandra;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cassandra.output.CassandraColumn;
+import org.apache.hadoop.hive.cassandra.output.CassandraPut;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+
+public class TestCassandraPut extends TestCase{
+
+ private JobConf jobConf;
+ private FileSystem fs;
+ private String tmpRoot;
+ private Path tmpRootPath;
+
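+ // Round-trips a CassandraPut through a SequenceFile to exercise its
+ // Writable serialization (write/readFields) end to end.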
+ public void testCassandraPut() throws IOException{
+ jobConf= new JobConf(TestCassandraPut.class);
+ fs = FileSystem.get(jobConf);
+ tmpRoot = "/tmp/" + System.getProperty("user.name") + "/"+System.currentTimeMillis();
+ tmpRootPath = new Path(tmpRoot);
+ if (fs.exists(tmpRootPath)){
+ fs.delete(tmpRootPath, true);
+ }
+ fs.mkdirs( new Path(tmpRoot) );
+ Path sequenceFile = new Path(tmpRootPath,"afile");
+ CassandraPut cp = new CassandraPut("mykey");
+ CassandraColumn cc = new CassandraColumn();
+ cc.setColumnFamily("Standard1");
+ cc.setColumn("firstname".getBytes());
+ cc.setTimeStamp(System.currentTimeMillis());
+ cc.setValue("edward".getBytes());
+ cp.getColumns().add( cc );
+
+ SequenceFile.Writer writer = SequenceFile.createWriter
+ (fs, jobConf, sequenceFile, Text.class, CassandraPut.class ,
+ SequenceFile.CompressionType.BLOCK ,new org.apache.hadoop.io.compress.DefaultCodec());
+
+ writer.append(new Text("edward"),cp );
+ writer.close();
+
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
+ Text key = new Text();
+ CassandraPut value= new CassandraPut();
+ reader.next(key, value);
+ assertEquals(new Text("edward"),key);
+
+ assertEquals("Standard1", cp.getColumns().get(0).getColumnFamily());
+ }
+}
Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestColumnFamilyInputFormat.java
===================================================================
--- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestColumnFamilyInputFormat.java (revision 0)
+++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestColumnFamilyInputFormat.java (revision 0)
@@ -0,0 +1,128 @@
+package org.apache.hadoop.hive.cassandra;
+
+import java.io.IOException;
+import java.util.GregorianCalendar;
+import java.util.Map;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import org.apache.cassandra.contrib.utils.service.CassandraServiceDataCleaner;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnPath;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+public class TestColumnFamilyInputFormat extends TestCase {
+
+ private JobConf jobConf;
+ private FileSystem fs;
+ private String tmpRoot;
+ private Path tmpRootPath;
+
+ private static TreeMap<String, TreeMap<String, String>> mrResults = new TreeMap<String, TreeMap<String, String>>();
+
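+ // Starts an embedded Cassandra, inserts two rows over Thrift, runs a map-only job
+ // with HiveCassandraTableInputFormat, and verifies both rows reach the mapper.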
+ @SuppressWarnings("deprecation")
+ public void testIf() throws Exception {
+
+ EmbeddedCassandraService cassandra;
+ System.setProperty("storage-config", "/home/edward/encrypt/cassandra-cleanup/trunk/src/test/resources/");
+
+ CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner();
+ cleaner.prepare();
+ cassandra = new EmbeddedCassandraService();
+ cassandra.init();
+ Thread t = new Thread(cassandra);
+ t.setDaemon(true);
+ t.start();
+
+ GregorianCalendar gc = new GregorianCalendar();
+ long timestamp = gc.getTimeInMillis();
+ Cassandra.Client client = getClient();
+ ColumnPath cp = new ColumnPath("Standard1");
+ cp.setColumn("first".getBytes("utf-8"));
+
+ client.insert("Keyspace1", "ecapriolo", cp, "Edward".getBytes("UTF-8"),
+ timestamp, ConsistencyLevel.ONE);
+
+ client.insert("Keyspace1", "2", cp, "2".getBytes("UTF-8"),
+ timestamp, ConsistencyLevel.ONE);
+
+ ColumnOrSuperColumn theColumn = client.get("Keyspace1", "ecapriolo", cp, ConsistencyLevel.ONE);
+ assertEquals("Edward", new String(theColumn.column.value,"UTF-8"));
+
+ jobConf= new JobConf(TestColumnFamilyInputFormat.class);
+ jobConf.set(CassandraSerDe.CASSANDRA_KEYSPACE_NAME, "Keyspace1");
+ jobConf.set(CassandraSerDe.CASSANDRA_CF_NAME, "Standard1");
+ jobConf.set(CassandraSerDe.CASSANDRA_HOST, "localhost");
+ jobConf.set(CassandraSerDe.CASSANDRA_PORT,"9170");
+ jobConf.set(CassandraSerDe.CASSANDRA_COL_MAPPING, "Standard1:astring" );
+
+ fs = FileSystem.get(jobConf);
+ tmpRoot = "/tmp/" + System.getProperty("user.name") + "/"+System.currentTimeMillis();
+ tmpRootPath = new Path(tmpRoot);
+ if (fs.exists(tmpRootPath)){
+ fs.delete(tmpRootPath, true);
+ }
+ fs.mkdirs( new Path(tmpRoot) );
+ Path aFolder = new Path(tmpRootPath,"afolder");
+ fs.mkdirs( aFolder );
+ fs.mkdirs( new Path(tmpRootPath,"bfolder"));
+
+ jobConf.setMapperClass(MapClass.class);
+ jobConf.setNumReduceTasks(0);
+ jobConf.setInputFormat(org.apache.hadoop.hive.cassandra.HiveCassandraTableInputFormat.class);
+ jobConf.setOutputFormat(org.apache.hadoop.mapred.lib.NullOutputFormat.class );
+ JobClient.runJob(jobConf);
+
+ assertTrue(mrResults.containsKey("ecapriolo"));
+ assertTrue(mrResults.containsKey("2"));
+ TreeMap<String, String> aRow = new TreeMap<String, String>();
+ aRow.put("first", "Edward");
+ assertEquals( aRow, mrResults.get("ecapriolo"));
+ }
+
+ static class MapClass extends MapReduceBase implements Mapper<Text, MapWritable, Text, MapWritable> {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void map(Text key, MapWritable value, OutputCollector<Text, MapWritable> out, Reporter rep)
+ throws IOException {
+ for (Map.Entry entry : value.entrySet()) {
+ String eKey = new String(((BytesWritable) entry.getKey()).getBytes());
+ String eVal = new String(((BytesWritable) entry.getValue()).getBytes());
+ TreeMap<String, String> row = new TreeMap<String, String>();
+ row.put(eKey, eVal);
+ mrResults.put(key.toString(), row);
+ }
+ }
+
+ }
+
+
+ private Cassandra.Client getClient() throws TTransportException {
+ TTransport tr = new TSocket("localhost", 9170);
+ TProtocol proto = new TBinaryProtocol(tr);
+ Cassandra.Client client = new Cassandra.Client(proto);
+ tr.open();
+ return client;
+ }
+
+}
Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java
===================================================================
--- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java (revision 0)
+++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java (revision 0)
@@ -0,0 +1,23 @@
+package org.apache.hadoop.hive.cassandra;
+
+import org.apache.hadoop.hive.ql.QTestUtil;
+
+/**
+ * CassandraQTestUtil initializes Cassandra-specific test fixtures.
+ */
+public class CassandraQTestUtil extends QTestUtil {
+ public CassandraQTestUtil(
+ String outDir, String logDir, boolean miniMr, CassandraTestSetup setup)
+ throws Exception {
+
+ super(outDir, logDir, miniMr, null);
+ setup.preTest(conf);
+ super.init();
+ }
+
+ @Override
+ public void init() throws Exception {
+ // defer
+ }
+}
+
Index: cassandra-handler/src/test/resources/storage-conf.xml
===================================================================
--- cassandra-handler/src/test/resources/storage-conf.xml (revision 0)
+++ cassandra-handler/src/test/resources/storage-conf.xml (revision 0)
@@ -0,0 +1,380 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<Storage>
+
+  <ClusterName>Test Cluster</ClusterName>
+
+  <AutoBootstrap>false</AutoBootstrap>
+
+  <Keyspaces>
+    <Keyspace Name="Keyspace1">
+      <ColumnFamily Name="Standard1" CompareWith="BytesType"/>
+
+      <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
+
+      <ReplicationFactor>1</ReplicationFactor>
+
+      <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
+    </Keyspace>
+  </Keyspaces>
+
+  <Authenticator>org.apache.cassandra.auth.AllowAllAuthenticator</Authenticator>
+
+  <Partitioner>org.apache.cassandra.dht.RandomPartitioner</Partitioner>
+
+  <CommitLogDirectory>/tmp/commitlog</CommitLogDirectory>
+  <DataFileDirectories>
+    <DataFileDirectory>/tmp/data</DataFileDirectory>
+  </DataFileDirectories>
+
+  <Seeds>
+    <Seed>127.0.0.1</Seed>
+  </Seeds>
+
+  <RpcTimeoutInMillis>10000</RpcTimeoutInMillis>
+
+  <CommitLogRotationThresholdInMB>128</CommitLogRotationThresholdInMB>
+
+  <ListenAddress>localhost</ListenAddress>
+  <StoragePort>7001</StoragePort>
+
+  <ThriftAddress>0.0.0.0</ThriftAddress>
+  <ThriftPort>9170</ThriftPort>
+  <ThriftFramedTransport>false</ThriftFramedTransport>
+
+  <DiskAccessMode>auto</DiskAccessMode>
+
+  <RowWarningThresholdInMB>512</RowWarningThresholdInMB>
+
+  <SlicedBufferSizeInKB>64</SlicedBufferSizeInKB>
+
+  <FlushDataBufferSizeInMB>32</FlushDataBufferSizeInMB>
+  <FlushIndexBufferSizeInMB>8</FlushIndexBufferSizeInMB>
+
+  <ColumnIndexSizeInKB>64</ColumnIndexSizeInKB>
+
+  <MemtableThroughputInMB>64</MemtableThroughputInMB>
+  <BinaryMemtableThroughputInMB>256</BinaryMemtableThroughputInMB>
+  <MemtableOperationsInMillions>0.3</MemtableOperationsInMillions>
+  <MemtableFlushAfterMinutes>60</MemtableFlushAfterMinutes>
+
+  <ConcurrentReads>8</ConcurrentReads>
+  <ConcurrentWrites>32</ConcurrentWrites>
+
+  <CommitLogSync>periodic</CommitLogSync>
+  <CommitLogSyncPeriodInMS>10000</CommitLogSyncPeriodInMS>
+
+  <GCGraceSeconds>864000</GCGraceSeconds>
+</Storage>
Index: cassandra-handler/src/test/resources/log4j-tools.properties
===================================================================
--- cassandra-handler/src/test/resources/log4j-tools.properties (revision 0)
+++ cassandra-handler/src/test/resources/log4j-tools.properties (revision 0)
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# for production, you should probably set the root to INFO
+# and the pattern to %c instead of %l. (%l is slower.)
+
+# output messages into a rolling log file as well as stdout
+log4j.rootLogger=WARN,stderr
+
+# stderr
+log4j.appender.stderr=org.apache.log4j.ConsoleAppender
+log4j.appender.stderr.target=System.err
+log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
+log4j.appender.stderr.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n
Index: cassandra-handler/src/test/resources/log4j.properties
===================================================================
--- cassandra-handler/src/test/resources/log4j.properties (revision 0)
+++ cassandra-handler/src/test/resources/log4j.properties (revision 0)
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# for production, you should probably set the root to INFO
+# and the pattern to %c instead of %l. (%l is slower.)
+
+# output messages into a rolling log file as well as stdout
+log4j.rootLogger=INFO,stdout,R
+
+# stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n
+
+# rolling log file
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.R.maxFileSize=20MB
+log4j.appender.R.maxBackupIndex=50
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
+# Edit the next line to point to your logs directory
+log4j.appender.R.File=/tmp/system.log
+
+# Application logging options
+#log4j.logger.com.facebook=DEBUG
+#log4j.logger.com.facebook.infrastructure.gms=DEBUG
+#log4j.logger.com.facebook.infrastructure.db=DEBUG
Index: cassandra-handler/src/test/resources/passwd.properties
===================================================================
--- cassandra-handler/src/test/resources/passwd.properties (revision 0)
+++ cassandra-handler/src/test/resources/passwd.properties (revision 0)
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This is a sample password file for SimpleAuthenticator. The format of
+# this file is username=password. If -Dpasswd.mode=MD5 then the password
+# is represented as an md5 digest, otherwise it is cleartext (keep this
+# in mind when setting file mode and ownership).
+jsmith=havebadpass
+Elvis\ Presley=graceland4evar
+dilbert=nomoovertime
Index: cassandra-handler/src/test/resources/access.properties
===================================================================
--- cassandra-handler/src/test/resources/access.properties (revision 0)
+++ cassandra-handler/src/test/resources/access.properties (revision 0)
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This is a sample access file for SimpleAuthenticator. The format of
+# this file is keyspace=users, where users is a comma delimited list of
+# authenticatable users from passwd.properties. This file contains
+# potentially sensitive information, keep this in mind when setting its
+# mode and ownership.
+Keyspace1=jsmith,Elvis Presley,dilbert
Index: cassandra-handler/src/test/queries/cassandra_udfs.q
===================================================================
Index: cassandra-handler/src/test/queries/cassandra_queries.q
===================================================================
--- cassandra-handler/src/test/queries/cassandra_queries.q (revision 0)
+++ cassandra-handler/src/test/queries/cassandra_queries.q (revision 0)
@@ -0,0 +1,12 @@
+CREATE EXTERNAL TABLE IF NOT EXISTS
+cassandra_keyspace1_standard1(key int, value string)
+STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
+WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" ,
+"cassandra.cf.name" = "Standard1" , "cassandra.host" = "localhost" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1"
+)
+TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1");
+
+describe cassandra_keyspace1_standard1;
+
+EXPLAIN FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%2)=0;
+FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%2)=0;
Index: cassandra-handler/src/test/templates/TestCassandraCliDriver.vm
===================================================================
--- cassandra-handler/src/test/templates/TestCassandraCliDriver.vm (revision 0)
+++ cassandra-handler/src/test/templates/TestCassandraCliDriver.vm (revision 0)
@@ -0,0 +1,125 @@
+package org.apache.hadoop.hive.cli;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.hadoop.hive.cassandra.CassandraQTestUtil;
+import org.apache.hadoop.hive.cassandra.CassandraTestSetup;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.history.HiveHistoryViewer;
+import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.antlr.runtime.*;
+import org.antlr.runtime.tree.*;
+
+public class $className extends TestCase {
+
+ private CassandraQTestUtil qt;
+ private CassandraTestSetup setup;
+
+ public $className(String name, CassandraTestSetup setup) {
+ super(name);
+ qt = null;
+ this.setup = setup;
+ }
+
+ @Override
+ protected void setUp() {
+ try {
+ boolean miniMR = false;
+ if ("$clusterMode".equals("miniMR")) {
+ miniMR = true;
+ }
+
+ qt = new CassandraQTestUtil(
+ "$resultsDir.getCanonicalPath()",
+ "$logDir.getCanonicalPath()", miniMR, setup);
+
+#foreach ($qf in $qfiles)
+ qt.addFile("$qf.getCanonicalPath()");
+#end
+ } catch (Exception e) {
+ System.out.println("Exception: " + e.getMessage());
+ e.printStackTrace();
+ System.out.flush();
+ fail("Unexpected exception in setup");
+ }
+ }
+
+ @Override
+ protected void tearDown() {
+ try {
+ qt.shutdown();
+ }
+ catch (Exception e) {
+ System.out.println("Exception: " + e.getMessage());
+ e.printStackTrace();
+ System.out.flush();
+ fail("Unexpected exception in tearDown");
+ }
+ }
+
+ public static Test suite() {
+ TestSuite suite = new TestSuite();
+ CassandraTestSetup setup = new CassandraTestSetup(suite);
+#foreach ($qf in $qfiles)
+ #set ($fname = $qf.getName())
+ #set ($eidx = $fname.length() - 2)
+ #set ($tname = $fname.substring(0, $eidx))
+ suite.addTest(new $className("testCliDriver_$tname", setup));
+#end
+ return setup;
+ }
+
+ #foreach ($qf in $qfiles)
+ #set ($fname = $qf.getName())
+ #set ($eidx = $fname.length() - 2)
+ #set ($tname = $fname.substring(0, $eidx))
+ public void testCliDriver_$tname() throws Exception {
+ try {
+ System.out.println("Begin query: " + "$fname");
+ qt.cliInit("$fname");
+ int ecode = qt.executeClient("$fname");
+ if (ecode != 0) {
+ fail("Client Execution failed with error code = " + ecode);
+ }
+ if (SessionState.get() != null) {
+ HiveHistoryViewer hv = new HiveHistoryViewer(SessionState.get()
+ .getHiveHistory().getHistFileName());
+ Map<String, QueryInfo> jobInfoMap = hv.getJobInfoMap();
+ Map<String, TaskInfo> taskInfoMap = hv.getTaskInfoMap();
+
+ if (jobInfoMap.size() != 0) {
+ String cmd = (String)jobInfoMap.keySet().toArray()[0];
+ QueryInfo ji = jobInfoMap.get(cmd);
+
+ if (!ji.hm.get(Keys.QUERY_RET_CODE.name()).equals("0")) {
+ fail("Wrong return code in hive history");
+ }
+ }
+ }
+
+ ecode = qt.checkCliDriverResults("$fname");
+ if (ecode != 0) {
+ fail("Client execution results failed with error code = " + ecode);
+ }
+ } catch (Throwable e) {
+ System.out.println("Exception: " + e.getMessage());
+ e.printStackTrace();
+ System.out.flush();
+ fail("Unexpected exception");
+ }
+
+ System.out.println("Done query: " + "$fname");
+ assertTrue("Test passed", true);
+ }
+
+#end
+}
\ No newline at end of file
Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/HiveCassandraOutputFormat.java
===================================================================
--- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/HiveCassandraOutputFormat.java (revision 0)
+++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/HiveCassandraOutputFormat.java (revision 0)
@@ -0,0 +1,122 @@
+package org.apache.hadoop.hive.cassandra.output;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.ColumnPath;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.TimedOutException;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cassandra.CassandraSerDe;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.util.Progressable;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+
+
+public class HiveCassandraOutputFormat
+ implements HiveOutputFormat<Text, CassandraPut>,
+ OutputFormat<Text, CassandraPut> {
+
+
+
+ @Override
+ public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+ Class<? extends Writable> valueClass, boolean isCompressed, Properties tableProperties,
+ Progressable progress) throws IOException {
+
+
+ final String cassandraKeySpace = jc.get(CassandraSerDe.CASSANDRA_KEYSPACE_NAME);
+ final String cassandraColumnFamily = jc.get(CassandraSerDe.CASSANDRA_CF_NAME);
+ final String cassandraHost = jc.get(CassandraSerDe.CASSANDRA_HOST);
+ final int cassandraPort = Integer.parseInt ( jc.get(CassandraSerDe.CASSANDRA_PORT) );
+
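+ // Note: write() issues one Thrift insert per column at QUORUM consistency;
+ // rows are not batched, which keeps the writer simple at some cost in throughput.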
+ return new RecordWriter() {
+
+ private Cassandra.Client client;
+ private TTransport transport;
+ private TProtocol proto;
+ @Override
+ public void close(boolean abort) throws IOException {
+ // release the Thrift connection when the writer is closed
+ if (transport != null && transport.isOpen()) {
+ transport.close();
+ }
+ }
+
+ @Override
+ public void write(Writable w) throws IOException {
+ CassandraPut put = (CassandraPut) w;
+ ensureConnection(cassandraHost,cassandraPort,5000);
+ for (CassandraColumn c : put.getColumns() ){
+ ColumnPath cp = new ColumnPath();
+ cp.setColumn(c.getColumn() );
+ cp.setColumn_family(c.getColumnFamily() );
+
+ try {
+ client.insert(cassandraKeySpace, put.getKey(), cp, c.getValue(), c.getTimeStamp(), ConsistencyLevel.QUORUM);
+ } catch (InvalidRequestException e) {
+ throw new IOException(e);
+ } catch (UnavailableException e) {
+ throw new IOException(e);
+ } catch (TimedOutException e) {
+ throw new IOException(e);
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
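+ // Opens the Thrift connection lazily on first write and reuses it;
+ // a new socket is created only if the transport was never opened or has closed.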
+ private void ensureConnection(String host, int port, int timeout) {
+ if (transport == null || !transport.isOpen()) {
+ transport = new TSocket(host, port, timeout);
+ proto = new TBinaryProtocol(transport);
+ try {
+ transport.open();
+ this.client = new Cassandra.Client(proto);
+ } catch (TTransportException e) {
+ throw new RuntimeException (e);
+ }
+ }
+ }
+ };
+ }
+
+ @Override
+ public void checkOutputSpecs(FileSystem arg0, JobConf jc) throws IOException {
+ /*
+ String hbaseTableName = jc.get(CassandraSerDe.CASSANDRA_KEYSPACE_NAME);
+ jc.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName);
+ Job job = new Job(jc);
+ JobContext jobContext =
+ new JobContext(job.getConfiguration(), job.getJobID());
+
+ try {
+ checkOutputSpecs(jobContext);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+*/
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.RecordWriter getRecordWriter(FileSystem arg0,
+ JobConf arg1, String arg2, Progressable arg3) throws IOException {
+ throw new RuntimeException("Error: Hive should not invoke this method.");
+
+ }
+
+
+
+}
Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraPut.java
===================================================================
--- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraPut.java (revision 0)
+++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraPut.java (revision 0)
@@ -0,0 +1,87 @@
+package org.apache.hadoop.hive.cassandra.output;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+public class CassandraPut implements Writable{
+
+ private String key;
+ //private ConsistencyLevel level;
+ private List<CassandraColumn> columns;
+
+ public CassandraPut() {
+ columns = new ArrayList<CassandraColumn>();
+ }
+
+ public CassandraPut(String key){
+ this();
+ setKey(key);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ key = in.readUTF();
+ int ilevel = in.readInt();
+// level = ConsistencyLevel.QUORUM;
+ int cols = in.readInt();
+ for (int i = 0; i < cols; i++) {
+ CassandraColumn cc = new CassandraColumn();
+ cc.readFields(in);
+ columns.add(cc);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(key);
+ // consistency level is not currently used; write a placeholder int
+ out.writeInt(0);
+ out.writeInt(columns.size());
+ for (CassandraColumn c : columns) {
+ c.write(out);
+ }
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ public List<CassandraColumn> getColumns() {
+ return columns;
+ }
+ return columns;
+ }
+
+ public void setColumns(List<CassandraColumn> columns) {
+ this.columns = columns;
+ }
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder();
+ sb.append("key: ");
+ sb.append(this.key);
+ for (CassandraColumn col:this.columns){
+ sb.append( "column : [" );
+ sb.append( col.toString() );
+ sb.append( "]" );
+ }
+ return sb.toString();
+ }
+}
Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraColumn.java
===================================================================
--- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraColumn.java (revision 0)
+++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraColumn.java (revision 0)
@@ -0,0 +1,79 @@
+package org.apache.hadoop.hive.cassandra.output;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class CassandraColumn implements Writable{
+
+ private String columnFamily;
+ private long timeStamp;
+ private byte [] column;
+ private byte [] value;
+
+
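+ // Serialized layout: column family (writeUTF), 8-byte timestamp, then the
+ // column name and value as length-prefixed byte arrays.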
+ @Override
+ public void readFields(DataInput din) throws IOException {
+ columnFamily = din.readUTF();
+ timeStamp = din.readLong();
+ int clength = din.readInt();
+ column = new byte[clength];
+ din.readFully(column, 0, clength);
+ int vlength = din.readInt();
+ value = new byte[vlength];
+ din.readFully(value, 0, vlength);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(columnFamily);
+ out.writeLong(timeStamp);
+ out.writeInt( column.length );
+ out.write(column);
+ out.writeInt( value.length );
+ out.write(value);
+ }
+
+ public String getColumnFamily() {
+ return columnFamily;
+ }
+
+ public void setColumnFamily(String columnFamily) {
+ this.columnFamily = columnFamily;
+ }
+
+ public byte[] getColumn() {
+ return column;
+ }
+
+ public void setColumn(byte[] column) {
+ this.column = column;
+ }
+
+ public byte[] getValue() {
+ return value;
+ }
+
+ public void setValue(byte[] value) {
+ this.value = value;
+ }
+
+ public long getTimeStamp() {
+ return timeStamp;
+ }
+
+ public void setTimeStamp(long timeStamp) {
+ this.timeStamp = timeStamp;
+ }
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder();
+ sb.append( "cf:"+ this.columnFamily);
+ sb.append( "column:"+ new String (this.column));
+ sb.append( "value:"+ new String (this.value));
+ return sb.toString();
+ }
+}
\ No newline at end of file
Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java
===================================================================
--- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java (revision 0)
+++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java (revision 0)
@@ -0,0 +1,166 @@
+package org.apache.hadoop.hive.cassandra;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.Constants;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+public class CassandraStorageHandler implements HiveStorageHandler, HiveMetaHook{
+
+ private TTransport transport;
+ private TProtocol proto;
+ private Cassandra.Client client;
+
+ private Configuration configuration;
+
+ @Override
+ public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+
+ Properties tableProperties = tableDesc.getProperties();
+ String keyspaceName =
+ tableProperties.getProperty(CassandraSerDe.CASSANDRA_KEYSPACE_NAME);
+ if (keyspaceName == null) {
+ keyspaceName = tableProperties.getProperty(Constants.META_TABLE_NAME);
+ }
+ jobProperties.put(CassandraSerDe.CASSANDRA_KEYSPACE_NAME, keyspaceName);
+ jobProperties.put( CassandraSerDe.CASSANDRA_COL_MAPPING,
+ tableProperties.getProperty(CassandraSerDe.CASSANDRA_COL_MAPPING));
+ jobProperties.put( CassandraSerDe.CASSANDRA_CF_NAME,
+ tableProperties.getProperty(CassandraSerDe.CASSANDRA_CF_NAME));
+ jobProperties.put( CassandraSerDe.CASSANDRA_HOST,
+ tableProperties.getProperty(CassandraSerDe.CASSANDRA_HOST));
+ jobProperties.put( CassandraSerDe.CASSANDRA_PORT,
+ tableProperties.getProperty(CassandraSerDe.CASSANDRA_PORT));
+ }
+
+
+ @Override
+ public Class<? extends InputFormat> getInputFormatClass() {
+ return HiveCassandraTableInputFormat.class;
+ }
+
+ @Override
+ public HiveMetaHook getMetaHook() {
+ return this;
+ }
+
+ @Override
+ public Class<? extends OutputFormat> getOutputFormatClass() {
+ return org.apache.hadoop.hive.cassandra.output.HiveCassandraOutputFormat.class;
+ }
+
+ @Override
+ public Class<? extends SerDe> getSerDeClass() {
+ return CassandraSerDe.class;
+ }
+
+
+ @Override
+ public Configuration getConf() {
+ return this.configuration;
+ }
+
+ @Override
+ public void setConf(Configuration arg0) {
+ this.configuration=arg0;
+ }
+
+ @Override
+ public void commitCreateTable(Table table) throws MetaException {
+ //No work needed
+ }
+
+ @Override
+ public void commitDropTable(Table table, boolean deleteData) throws MetaException {
+ //No work needed
+ }
+
+ @Override
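+ // CREATE TABLE validation: the table must be EXTERNAL with no LOCATION, must supply
+ // cassandra.columns.mapping, and the target keyspace must already exist on the server.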
+ public void preCreateTable(Table tbl) throws MetaException {
+ boolean isExternal = MetaStoreUtils.isExternalTable(tbl);
+ if (!isExternal) {
+ throw new MetaException("Cassandra tables must be external.");
+ }
+ if (tbl.getSd().getLocation() != null) {
+ throw new MetaException("LOCATION may not be specified for Cassandra.");
+ }
+
+ String keyspaceName = this.getCassandraKeyspaceName(tbl);
+ Map<String, String> serdeParam = tbl.getSd().getSerdeInfo().getParameters();
+ String cassandraColumnStr = serdeParam.get(CassandraSerDe.CASSANDRA_COL_MAPPING);
+ if (cassandraColumnStr == null) {
+ throw new MetaException("No cassandra.columns.mapping defined in Serde.");
+ }
+ String cassandraHost = serdeParam.get(CassandraSerDe.CASSANDRA_HOST);
+ int cassandraPort = Integer.parseInt(serdeParam.get( CassandraSerDe.CASSANDRA_PORT) );
+ this.ensureConnection(cassandraHost, cassandraPort, 5000);
+
+ try {
+ if ( ! client.describe_keyspaces().contains(keyspaceName) ){
+ throw new MetaException( keyspaceName +" not found");
+ }
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ @Override
+ public void preDropTable(Table table) throws MetaException {
+ // nothing to do
+ }
+
+ @Override
+ public void rollbackCreateTable(Table table) throws MetaException {
+ // nothing to do
+ }
+
+ @Override
+ public void rollbackDropTable(Table table) throws MetaException {
+ // nothing to do
+
+ }
+
+ private String getCassandraKeyspaceName(Table tbl) {
+ String keyspaceName = tbl.getParameters().get(CassandraSerDe.CASSANDRA_KEYSPACE_NAME);
+ if (keyspaceName == null) {
+ keyspaceName = tbl.getSd().getSerdeInfo().getParameters().get(
+ CassandraSerDe.CASSANDRA_KEYSPACE_NAME);
+ }
+ if (keyspaceName == null) {
+ keyspaceName = tbl.getTableName();
+ }
+ return keyspaceName;
+ }
+
+ private void ensureConnection(String host, int port, int timeout) {
+ if (transport == null || !transport.isOpen()) {
+ transport = new TSocket(host, port, timeout);
+ proto = new TBinaryProtocol(transport);
+ try {
+ transport.open();
+ this.client = new Cassandra.Client(proto);
+ } catch (TTransportException e) {
+ throw new RuntimeException (e);
+ }
+ }
+ }
+
+}
Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraSerDe.java
===================================================================
--- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraSerDe.java (revision 0)
+++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraSerDe.java (revision 0)
@@ -0,0 +1,462 @@
+package org.apache.hadoop.hive.cassandra;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.cassandra.output.CassandraPut;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Writable;
+
+public class CassandraSerDe implements SerDe {
+
+ public static final String CASSANDRA_KEYSPACE_NAME="cassandra.ks.name";
+ public static final String CASSANDRA_CF_NAME="cassandra.cf.name";
+ public static final String CASSANDRA_RANGE_BATCH_SIZE="cassandra.range.size";
+ public static final String CASSANDRA_SLICE_PREDICATE_SIZE="cassandra.slice.predicate.size";
+ public static final String CASSANDRA_HOST="cassandra.host";
+ public static final String CASSANDRA_PORT="cassandra.port";
+ public static final String CASSANDRA_COL_MAPPING = "cassandra.columns.mapping";
+ public static final String CASSANDRA_KEY_COLUMN=":key";
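+ // Mapping convention (see initCassandraSerDeParameters): ":key" binds a Hive column
+ // to the Cassandra row key, "ColumnFamily:col" to a single column, and a trailing
+ // colon ("ColumnFamily:") to the whole family as a MAP<string,string>.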
+
+ private List<String> cassandraColumnNames;
+ private ObjectInspector cachedObjectInspector;
+ private SerDeParameters serdeParams;
+ private LazyCassandraRow cachedCassandraRow;
+ private final ByteStream.Output serializeStream = new ByteStream.Output();
+ private boolean useJSONSerialize;
+ private int iKey;
+
+ private byte [] separators; // the separators array
+ private boolean escaped; // whether we need to escape the data when writing out
+ private byte escapeChar; // which char to use as the escape char, e.g. '\\'
+ private boolean [] needsEscape; // which chars need to be escaped. This array should have size
+ // of 128. Negative byte values (or byte values >= 128) are
+ // never escaped.
+
+ public static final Log LOG = LogFactory.getLog( CassandraSerDe.class.getName());
+
+ @Override
+ public String toString() {
+ return getClass().toString()
+ + "["
+ + cassandraColumnNames
+ + ":"
+ + ((StructTypeInfo) serdeParams.getRowTypeInfo())
+ .getAllStructFieldNames()
+ + ":"
+ + ((StructTypeInfo) serdeParams.getRowTypeInfo())
+ .getAllStructFieldTypeInfos() + "]";
+ }
+
+ public CassandraSerDe () throws SerDeException {
+
+ }
+
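+ // initialize() builds a lazy struct ObjectInspector over the declared Hive columns;
+ // deserialize() then reuses one cached LazyCassandraRow instead of allocating per row.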
+ public void initialize(Configuration conf, Properties tbl)
+ throws SerDeException {
+
+ initCassandraSerDeParameters(conf, tbl, getClass().getName());
+
+ cachedObjectInspector = LazyFactory.createLazyStructInspector(
+ serdeParams.getColumnNames(),
+ serdeParams.getColumnTypes(),
+ serdeParams.getSeparators(),
+ serdeParams.getNullSequence(),
+ serdeParams.isLastColumnTakesRest(),
+ serdeParams.isEscaped(),
+ serdeParams.getEscapeChar());
+
+ cachedCassandraRow = new LazyCassandraRow(
+ (LazySimpleStructObjectInspector) cachedObjectInspector);
+
+ //cassandraColumnNames is a List<String>, so it prints readably below
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CassandraSerDe initialized with : columnNames = "
+ + serdeParams.getColumnNames()
+ + " columnTypes = "
+ + serdeParams.getColumnTypes()
+ + " cassandraColumnMapping = "
+ + cassandraColumnNames);
+ }
+ }
+
+ private void initCassandraSerDeParameters(
+ Configuration job, Properties tbl, String serdeName)
+ throws SerDeException {
+ String cassandraColumnNameProperty =
+ tbl.getProperty(CassandraSerDe.CASSANDRA_COL_MAPPING);
+ String columnTypeProperty =
+ tbl.getProperty(Constants.LIST_COLUMN_TYPES);
+
+ cassandraColumnNames = parseColumnMapping(cassandraColumnNameProperty);
+ iKey = cassandraColumnNames.indexOf(CassandraSerDe.CASSANDRA_KEY_COLUMN);
+
+ // Build the type property string if not supplied
+ if (columnTypeProperty == null) {
+ StringBuilder sb = new StringBuilder();
+
+ for (int i = 0; i < cassandraColumnNames.size(); i++) {
+ if (sb.length() > 0) {
+ sb.append(":");
+ }
+ String colName = cassandraColumnNames.get(i);
+ if (isSpecialColumn(colName)) {
+ // a special column becomes a STRING
+ sb.append(Constants.STRING_TYPE_NAME);
+ } else if (colName.endsWith(":")) {
+ // a column family become a MAP
+ sb.append(
+ Constants.MAP_TYPE_NAME + "<"
+ + Constants.STRING_TYPE_NAME
+ + "," + Constants.STRING_TYPE_NAME + ">");
+ } else {
+ // an individual column becomes a STRING
+ sb.append(Constants.STRING_TYPE_NAME);
+ }
+ }
+ tbl.setProperty(Constants.LIST_COLUMN_TYPES, sb.toString());
+ }
+
+ serdeParams = LazySimpleSerDe.initSerdeParams(
+ job, tbl, serdeName);
+
+ if (cassandraColumnNames.size() != serdeParams.getColumnNames().size()) {
+ throw new SerDeException(serdeName + ": columns has " +
+ serdeParams.getColumnNames().size() +
+ " elements while hbase.columns.mapping has " +
+ cassandraColumnNames.size() + " elements" +
+ " (counting the key if implicit)");
+ }
+
+ // we can only make sure that "columnfamily:" is mapped to MAP<String, ?>
+ for (int i = 0; i < cassandraColumnNames.size(); i++) {
+ String cassandraColName = cassandraColumnNames.get(i);
+ if (cassandraColName.endsWith(":")) {
+ TypeInfo typeInfo = serdeParams.getColumnTypes().get(i);
+ if ((typeInfo.getCategory() != Category.MAP) ||
+ (!((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getTypeName()
+ .equals(Constants.STRING_TYPE_NAME))) {
+
+ throw new SerDeException(
+ serdeName + ": Cassandra column family '"
+ + cassandraColName
+ + "' should be mapped to a map but is mapped to "
+ + typeInfo.getTypeName());
+ }
+ }
+ }
+
+ }
+
+
+ @Override
+ public Object deserialize(Writable w) throws SerDeException {
+ if (!(w instanceof MapWritable)){
+ throw new SerDeException(getClass().getName() + ": expects MapWritable!");
+ }
+ MapWritable mw = (MapWritable)w;
+ this.cachedCassandraRow.init(mw, this.cassandraColumnNames);
+ return cachedCassandraRow;
+ }
+
+ @Override
+ public ObjectInspector getObjectInspector() throws SerDeException {
+ return this.cachedObjectInspector;
+ }
+
+ @Override
+ public Class<? extends Writable> getSerializedClass() {
+ return CassandraPut.class;
+ }
+
+
+ @Override
+ public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+ System.out.println("we is serializin");
+ if (objInspector.getCategory() != Category.STRUCT) {
+ throw new SerDeException(getClass().toString()
+ + " can only serialize struct types, but we got: "
+ + objInspector.getTypeName());
+ }
+
+ // Prepare the field ObjectInspectors
+ StructObjectInspector soi = (StructObjectInspector) objInspector;
+ List<? extends StructField> fields = soi.getAllStructFieldRefs();
+ List