Index: build.xml
===================================================================
--- build.xml	(revision 981263)
+++ build.xml	(working copy)
@@ -53,11 +53,20 @@
+
+
+
+
+
+
+
+
+
@@ -94,7 +103,7 @@
-
+
@@ -105,7 +114,7 @@
-
+
@@ -303,6 +312,7 @@
+
Index: cassandra-handler/lib/clhm-production.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: cassandra-handler/lib/clhm-production.jar
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
Index: cassandra-handler/lib/high-scale-lib.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: cassandra-handler/lib/high-scale-lib.jar
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
Index: cassandra-handler/lib/slf4j-log4j12-1.5.8.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: cassandra-handler/lib/slf4j-log4j12-1.5.8.jar
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
Index: cassandra-handler/lib/libthrift-r917130.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: cassandra-handler/lib/libthrift-r917130.jar
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
Index: cassandra-handler/lib/commons-cli-1.1.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: cassandra-handler/lib/commons-cli-1.1.jar
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
Index: cassandra-handler/lib/avro-1.2.0-dev.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: cassandra-handler/lib/avro-1.2.0-dev.jar
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
Index: cassandra-handler/lib/apache-cassandra-0.6.3.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: cassandra-handler/lib/apache-cassandra-0.6.3.jar
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
Index: cassandra-handler/lib/antlr-3.1.3.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: cassandra-handler/lib/antlr-3.1.3.jar
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
Index: cassandra-handler/lib/commons-codec-1.2.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: cassandra-handler/lib/commons-codec-1.2.jar
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
Index: cassandra-handler/lib/google-collections-1.0.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: cassandra-handler/lib/google-collections-1.0.jar
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
Index: cassandra-handler/lib/jackson-mapper-asl-1.4.0.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: cassandra-handler/lib/jackson-mapper-asl-1.4.0.jar
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
Index: cassandra-handler/lib/commons-collections-3.2.1.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: cassandra-handler/lib/commons-collections-3.2.1.jar
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
Index: cassandra-handler/lib/jackson-core-asl-1.4.0.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: cassandra-handler/lib/jackson-core-asl-1.4.0.jar
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
Index: cassandra-handler/lib/commons-lang-2.4.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: cassandra-handler/lib/commons-lang-2.4.jar
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
Index: cassandra-handler/lib/json-simple-1.1.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: cassandra-handler/lib/json-simple-1.1.jar
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
Index: cassandra-handler/lib/jline-0.9.94.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: cassandra-handler/lib/jline-0.9.94.jar
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
Index: cassandra-handler/lib/slf4j-api-1.5.8.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: cassandra-handler/lib/slf4j-api-1.5.8.jar
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
Index: cassandra-handler/lib/log4j-1.2.14.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: cassandra-handler/lib/log4j-1.2.14.jar
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
Index: cassandra-handler/lib/ivy-2.1.0.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: cassandra-handler/lib/ivy-2.1.0.jar
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
Index: cassandra-handler/ivy.xml
===================================================================
--- cassandra-handler/ivy.xml	(revision 0)
+++ cassandra-handler/ivy.xml	(revision 0)
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
Index: cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java
===================================================================
--- cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java	(revision 0)
+++ cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java	(revision 0)
@@ -0,0 +1,82 @@
+package org.apache.cassandra.contrib.utils.service;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * A cleanup utility that wipes the cassandra data directories.
+ *
+ * @author Ran Tavory (rantav@gmail.com)
+ *
+ */
+public class CassandraServiceDataCleaner {
+
+  /**
+   * Creates all data dirs if they don't exist and cleans them
+   * @throws IOException
+   */
+  public void prepare() throws IOException {
+    makeDirsIfNotExist();
+    cleanupDataDirectories();
+  }
+
+  /**
+   * Deletes all data from cassandra data directories, including the commit log.
+   * @throws IOException in case of permissions error etc.
+   */
+  public void cleanupDataDirectories() throws IOException {
+    for (String s: getDataDirs()) {
+      cleanDir(s);
+    }
+  }
+
+  /**
+   * Creates the data directories, if they didn't exist.
+   * @throws IOException if directories cannot be created (permissions etc).
+   */
+  public void makeDirsIfNotExist() throws IOException {
+    for (String s: getDataDirs()) {
+      mkdir(s);
+    }
+  }
+
+  /**
+   * Collects all data dirs and returns a set of String paths on the file system.
+   *
+   * @return
+   */
+  private Set<String> getDataDirs() {
+    Set<String> dirs = new HashSet<String>();
+    for (String s : DatabaseDescriptor.getAllDataFileLocations()) {
+      dirs.add(s);
+    }
+    dirs.add(DatabaseDescriptor.getLogFileLocation());
+    return dirs;
+  }
+
+  /**
+   * Creates a directory
+   *
+   * @param dir
+   * @throws IOException
+   */
+  private void mkdir(String dir) throws IOException {
+    FileUtils.createDirectory(dir);
+  }
+
+  /**
+   * Removes all directory content from the file system
+   *
+   * @param dir
+   * @throws IOException
+   */
+  private void cleanDir(String dir) throws IOException {
+    File dirFile = new File(dir);
+    if (dirFile.exists() && dirFile.isDirectory()) {
+      FileUtils.delete(dirFile.listFiles());
+    }
+  }
+}
\ No newline at end of file
Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java
===================================================================
--- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java	(revision 0)
+++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java	(revision 0)
@@ -0,0 +1,58 @@
+package org.apache.hadoop.hive.cassandra;
+
+import java.io.IOException;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+
+import org.apache.cassandra.contrib.utils.service.CassandraServiceDataCleaner;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.transport.TTransportException;
+
+public class CassandraTestSetup extends TestSetup {
+
+  static final Log LOG = LogFactory.getLog(CassandraTestSetup.class);
+
+  private EmbeddedCassandraService cassandra;
+
+  public CassandraTestSetup(Test test){
+    super(test);
+  }
+
+  @SuppressWarnings("deprecation")
+  void preTest(HiveConf conf) throws IOException, TTransportException {
+    if (cassandra == null){
+      //System.setProperty("storage-config", System.getProperty("user.dir")+"/src/test/resources" );
+      LOG.debug("storage config " + System.getProperty("cassandra.resource.dir"));
+      System.setProperty("storage-config", System.getProperty("cassandra.resource.dir") );
+
+      CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner();
+      cleaner.prepare();
+      cassandra = new EmbeddedCassandraService();
+      cassandra.init();
+      Thread t = new Thread(cassandra);
+      t.setDaemon(true);
+      t.start();
+    }
+
+    String auxJars = conf.getAuxJars();
+    auxJars = ((auxJars == null) ? "" : (auxJars + ",")) + "file://"
+        + new JobConf(conf, Cassandra.Client.class).getJar();
+    auxJars += ",file://" + new JobConf(conf, CassandraSerDe.class).getJar();
+    conf.setAuxJars(auxJars);
+
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    //do we need this?
+ CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner(); + cleaner.prepare(); + } + +} Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestCassandraPut.java =================================================================== --- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestCassandraPut.java (revision 0) +++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestCassandraPut.java (revision 0) @@ -0,0 +1,56 @@ +package org.apache.hadoop.hive.cassandra; + +import java.io.IOException; + +import junit.framework.TestCase; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.cassandra.output.CassandraColumn; +import org.apache.hadoop.hive.cassandra.output.CassandraPut; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; + +public class TestCassandraPut extends TestCase{ + + private JobConf jobConf; + private FileSystem fs; + private String tmpRoot; + private Path tmpRootPath; + + public void testCassandraPut() throws IOException{ + jobConf= new JobConf(TestCassandraPut.class); + fs = FileSystem.get(jobConf); + tmpRoot = "/tmp/" + System.getProperty("user.name") + "/"+System.currentTimeMillis(); + tmpRootPath = new Path(tmpRoot); + if (fs.exists(tmpRootPath)){ + fs.delete(tmpRootPath, true); + } + fs.mkdirs( new Path(tmpRoot) ); + Path sequenceFile = new Path(tmpRootPath,"afile"); + CassandraPut cp = new CassandraPut("mykey"); + CassandraColumn cc = new CassandraColumn(); + cc.setColumnFamily("Standard1"); + cc.setColumn("firstname".getBytes()); + cc.setTimeStamp(System.currentTimeMillis()); + cc.setValue("edward".getBytes()); + cp.getColumns().add( cc ); + + SequenceFile.Writer writer = SequenceFile.createWriter + (fs, jobConf, sequenceFile, Text.class, CassandraPut.class , + SequenceFile.CompressionType.BLOCK ,new org.apache.hadoop.io.compress.DefaultCodec()); + + writer.append(new Text("edward"),cp ); + writer.close(); + + SequenceFile.Reader reader = new SequenceFile.Reader(fs, sequenceFile, jobConf); + Text key = new Text(); + CassandraPut value= new CassandraPut(); + reader.next(key, value); + assertEquals(new Text("edward"),key); + + assertEquals( new String( cp.getColumns().get(0).getColumnFamily() ), + new String( "Standard1" ) ); + } +} Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestColumnFamilyInputFormat.java =================================================================== --- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestColumnFamilyInputFormat.java (revision 0) +++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/TestColumnFamilyInputFormat.java (revision 0) @@ -0,0 +1,128 @@ +package org.apache.hadoop.hive.cassandra; + +import java.io.IOException; +import java.util.GregorianCalendar; +import java.util.Map; +import java.util.TreeMap; + +import junit.framework.TestCase; + +import org.apache.cassandra.contrib.utils.service.CassandraServiceDataCleaner; +import org.apache.cassandra.service.EmbeddedCassandraService; +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.ColumnPath; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import 
org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +public class TestColumnFamilyInputFormat extends TestCase { + + private JobConf jobConf; + private FileSystem fs; + private String tmpRoot; + private Path tmpRootPath; + + private static TreeMap> mrResults = new TreeMap>(); + + @SuppressWarnings("deprecation") + public void testIf() throws Exception { + + EmbeddedCassandraService cassandra; + System.setProperty("storage-config", "/home/edward/encrypt/cassandra-cleanup/trunk/src/test/resources/"); + + CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner(); + cleaner.prepare(); + cassandra = new EmbeddedCassandraService(); + cassandra.init(); + Thread t = new Thread(cassandra); + t.setDaemon(true); + t.start(); + + GregorianCalendar gc = new GregorianCalendar(); + long timestamp = gc.getTimeInMillis(); + Cassandra.Client client = getClient(); + ColumnPath cp = new ColumnPath("Standard1"); + cp.setColumn("first".getBytes("utf-8")); + + client.insert("Keyspace1", "ecapriolo", cp, "Edward".getBytes("UTF-8"), + timestamp, ConsistencyLevel.ONE); + + client.insert("Keyspace1", "2", cp, "2".getBytes("UTF-8"), + timestamp, ConsistencyLevel.ONE); + + ColumnOrSuperColumn theColumn = client.get("Keyspace1", "ecapriolo", cp, ConsistencyLevel.ONE); + assertEquals("Edward", new String(theColumn.column.value,"UTF-8")); + + jobConf= new JobConf(TestColumnFamilyInputFormat.class); + jobConf.set(CassandraSerDe.CASSANDRA_KEYSPACE_NAME, "Keyspace1"); + jobConf.set(CassandraSerDe.CASSANDRA_CF_NAME, "Standard1"); + jobConf.set(CassandraSerDe.CASSANDRA_HOST, "localhost"); + jobConf.set(CassandraSerDe.CASSANDRA_PORT,"9170"); + jobConf.set(CassandraSerDe.CASSANDRA_COL_MAPPING, "Standard1:astring" ); + + fs = FileSystem.get(jobConf); + tmpRoot = "/tmp/" + System.getProperty("user.name") + "/"+System.currentTimeMillis(); + tmpRootPath = new Path(tmpRoot); + if (fs.exists(tmpRootPath)){ + fs.delete(tmpRootPath, true); + } + fs.mkdirs( new Path(tmpRoot) ); + Path aFolder = new Path(tmpRootPath,"afolder"); + fs.mkdirs( aFolder ); + fs.mkdirs( new Path(tmpRootPath,"bfolder")); + + jobConf.setMapperClass(MapClass.class); + jobConf.setNumReduceTasks(0); + jobConf.setInputFormat(org.apache.hadoop.hive.cassandra.HiveCassandraTableInputFormat.class); + jobConf.setOutputFormat(org.apache.hadoop.mapred.lib.NullOutputFormat.class ); + JobClient.runJob(jobConf); + + assert(mrResults.containsKey("ecapriolo")); + assert(mrResults.containsKey("2")); + TreeMap aRow = new TreeMap(); + aRow.put("first", "Edward"); + assertEquals( aRow, mrResults.get("ecapriolo")); + } + + static class MapClass extends MapReduceBase implements Mapper{ + + @SuppressWarnings("unchecked") + @Override + public void map(Text key, MapWritable value, OutputCollector out, Reporter rep) + throws IOException { + for (Map.Entry entry: value.entrySet()){ + String eKey= new String( ((BytesWritable) entry.getKey()).getBytes() ); + String eVal = new String( ((BytesWritable) entry.getValue()).getBytes() ); + TreeMap row = new TreeMap(); + row.put(eKey,eVal); + mrResults.put(key.toString(), row); + } + 
} + + } + + + private Cassandra.Client getClient() throws TTransportException { + TTransport tr = new TSocket("localhost", 9170); + TProtocol proto = new TBinaryProtocol(tr); + Cassandra.Client client = new Cassandra.Client(proto); + tr.open(); + return client; + } + +} Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java =================================================================== --- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java (revision 0) +++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java (revision 0) @@ -0,0 +1,23 @@ +package org.apache.hadoop.hive.cassandra; + +import org.apache.hadoop.hive.ql.QTestUtil; + +/** + * HBaseQTestUtil initializes HBase-specific test fixtures. + */ +public class CassandraQTestUtil extends QTestUtil { + public CassandraQTestUtil( + String outDir, String logDir, boolean miniMr, CassandraTestSetup setup) + throws Exception { + + super(outDir, logDir, miniMr, null); + setup.preTest(conf); + super.init(); + } + + @Override + public void init() throws Exception { + // defer + } +} + Index: cassandra-handler/src/test/resources/storage-conf.xml =================================================================== --- cassandra-handler/src/test/resources/storage-conf.xml (revision 0) +++ cassandra-handler/src/test/resources/storage-conf.xml (revision 0) @@ -0,0 +1,380 @@ + + + + + + + + Test Cluster + + + false + + + + + + + + + + + + + org.apache.cassandra.locator.RackUnawareStrategy + + + 1 + + + org.apache.cassandra.locator.EndPointSnitch + + + + + org.apache.cassandra.auth.AllowAllAuthenticator + + + org.apache.cassandra.dht.RandomPartitioner + + + + + + /tmp/commitlog + + /tmp/data + + + + + + 127.0.0.1 + + + + + + + 10000 + + 128 + + + + + + localhost + + 7001 + + + 0.0.0.0 + + 9170 + + false + + + + + + + + auto + + + 512 + + + 64 + + + 32 + 8 + + + 64 + + + 64 + + 256 + + 0.3 + + 60 + + + 8 + 32 + + + periodic + + 10000 + + + + + 864000 + Index: cassandra-handler/src/test/resources/log4j-tools.properties =================================================================== --- cassandra-handler/src/test/resources/log4j-tools.properties (revision 0) +++ cassandra-handler/src/test/resources/log4j-tools.properties (revision 0) @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# for production, you should probably set the root to INFO +# and the pattern to %c instead of %l. (%l is slower.) 
+ +# output messages into a rolling log file as well as stdout +log4j.rootLogger=WARN,stderr + +# stderr +log4j.appender.stderr=org.apache.log4j.ConsoleAppender +log4j.appender.stderr.target=System.err +log4j.appender.stderr.layout=org.apache.log4j.PatternLayout +log4j.appender.stderr.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n Index: cassandra-handler/src/test/resources/log4j.properties =================================================================== --- cassandra-handler/src/test/resources/log4j.properties (revision 0) +++ cassandra-handler/src/test/resources/log4j.properties (revision 0) @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# for production, you should probably set the root to INFO +# and the pattern to %c instead of %l. (%l is slower.) + +# output messages into a rolling log file as well as stdout +log4j.rootLogger=INFO,stdout,R + +# stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n + +# rolling log file +log4j.appender.R=org.apache.log4j.RollingFileAppender +log4j.appender.file.maxFileSize=20MB +log4j.appender.file.maxBackupIndex=50 +log4j.appender.R.layout=org.apache.log4j.PatternLayout +log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n +# Edit the next line to point to your logs directory +log4j.appender.R.File=/tmp/system.log + +# Application logging options +#log4j.logger.com.facebook=DEBUG +#log4j.logger.com.facebook.infrastructure.gms=DEBUG +#log4j.logger.com.facebook.infrastructure.db=DEBUG Index: cassandra-handler/src/test/resources/passwd.properties =================================================================== --- cassandra-handler/src/test/resources/passwd.properties (revision 0) +++ cassandra-handler/src/test/resources/passwd.properties (revision 0) @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This is a sample password file for SimpleAuthenticator. 
The format of +# this file is username=password. If -Dpasswd.mode=MD5 then the password +# is represented as an md5 digest, otherwise it is cleartext (keep this +# in mind when setting file mode and ownership). +jsmith=havebadpass +Elvis\ Presley=graceland4evar +dilbert=nomoovertime Index: cassandra-handler/src/test/resources/access.properties =================================================================== --- cassandra-handler/src/test/resources/access.properties (revision 0) +++ cassandra-handler/src/test/resources/access.properties (revision 0) @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This is a sample access file for SimpleAuthenticator. The format of +# this file is keyspace=users, where users is a comma delimited list of +# authenticatable users from passwd.properties. This file contains +# potentially sensitive information, keep this in mind when setting its +# mode and ownership. +Keyspace1=jsmith,Elvis Presley,dilbert Index: cassandra-handler/src/test/queries/cassandra_udfs.q =================================================================== Index: cassandra-handler/src/test/queries/cassandra_queries.q =================================================================== --- cassandra-handler/src/test/queries/cassandra_queries.q (revision 0) +++ cassandra-handler/src/test/queries/cassandra_queries.q (revision 0) @@ -0,0 +1,12 @@ +CREATE EXTERNAL TABLE IF NOT EXISTS +cassandra_keyspace1_standard1(key int, value string) +STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' +WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" , +"cassandra.cf.name" = "Standard1" , "cassandra.host" = "localhost" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1" +) +TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1"); + +describe cassandra_keyspace1_standard1; + +EXPLAIN FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%2)=0; +FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%2)=0; Index: cassandra-handler/src/test/templates/TestCassandraCliDriver.vm =================================================================== --- cassandra-handler/src/test/templates/TestCassandraCliDriver.vm (revision 0) +++ cassandra-handler/src/test/templates/TestCassandraCliDriver.vm (revision 0) @@ -0,0 +1,125 @@ +package org.apache.hadoop.hive.cli; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +import java.io.*; +import java.util.*; + +import org.apache.hadoop.hive.cassandra.CassandraQTestUtil; +import org.apache.hadoop.hive.cassandra.CassandraTestSetup; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.history.HiveHistoryViewer; +import 
org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo; +import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; +import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo; +import org.apache.hadoop.hive.ql.session.SessionState; + +import org.antlr.runtime.*; +import org.antlr.runtime.tree.*; + +public class $className extends TestCase { + + private CassandraQTestUtil qt; + private CassandraTestSetup setup; + + public $className(String name, CassandraTestSetup setup) { + super(name); + qt = null; + this.setup = setup; + } + + @Override + protected void setUp() { + try { + boolean miniMR = false; + if ("$clusterMode".equals("miniMR")) { + miniMR = true; + } + + qt = new CassandraQTestUtil( + "$resultsDir.getCanonicalPath()", + "$logDir.getCanonicalPath()", miniMR, setup); + +#foreach ($qf in $qfiles) + qt.addFile("$qf.getCanonicalPath()"); +#end + } catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in setup"); + } + } + + @Override + protected void tearDown() { + try { + qt.shutdown(); + } + catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in tearDown"); + } + } + + public static Test suite() { + TestSuite suite = new TestSuite(); + CassandraTestSetup setup = new CassandraTestSetup(suite); +#foreach ($qf in $qfiles) + #set ($fname = $qf.getName()) + #set ($eidx = $fname.length() - 2) + #set ($tname = $fname.substring(0, $eidx)) + suite.addTest(new $className("testCliDriver_$tname", setup)); +#end + return setup; + } + + #foreach ($qf in $qfiles) + #set ($fname = $qf.getName()) + #set ($eidx = $fname.length() - 2) + #set ($tname = $fname.substring(0, $eidx)) + public void testCliDriver_$tname() throws Exception { + try { + System.out.println("Begin query: " + "$fname"); + qt.cliInit("$fname"); + int ecode = qt.executeClient("$fname"); + if (ecode != 0) { + fail("Client Execution failed with error code = " + ecode); + } + if (SessionState.get() != null) { + HiveHistoryViewer hv = new HiveHistoryViewer(SessionState.get() + .getHiveHistory().getHistFileName()); + Map jobInfoMap = hv.getJobInfoMap(); + Map taskInfoMap = hv.getTaskInfoMap(); + + if (jobInfoMap.size() != 0) { + String cmd = (String)jobInfoMap.keySet().toArray()[0]; + QueryInfo ji = jobInfoMap.get(cmd); + + if (!ji.hm.get(Keys.QUERY_RET_CODE.name()).equals("0")) { + fail("Wrong return code in hive history"); + } + } + } + + ecode = qt.checkCliDriverResults("$fname"); + if (ecode != 0) { + fail("Client execution results failed with error code = " + ecode); + } + } catch (Throwable e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception"); + } + + System.out.println("Done query: " + "$fname"); + assertTrue("Test passed", true); + } + +#end +} \ No newline at end of file Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/HiveCassandraOutputFormat.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/HiveCassandraOutputFormat.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/HiveCassandraOutputFormat.java (revision 0) @@ -0,0 +1,122 @@ +package org.apache.hadoop.hive.cassandra.output; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.cassandra.thrift.Cassandra; +import 
org.apache.cassandra.thrift.ColumnPath;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.TimedOutException;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cassandra.CassandraSerDe;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.util.Progressable;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+public class HiveCassandraOutputFormat
+    implements HiveOutputFormat<Text, CassandraPut>,
+    OutputFormat<Text, CassandraPut> {
+
+  @Override
+  public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+      Class<? extends Writable> valueClass, boolean isCompressed, Properties tableProperties,
+      Progressable progress) throws IOException {
+
+    final String cassandraKeySpace = jc.get(CassandraSerDe.CASSANDRA_KEYSPACE_NAME);
+    final String cassandraColumnFamily = jc.get(CassandraSerDe.CASSANDRA_CF_NAME);
+    final String cassandraHost = jc.get(CassandraSerDe.CASSANDRA_HOST);
+    final int cassandraPort = Integer.parseInt( jc.get(CassandraSerDe.CASSANDRA_PORT) );
+
+    return new RecordWriter() {
+
+      private Cassandra.Client client;
+      private TTransport transport;
+      private TProtocol proto;
+
+      @Override
+      public void close(boolean abort) throws IOException {
+        //no going back
+      }
+
+      @Override
+      public void write(Writable w) throws IOException {
+        CassandraPut put = (CassandraPut) w;
+        ensureConnection(cassandraHost, cassandraPort, 5000);
+        for (CassandraColumn c : put.getColumns() ){
+          ColumnPath cp = new ColumnPath();
+          cp.setColumn(c.getColumn() );
+          cp.setColumn_family(c.getColumnFamily() );
+
+          try {
+            client.insert(cassandraKeySpace, put.getKey(), cp, c.getValue(), c.getTimeStamp(), ConsistencyLevel.QUORUM);
+          } catch (InvalidRequestException e) {
+            throw new IOException(e);
+          } catch (UnavailableException e) {
+            throw new IOException(e);
+          } catch (TimedOutException e) {
+            throw new IOException(e);
+          } catch (TException e) {
+            throw new IOException(e);
+          }
+        }
+      }
+
+      private void ensureConnection(String host, int port, int timeout){
+        if (transport == null || (transport != null && !transport.isOpen())){
+          transport = new TSocket(host, port, timeout);
+          proto = new TBinaryProtocol(transport);
+          try {
+            transport.open();
+            this.client = new Cassandra.Client(proto);
+          } catch (TTransportException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    };
+  }
+
+  @Override
+  public void checkOutputSpecs(FileSystem arg0, JobConf jc) throws IOException {
+    /*
+    String hbaseTableName = jc.get(CassandraSerDe.CASSANDRA_KEYSPACE_NAME);
+    jc.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName);
+    Job job = new Job(jc);
+    JobContext jobContext =
+      new JobContext(job.getConfiguration(), job.getJobID());
+
+    try {
+      checkOutputSpecs(jobContext);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+    */
+  }
+
+  @Override
+  public org.apache.hadoop.mapred.RecordWriter getRecordWriter(FileSystem arg0,
+      JobConf arg1, String arg2,
Progressable arg3) throws IOException { + throw new RuntimeException("Error: Hive should not invoke this method."); + + } + + + +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraPut.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraPut.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraPut.java (revision 0) @@ -0,0 +1,87 @@ +package org.apache.hadoop.hive.cassandra.output; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.io.Writable; + +public class CassandraPut implements Writable{ + + private String key; + //private ConsistencyLevel level; + private List columns; + + public CassandraPut(){ + columns = new ArrayList(); + } + + public CassandraPut(String key){ + this(); + setKey(key); + } + + @Override + public void readFields(DataInput in) throws IOException { + key = in.readUTF(); + int ilevel = in.readInt(); +// level = ConsistencyLevel.QUORUM; + int cols = in.readInt(); + for (int i =0;i getColumns() { + return columns; + } + + public void setColumns(List columns) { + this.columns = columns; + } + + @Override + public String toString(){ + StringBuilder sb = new StringBuilder(); + sb.append("key: "); + sb.append(this.key); + for (CassandraColumn col:this.columns){ + sb.append( "column : [" ); + sb.append( col.toString() ); + sb.append( "]" ); + } + return sb.toString(); + } +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraColumn.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraColumn.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraColumn.java (revision 0) @@ -0,0 +1,79 @@ +package org.apache.hadoop.hive.cassandra.output; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +public class CassandraColumn implements Writable{ + + private String columnFamily; + private long timeStamp; + private byte [] column; + private byte [] value; + + + @Override + public void readFields(DataInput din) throws IOException { + columnFamily = din.readUTF(); + timeStamp = din.readLong(); + int clength= din.readInt(); + column = new byte[clength]; + din.readFully(column, 0, clength); + int vlength = din.readInt(); + value = new byte [vlength ]; + din.readFully( value, 0 , vlength); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(columnFamily); + out.writeLong(timeStamp); + out.writeInt( column.length ); + out.write(column); + out.writeInt( value.length ); + out.write(value); + } + + public String getColumnFamily() { + return columnFamily; + } + + public void setColumnFamily(String columnFamily) { + this.columnFamily = columnFamily; + } + + public byte[] getColumn() { + return column; + } + + public void setColumn(byte[] column) { + this.column = column; + } + + public byte[] getValue() { + return value; + } + + public void setValue(byte[] value) { + this.value = value; + } + + public long getTimeStamp() { + return timeStamp; + } + + public void setTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } + + @Override + public String toString(){ + StringBuilder sb = new StringBuilder(); + sb.append( 
"cf:"+ this.columnFamily); + sb.append( "column:"+ new String (this.column)); + sb.append( "value:"+ new String (this.value)); + return sb.toString(); + } +} \ No newline at end of file Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java (revision 0) @@ -0,0 +1,166 @@ +package org.apache.hadoop.hive.cassandra; + +import java.util.Map; +import java.util.Properties; + +import org.apache.cassandra.thrift.Cassandra; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.Constants; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +public class CassandraStorageHandler implements HiveStorageHandler, HiveMetaHook{ + + private TTransport transport; + private TProtocol proto; + private Cassandra.Client client; + + private Configuration configuration; + + @Override + public void configureTableJobProperties(TableDesc tableDesc, Map jobProperties) { + + Properties tableProperties = tableDesc.getProperties(); + String keyspaceName = + tableProperties.getProperty(CassandraSerDe.CASSANDRA_KEYSPACE_NAME); + if (keyspaceName == null) { + keyspaceName = tableProperties.getProperty(Constants.META_TABLE_NAME); + } + jobProperties.put(CassandraSerDe.CASSANDRA_KEYSPACE_NAME, keyspaceName); + jobProperties.put( CassandraSerDe.CASSANDRA_COL_MAPPING, + tableProperties.getProperty(CassandraSerDe.CASSANDRA_COL_MAPPING)); + jobProperties.put( CassandraSerDe.CASSANDRA_CF_NAME, + tableProperties.getProperty(CassandraSerDe.CASSANDRA_CF_NAME)); + jobProperties.put( CassandraSerDe.CASSANDRA_HOST, + tableProperties.getProperty(CassandraSerDe.CASSANDRA_HOST)); + jobProperties.put( CassandraSerDe.CASSANDRA_PORT, + tableProperties.getProperty(CassandraSerDe.CASSANDRA_PORT)); + } + + + @Override + public Class getInputFormatClass() { + return HiveCassandraTableInputFormat.class; + } + + @Override + public HiveMetaHook getMetaHook() { + return this; + } + + @Override + public Class getOutputFormatClass() { + return org.apache.hadoop.hive.cassandra.output.HiveCassandraOutputFormat.class; + } + + @Override + public Class getSerDeClass() { + return CassandraSerDe.class; + } + + + @Override + public Configuration getConf() { + return this.configuration; + } + + @Override + public void setConf(Configuration arg0) { + this.configuration=arg0; + } + + @Override + public void commitCreateTable(Table table) throws MetaException { + //No work needed + } + + @Override + public void commitDropTable(Table table, boolean deleteData) throws MetaException { + //No work needed + } + + @Override + public 
void preCreateTable(Table tbl) throws MetaException {
+    boolean isExternal = MetaStoreUtils.isExternalTable(tbl);
+    if (!isExternal) {
+      throw new MetaException("Cassandra tables must be external.");
+    }
+    if (tbl.getSd().getLocation() != null) {
+      throw new MetaException("LOCATION may not be specified for Cassandra.");
+    }
+
+    String keyspaceName = this.getCassandraKeyspaceName(tbl);
+    Map<String, String> serdeParam = tbl.getSd().getSerdeInfo().getParameters();
+    String cassandraColumnStr = serdeParam.get(CassandraSerDe.CASSANDRA_COL_MAPPING);
+    if (cassandraColumnStr == null) {
+      throw new MetaException("No cassandra.columns.mapping defined in Serde.");
+    }
+    String cassandraHost = serdeParam.get(CassandraSerDe.CASSANDRA_HOST);
+    int cassandraPort = Integer.parseInt(serdeParam.get( CassandraSerDe.CASSANDRA_PORT) );
+    this.ensureConnection(cassandraHost, cassandraPort, 5000);
+
+    try {
+      if ( ! client.describe_keyspaces().contains(keyspaceName) ){
+        throw new MetaException( keyspaceName + " not found");
+      }
+    } catch (TException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void preDropTable(Table table) throws MetaException {
+    // nothing to do
+  }
+
+  @Override
+  public void rollbackCreateTable(Table table) throws MetaException {
+    // nothing to do
+  }
+
+  @Override
+  public void rollbackDropTable(Table table) throws MetaException {
+    // nothing to do
+  }
+
+  private String getCassandraKeyspaceName(Table tbl) {
+    String tableName = tbl.getParameters().get(CassandraSerDe.CASSANDRA_KEYSPACE_NAME);
+    if (tableName == null) {
+      tableName = tbl.getSd().getSerdeInfo().getParameters().get(
+          CassandraSerDe.CASSANDRA_KEYSPACE_NAME);
+    }
+    if (tableName == null) {
+      tableName = tbl.getTableName();
+    }
+    return tableName;
+  }
+
+  private void ensureConnection(String host, int port, int timeout){
+    if (transport == null || (transport != null && !transport.isOpen())){
+      transport = new TSocket(host, port, timeout);
+      proto = new TBinaryProtocol(transport);
+      try {
+        transport.open();
+        this.client = new Cassandra.Client(proto);
+      } catch (TTransportException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+}
Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraSerDe.java
===================================================================
--- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraSerDe.java	(revision 0)
+++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraSerDe.java	(revision 0)
@@ -0,0 +1,462 @@
+package org.apache.hadoop.hive.cassandra;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.cassandra.output.CassandraPut;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Writable; + +public class CassandraSerDe implements SerDe { + + public static final String CASSANDRA_KEYSPACE_NAME="cassandra.ks.name"; + public static final String CASSANDRA_CF_NAME="cassandra.cf.name"; + public static final String CASSANDRA_RANGE_BATCH_SIZE="cassandra.range.size"; + public static final String CASSANDRA_SLICE_PREDICATE_SIZE="cassandra.slice.predicate.size"; + public static final String CASSANDRA_HOST="cassandra.host"; + public static final String CASSANDRA_PORT="cassandra.port"; + public static final String CASSANDRA_COL_MAPPING = "cassandra.columns.mapping"; + public static final String CASSANDRA_KEY_COLUMN=":key"; + + private List cassandraColumnNames; + private ObjectInspector cachedObjectInspector; + private SerDeParameters serdeParams; + private LazyCassandraRow cachedCassandraRow; + private final ByteStream.Output serializeStream = new ByteStream.Output(); + private boolean useJSONSerialize; + private int iKey; + + private byte [] separators; // the separators array + private boolean escaped; // whether we need to escape the data when writing out + private byte escapeChar; // which char to use as the escape char, e.g. '\\' + private boolean [] needsEscape; // which chars need to be escaped. This array should have size + // of 128. Negative byte values (or byte values >= 128) are + // never escaped. + + public static final Log LOG = LogFactory.getLog( CassandraSerDe.class.getName()); + + @Override + public String toString() { + return getClass().toString() + + "[" + + cassandraColumnNames + + ":" + + ((StructTypeInfo) serdeParams.getRowTypeInfo()) + .getAllStructFieldNames() + + ":" + + ((StructTypeInfo) serdeParams.getRowTypeInfo()) + .getAllStructFieldTypeInfos() + "]"; + } + + public CassandraSerDe () throws SerDeException { + + } + + public void initialize(Configuration conf, Properties tbl) + throws SerDeException { + + initCassandraSerDeParameters(conf, tbl, getClass().getName()); + + cachedObjectInspector = LazyFactory.createLazyStructInspector( + serdeParams.getColumnNames(), + serdeParams.getColumnTypes(), + serdeParams.getSeparators(), + serdeParams.getNullSequence(), + serdeParams.isLastColumnTakesRest(), + serdeParams.isEscaped(), + serdeParams.getEscapeChar()); + + cachedCassandraRow = new LazyCassandraRow( + (LazySimpleStructObjectInspector) cachedObjectInspector); + + //cassandraColumnNames is an array how will it print out? 
+ if (LOG.isDebugEnabled()) { + LOG.debug("CassandraSerDe initialized with : columnNames = " + + serdeParams.getColumnNames() + + " columnTypes = " + + serdeParams.getColumnTypes() + + " cassandraColumnMapping = " + + cassandraColumnNames); + } + } + + private void initCassandraSerDeParameters( + Configuration job, Properties tbl, String serdeName) + throws SerDeException { + String cassandraColumnNameProperty = + tbl.getProperty(CassandraSerDe.CASSANDRA_COL_MAPPING); + String columnTypeProperty = + tbl.getProperty(Constants.LIST_COLUMN_TYPES); + + cassandraColumnNames = parseColumnMapping(cassandraColumnNameProperty); + iKey = cassandraColumnNames.indexOf(CassandraSerDe.CASSANDRA_KEY_COLUMN); + + // Build the type property string if not supplied + if (columnTypeProperty == null) { + StringBuilder sb = new StringBuilder(); + + for (int i = 0; i < cassandraColumnNames.size(); i++) { + if (sb.length() > 0) { + sb.append(":"); + } + String colName = cassandraColumnNames.get(i); + if (isSpecialColumn(colName)) { + // a special column becomes a STRING + sb.append(Constants.STRING_TYPE_NAME); + } else if (colName.endsWith(":")) { + // a column family become a MAP + sb.append( + Constants.MAP_TYPE_NAME + "<" + + Constants.STRING_TYPE_NAME + + "," + Constants.STRING_TYPE_NAME + ">"); + } else { + // an individual column becomes a STRING + sb.append(Constants.STRING_TYPE_NAME); + } + } + tbl.setProperty(Constants.LIST_COLUMN_TYPES, sb.toString()); + } + + serdeParams = LazySimpleSerDe.initSerdeParams( + job, tbl, serdeName); + + if (cassandraColumnNames.size() != serdeParams.getColumnNames().size()) { + throw new SerDeException(serdeName + ": columns has " + + serdeParams.getColumnNames().size() + + " elements while hbase.columns.mapping has " + + cassandraColumnNames.size() + " elements" + + " (counting the key if implicit)"); + } + + // we just can make sure that "columnfamily:" is mapped to MAP + for (int i = 0; i < cassandraColumnNames.size(); i++) { + String hbaseColName = cassandraColumnNames.get(i); + if (hbaseColName.endsWith(":")) { + TypeInfo typeInfo = serdeParams.getColumnTypes().get(i); + if ((typeInfo.getCategory() != Category.MAP) || + (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getTypeName() + != Constants.STRING_TYPE_NAME)) { + + throw new SerDeException( + serdeName + ": Cassandra column family '" + + hbaseColName + + "' should be mapped to map but is mapped to " + + typeInfo.getTypeName()); + } + } + } + + } + + + @Override + public Object deserialize(Writable w) throws SerDeException { + if (!(w instanceof MapWritable)){ + throw new SerDeException(getClass().getName() + ": expects MapWritable!"); + } + MapWritable mw = (MapWritable)w; + this.cachedCassandraRow.init(mw, this.cassandraColumnNames); + return cachedCassandraRow; + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + return this.cachedObjectInspector; + } + + @Override + public Class getSerializedClass() { + return CassandraPut.class; + } + + + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { + System.out.println("we is serializin"); + if (objInspector.getCategory() != Category.STRUCT) { + throw new SerDeException(getClass().toString() + + " can only serialize struct types, but we got: " + + objInspector.getTypeName()); + } + + // Prepare the field ObjectInspectors + StructObjectInspector soi = (StructObjectInspector) objInspector; + List fields = soi.getAllStructFieldRefs(); + List list = soi.getStructFieldsDataAsList(obj); + 
List declaredFields = + (serdeParams.getRowTypeInfo() != null && + ((StructTypeInfo) serdeParams.getRowTypeInfo()) + .getAllStructFieldNames().size() > 0) ? + ((StructObjectInspector)getObjectInspector()).getAllStructFieldRefs() + : null; + + System.out.println("if i can mkae it here"); + CassandraPut put = null; + + try { + String key = serializeField(iKey, null, fields, list, declaredFields); + + if (key == null) { + throw new SerDeException("Cassandra row key cannot be NULL"); + } + + put = new CassandraPut(key); + + System.out.println("fields size" + fields.size()); + // Serialize each field + for (int i = 0; i < fields.size(); i++) { + if (i == iKey) { + // already processed the key above + continue; + } + serializeField(i, put, fields, list, declaredFields); + } + } catch (IOException e) { + throw new SerDeException(e); + } + + System.out.println("returning put"); + System.out.println(put); + return put; + + } + + + private String serializeField( + int i, + CassandraPut put, + List fields, + List list, + List declaredFields) throws IOException { + + // column name + String cassandraColumn = cassandraColumnNames.get(i); + + System.out.println("cassandra column "+cassandraColumn); + // Get the field objectInspector and the field object. + ObjectInspector foi = fields.get(i).getFieldObjectInspector(); + Object f = (list == null ? null : list.get(i)); + + if (f == null) { + System.out.println("returning null from serialize field "); + return null; + } + + // If the field corresponds to a column family in cassandra + if (cassandraColumn.endsWith(":")) { + MapObjectInspector moi = (MapObjectInspector)foi; + ObjectInspector koi = moi.getMapKeyObjectInspector(); + ObjectInspector voi = moi.getMapValueObjectInspector(); + + Map map = moi.getMap(f); + if (map == null) { + System.out.println("returning null map from serializeField "); + return null; + } else { + for (Map.Entry entry: map.entrySet()) { + // Get the Key + serializeStream.reset(); + serialize(entry.getKey(), koi, 3); + System.out.println("mapKey "+entry.getKey()); + + // Get the column-qualifier + byte [] columnQualifier = new byte[serializeStream.getCount()]; + System.arraycopy(serializeStream.getData(), 0, columnQualifier, 0, serializeStream.getCount()); + System.out.println("mapColumn "+columnQualifier); + + // Get the Value + serializeStream.reset(); + + System.out.println("mapValue "+entry.getValue()); + boolean isNotNull = serialize(entry.getValue(), voi, 3); + if (!isNotNull) { + continue; + } + byte [] value = new byte[serializeStream.getCount()]; + System.arraycopy(serializeStream.getData(), 0, value, 0, serializeStream.getCount()); + //hardcode here must fix + // put.add("Standard1".getBytes(), columnQualifier, value); + //put.add(family, column, value) + } + } + } else { + + // If the field that is passed in is NOT a primitive, and either the + // field is not declared (no schema was given at initialization), or + // the field is declared as a primitive in initialization, serialize + // the data to JSON string. Otherwise serialize the data in the + // delimited way. 
+ serializeStream.reset(); + boolean isNotNull; + if (!foi.getCategory().equals(Category.PRIMITIVE) + && (declaredFields == null || + declaredFields.get(i).getFieldObjectInspector().getCategory() + .equals(Category.PRIMITIVE) || useJSONSerialize)) { + + isNotNull = serialize( + SerDeUtils.getJSONString(f, foi), + PrimitiveObjectInspectorFactory.javaStringObjectInspector, + 1); + } else { + isNotNull = serialize(f, foi, 1); + } + if (!isNotNull) { + return null; + } + byte [] key = new byte[serializeStream.getCount()]; + System.arraycopy(serializeStream.getData(), 0, key, 0, serializeStream.getCount()); + if (i == iKey) { + return new String(key); + } + } + + return null; + + } + + + private boolean serialize(Object obj, ObjectInspector objInspector, int level) + throws IOException { + + switch (objInspector.getCategory()) { + case PRIMITIVE: { + LazyUtils.writePrimitiveUTF8( + serializeStream, obj, + (PrimitiveObjectInspector) objInspector, + escaped, escapeChar, needsEscape); + return true; + } + case LIST: { + char separator = (char) separators[level]; + ListObjectInspector loi = (ListObjectInspector)objInspector; + List list = loi.getList(obj); + ObjectInspector eoi = loi.getListElementObjectInspector(); + if (list == null) { + return false; + } else { + for (int i = 0; i < list.size(); i++) { + if (i > 0) { + serializeStream.write(separator); + } + serialize(list.get(i), eoi, level + 1); + } + } + return true; + } + case MAP: { + char separator = (char) separators[level]; + char keyValueSeparator = (char) separators[level+1]; + MapObjectInspector moi = (MapObjectInspector) objInspector; + ObjectInspector koi = moi.getMapKeyObjectInspector(); + ObjectInspector voi = moi.getMapValueObjectInspector(); + + Map map = moi.getMap(obj); + if (map == null) { + return false; + } else { + boolean first = true; + for (Map.Entry entry: map.entrySet()) { + if (first) { + first = false; + } else { + serializeStream.write(separator); + } + serialize(entry.getKey(), koi, level+2); + serializeStream.write(keyValueSeparator); + serialize(entry.getValue(), voi, level+2); + } + } + return true; + } + case STRUCT: { + char separator = (char)separators[level]; + StructObjectInspector soi = (StructObjectInspector)objInspector; + List fields = soi.getAllStructFieldRefs(); + List list = soi.getStructFieldsDataAsList(obj); + if (list == null) { + return false; + } else { + for (int i = 0; i < list.size(); i++) { + if (i > 0) { + serializeStream.write(separator); + } + serialize(list.get(i), fields.get(i).getFieldObjectInspector(), level + 1); + } + } + return true; + } + } + throw new RuntimeException("Unknown category type: " + objInspector.getCategory()); + } + + + + + public static List parseColumnMapping(String columnMapping) { + String [] columnArray = columnMapping.split(","); + List columnList = Arrays.asList(columnArray); + int iKey = columnList.indexOf(CASSANDRA_KEY_COLUMN); + if (iKey == -1) { + columnList = new ArrayList(columnList); + columnList.add(0, CASSANDRA_KEY_COLUMN); + } + return columnList; + } + + public static List initColumnNamesBytes(List columnNames) { + List columnBytes = new ArrayList(); + String column = null; + + for (int i = 0; i < columnNames.size(); i++) { + column = columnNames.get(i); + + try { + if (column.endsWith(":")) { + columnBytes.add((column.split(":")[0]).getBytes("UTF-8")); + } else { + columnBytes.add((column).getBytes("UTF-8")); + } + } catch (UnsupportedEncodingException ex){ + throw new RuntimeException(ex); + } + } + + return columnBytes; + } + + public static 
boolean isSpecialColumn(String columnName) { + return columnName.equals(CASSANDRA_KEY_COLUMN); + } + + + + } Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraRowResult.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraRowResult.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraRowResult.java (revision 0) @@ -0,0 +1,93 @@ +package org.apache.hadoop.hive.cassandra; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.SortedMap; + +import org.apache.cassandra.db.IColumn; +import org.apache.hadoop.io.Writable; + + + +public class CassandraRowResult implements Writable { + + private String key; + private SortedMap values; + + public CassandraRowResult(String key, SortedMap values) { + setKey(key); + setValues(values); + } + + @Override + public void readFields(DataInput din) throws IOException { + setKey(din.readUTF()); + int colsToRead = din.readInt(); + for (int i = 0; i < colsToRead; i++) { + int keyByteLength = din.readInt(); + byte b[] = new byte[keyByteLength]; + din.readFully(b); + IColumn ic = readIColumn(din); + this.getValues().put(b, ic); + } + } + + public IColumn readIColumn(DataInput din) throws IOException { + /* + HiveIColumn hic = new HiveIColumn(); + hic.setName(din.readUTF().getBytes("UTF-8")); + hic.setValue(din.readUTF().getBytes("UTF-8")); + int icolumnsToRead = din.readInt(); + for (int i = 0; i < icolumnsToRead; i++) { + hic.getChildren().add(this.readIColumn(din)); + } + return hic; + */ + return null; + } + + @Override + public void write(DataOutput dout) throws IOException { + dout.writeUTF(key); + dout.writeInt(values.size()); + for (Map.Entry entry : values.entrySet()) { + dout.write(entry.getKey().length); + dout.write(entry.getKey()); + IColumn value = entry.getValue(); + writeIColumn(value, dout); + } + } + + public void writeIColumn(IColumn ic, DataOutput dout) throws IOException { + dout.write(ic.name()); + dout.write(ic.value()); + dout.writeLong(ic.timestamp()); + Collection cols = ic.getSubColumns(); + dout.writeInt(cols.size()); + if (cols.size() > 0) { + for (IColumn sub : ic.getSubColumns()) { + writeIColumn(sub, dout); + } + } + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public SortedMap getValues() { + return values; + } + + public void setValues(SortedMap values) { + this.values = values; + } + +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/LazyCassandraRow.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/LazyCassandraRow.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/LazyCassandraRow.java (revision 0) @@ -0,0 +1,101 @@ +package org.apache.hadoop.hive.cassandra; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.lazy.LazyStruct; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import 
org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.MapWritable; + +public class LazyCassandraRow extends LazyStruct{ + private List cassandraColumns; + private MapWritable rowResult; + private ArrayList cachedList; + + public LazyCassandraRow(LazySimpleStructObjectInspector oi) { + super(oi); + } + + public void init(MapWritable rr, List cassandraColumns) { + this.rowResult = rr; + this.cassandraColumns = cassandraColumns; + setParsed(false); + } + + private void parse(){ + if (getFields() == null) { + List fieldRefs = + ((StructObjectInspector)getInspector()).getAllStructFieldRefs(); + setFields(new LazyObject[fieldRefs.size()]); + for (int i = 0; i < getFields().length; i++) { + String cassandraColumn = this.cassandraColumns.get(i); + if (cassandraColumn.endsWith(":")) { + // a column family + getFields()[i] = new LazyCassandraCellMap( (LazyMapObjectInspector) + fieldRefs.get(i).getFieldObjectInspector()); + continue; + } + + getFields()[i] = LazyFactory.createLazyObject( + fieldRefs.get(i).getFieldObjectInspector()); + } + setFieldInited(new boolean[getFields().length]); + } + Arrays.fill(getFieldInited(), false); + setParsed(true); + } + + @Override + public Object getField(int fieldID) { + if (!getParsed()) { + parse(); + } + return uncheckedGetField(fieldID); + } + + private Object uncheckedGetField(int fieldID) { + if (!getFieldInited()[fieldID]) { + getFieldInited()[fieldID] = true; + + ByteArrayRef ref = null; + + String columnName = cassandraColumns.get(fieldID); + if (columnName.equals(CassandraSerDe.CASSANDRA_KEY_COLUMN)) { + ref = new ByteArrayRef(); + //ref.setData(rowResult.getKey().getBytes()); + ref.setData( "CassandraKeyHere".getBytes() ); + } else { + if (columnName.endsWith(":")) { + // it is a column family + //((LazyCassandraCellMap) getFields()[fieldID]).init( + // rowResult, columnName); + return null; + } else { + // it is a column + // if (rowResult.containsKey(columnName)) { + if ( this.rowResult.containsKey(columnName.getBytes()) ){ + ref = new ByteArrayRef(); + BytesWritable b =(BytesWritable) this.rowResult.get(columnName.getBytes()); + ref.setData(b.getBytes()); + } else { + return null; + } + } + } + if (ref != null) { + getFields()[fieldID].init(ref, 0, ref.getData().length); + } + } + return getFields()[fieldID].getObject(); + } + + +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraSplit.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraSplit.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraSplit.java (revision 0) @@ -0,0 +1,70 @@ +package org.apache.hadoop.hive.cassandra; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.cassandra.hadoop.ColumnFamilySplit; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +/* This class wraps the cassandra ColumnFamilySplit and adds the columnMapping + * That hive will use to 'tablize' the data inside it. 
+ * */ +@SuppressWarnings("deprecation") +public class CassandraSplit extends FileSplit implements InputSplit{ + + private final ColumnFamilySplit split; + private String columnMapping; + + public CassandraSplit() { + super((Path) null, 0, 0, (String[]) null); + columnMapping = ""; + split = new ColumnFamilySplit(null,null,null); + } + + public CassandraSplit(ColumnFamilySplit split, String columnsMapping, Path dummyPath) { + super(dummyPath, 0, 0, (String[]) null); + this.split = split; + columnMapping = columnsMapping; + } + + /* bean patterns */ + public String getColumnMapping(){ + return columnMapping; + } + + public ColumnFamilySplit getSplit(){ + return this.split; + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + columnMapping = in.readUTF(); + split.readFields(in); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeUTF(columnMapping); + split.write(out); + } + + @Override + public long getLength() { + return split.getLength(); + } + + @Override + public String[] getLocations() throws IOException { + return split.getLocations(); + } + + @Override + public String toString() { + return "ColumnFamilySplit " + split + " : " + columnMapping; + } + +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/HiveCassandraTableInputFormat.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/HiveCassandraTableInputFormat.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/HiveCassandraTableInputFormat.java (revision 0) @@ -0,0 +1,169 @@ +package org.apache.hadoop.hive.cassandra; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; + +import org.apache.cassandra.db.IColumn; +import org.apache.cassandra.hadoop.ColumnFamilyInputFormat; +import org.apache.cassandra.hadoop.ColumnFamilySplit; +import org.apache.cassandra.hadoop.ConfigHelper; +import org.apache.cassandra.thrift.SlicePredicate; +import org.apache.cassandra.thrift.SliceRange; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; + +public class HiveCassandraTableInputFormat extends +ColumnFamilyInputFormat implements InputFormat { + static final Log LOG = LogFactory.getLog(HiveCassandraTableInputFormat.class); + + public HiveCassandraTableInputFormat() { + super(); + } + + @Override + public org.apache.hadoop.mapreduce.RecordReader> createRecordReader( + org.apache.hadoop.mapreduce.InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException{ + org.apache.hadoop.mapreduce.RecordReader> result = null; + try { + result = super.createRecordReader(inputSplit, taskAttemptContext); + 
result.initialize(((CassandraSplit)inputSplit).getSplit(), taskAttemptContext); + } catch (InterruptedException e) { + throw new IOException(e); + } + return result; + } + + public RecordReader getRecordReader + (InputSplit split,JobConf jobConf,final Reporter reporter) throws IOException { + CassandraSplit cassandraSplit = (CassandraSplit) split; + String cassandraTableName = jobConf.get(CassandraSerDe.CASSANDRA_CF_NAME); + String cassandraColumnsMapping = jobConf.get(CassandraSerDe.CASSANDRA_COL_MAPPING); + List columns = CassandraSerDe.parseColumnMapping(cassandraColumnsMapping); + List readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf); + + if (columns.size() < readColIDs.size()) { + throw new IOException("Cannot read more columns than the given table contains."); + } + + org.apache.cassandra.hadoop.ColumnFamilySplit cfSplit = cassandraSplit.getSplit(); + + Job job= new Job(jobConf); + TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()) { + @Override + public void progress() { + reporter.progress(); + } + }; + + final org.apache.hadoop.mapreduce.RecordReader> + recordReader = createRecordReader(cassandraSplit, tac); + + return new RecordReader(){ + + public void close() throws IOException{ + recordReader.close(); + } + public Text createKey(){ + return new Text(); + } + public MapWritable createValue(){ + //return new TreeMap(); + return new MapWritable(); + } + public long getPos() throws IOException { + return 0l; + } + public float getProgress() throws IOException { + float progress = 0.0F; + try { + progress = recordReader.getProgress(); + } catch (InterruptedException e) { + throw new IOException(e); + } + return progress; + } + public boolean next(Text rowKey,MapWritable value) throws IOException{ + boolean next = false; + try { + next = recordReader.nextKeyValue(); + if (next){ + rowKey.set( recordReader.getCurrentKey() ); + for (Map.Entry entry : recordReader.getCurrentValue().entrySet()){ + value.put( + new BytesWritable(entry.getKey()), + new BytesWritable(entry.getValue().value()) ); + } + } + } catch (InterruptedException e) { + throw new IOException(e); + } + return next; + } + }; + + } + + public InputSplit[] getSplits( JobConf jobConf, int numSplits ) throws IOException { + + String ks = jobConf.get(CassandraSerDe.CASSANDRA_KEYSPACE_NAME); + String cf = jobConf.get(CassandraSerDe.CASSANDRA_CF_NAME); + int slicePredicateSize = jobConf.getInt(CassandraSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, 1000); + int sliceRangeSize = jobConf.getInt(CassandraSerDe.CASSANDRA_RANGE_BATCH_SIZE, 100); + + SlicePredicate predicate = new SlicePredicate(); + predicate.setSlice_range( new SliceRange(new byte[0], new byte[0], false, slicePredicateSize) ); + ConfigHelper.setSlicePredicate(jobConf, predicate); + ConfigHelper.setColumnFamily(jobConf, ks, cf); + ConfigHelper.setRangeBatchSize(jobConf, sliceRangeSize); + + //column mapping + String cassandraColumnMapping = jobConf.get(CassandraSerDe.CASSANDRA_COL_MAPPING); + if (cassandraColumnMapping == null){ + throw new IOException("cassandra.columns.mapping required for Cassandra Table."); + } + /* + List columns = CassandraSerDe.parseColumnMapping(cassandraColumnMapping); + List inputColumns = new ArrayList(); + for (String column : columns) { + if (CassandraSerDe.isSpecialColumn(column)) { + continue; + } + inputColumns.add(column.getBytes("UTF-8")); + } + */ + Job job = new Job(jobConf); + JobContext jobContext = new JobContext(job.getConfiguration(), job.getJobID()); + + 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, new Path("/")); + FileInputFormat.addInputPath(job, new Path("/")); + //Path [] tablePaths = FileInputFormat.getInputPaths(jobContext); + Path [] tablesPaths = new Path [] { new Path("/")}; + List splits = getSplits(jobContext); + InputSplit [] results = new InputSplit[splits.size()]; + + for (int i = 0; i < splits.size(); i++) { + results[i] = new CassandraSplit((ColumnFamilySplit) splits.get(i), + cassandraColumnMapping, tablesPaths[0]); + } + return results; + } + +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/LazyCassandraCellMap.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/LazyCassandraCellMap.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/LazyCassandraCellMap.java (revision 0) @@ -0,0 +1,131 @@ +package org.apache.hadoop.hive.cassandra; + +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyMap; +import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive; +import org.apache.hadoop.hive.serde2.lazy.LazyUtils; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; + +public class LazyCassandraCellMap extends LazyMap{ + + private CassandraRowResult rowResult; + private String cassandraColumnFamily; + + protected LazyCassandraCellMap(LazyMapObjectInspector oi) { + super(oi); + // TODO Auto-generated constructor stub + } + + public void init(CassandraRowResult rr, String columnFamily) { + rowResult = rr; + cassandraColumnFamily = columnFamily; + setParsed(false); + } + + private void parse() { + if (cachedMap == null) { + cachedMap = new LinkedHashMap(); + } else { + cachedMap.clear(); + } + + //Iterator iter = rowResult.keySet().iterator(); + Iterator iter = rowResult.getValues().keySet().iterator(); + + byte[] columnFamily = cassandraColumnFamily.getBytes(); + while (iter.hasNext()) { + byte [] columnKey = iter.next(); + if (columnFamily.length > columnKey.length) { + continue; + } + + if (0 == LazyUtils.compare( + columnFamily, 0, columnFamily.length, + columnKey, 0, columnFamily.length)) { + + //byte [] columnValue = rowResult.get(columnKey).getValue(); + byte [] columnValue = rowResult.getValues().get(columnKey).value(); + + if (columnValue == null || columnValue.length == 0) { + // an empty object + continue; + } + + // Keys are always primitive + LazyPrimitive key = LazyFactory.createLazyPrimitiveClass( + (PrimitiveObjectInspector) + ((MapObjectInspector) getInspector()).getMapKeyObjectInspector()); + ByteArrayRef keyRef = new ByteArrayRef(); + keyRef.setData(columnKey); + key.init( + keyRef, columnFamily.length, columnKey.length - columnFamily.length); + + // Value + LazyObject value = LazyFactory.createLazyObject( + ((MapObjectInspector) getInspector()).getMapValueObjectInspector()); + ByteArrayRef valueRef = new ByteArrayRef(); + valueRef.setData(columnValue); + value.init(valueRef, 0, columnValue.length); + + // Put it into the map + cachedMap.put(key.getObject(), value.getObject()); + } + } + } + + /** + * Get the value in the map for the 
given key. + * + * @param key + * @return + */ + + @Override + public Object getMapValueElement(Object key) { + if (!getParsed()) { + parse(); + } + + for (Map.Entry entry : cachedMap.entrySet()) { + LazyPrimitive lazyKeyI = (LazyPrimitive) entry.getKey(); + // getWritableObject() will convert LazyPrimitive to actual primitive + // writable objects. + Object keyI = lazyKeyI.getWritableObject(); + if (keyI == null) { + continue; + } + if (keyI.equals(key)) { + // Got a match, return the value + LazyObject v = (LazyObject) entry.getValue(); + return v == null ? v : v.getObject(); + } + } + + return null; + } + + @Override + public Map getMap() { + if (!getParsed()) { + parse(); + } + return cachedMap; + } + + @Override + public int getMapSize() { + if (!getParsed()) { + parse(); + } + return cachedMap.size(); + } + +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/udf/SetCassandraColumn.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/udf/SetCassandraColumn.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/udf/SetCassandraColumn.java (revision 0) @@ -0,0 +1,75 @@ +package org.apache.hadoop.hive.cassandra.udf; + +import java.io.UnsupportedEncodingException; + +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.ColumnPath; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.TimedOutException; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +public class SetCassandraColumn extends org.apache.hadoop.hive.ql.exec.UDF { + + private Cassandra.Client client; + private TTransport transport; + private TProtocol proto; + + public int evaluate(String host, int port, int timeout, String keyspace, + String columnFamily, String column, String key, String value){ + + ensureConnection(host,port,timeout); + ColumnPath cp = new ColumnPath(); + + try { + cp.setColumn(column.getBytes("UTF-8") ); + } catch (UnsupportedEncodingException e1) { + throw new RuntimeException (e1); + } + cp.setColumn_family(columnFamily); + + try { + this.client.insert(keyspace, key, cp, value.getBytes("UTF-8"), System.currentTimeMillis(), ConsistencyLevel.QUORUM); + } catch (InvalidRequestException e) { + throw new RuntimeException (e); + } catch (UnavailableException e) { + throw new RuntimeException (e); + } catch (TimedOutException e) { + throw new RuntimeException (e); + } catch (TException e) { + throw new RuntimeException (e); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException (e); + } + return 0; + } + + + private void ensureConnection(String host,int port, int timeout){ + if (transport == null || (transport!=null && !transport.isOpen())){ + transport = new TSocket(host, port,timeout); + proto = new TBinaryProtocol(transport); + try { + transport.open(); + this.client = new Cassandra.Client(proto); + } catch (TTransportException e) { + throw new RuntimeException (e); + } + } + } + + @Override + public void finalize(){ + if (transport != null){ + if (transport.isOpen()){ + transport.close(); + } + } + } +} Index: 
cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/udf/GetCassandraColumn.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/udf/GetCassandraColumn.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/udf/GetCassandraColumn.java (revision 0) @@ -0,0 +1,76 @@ +package org.apache.hadoop.hive.cassandra.udf; + +import java.io.UnsupportedEncodingException; + +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.ColumnPath; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.NotFoundException; +import org.apache.cassandra.thrift.TimedOutException; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +public class GetCassandraColumn extends UDF { + private Cassandra.Client client; + private TTransport transport; + private TProtocol proto; + + public String evaluate(String host, int port, int timeout, String keyspace, + String columnFamily, String column, String key){ + + ensureConnection(host,port,timeout); + ColumnPath cp = new ColumnPath(); + + try { + cp.setColumn(column.getBytes("UTF-8") ); + } catch (UnsupportedEncodingException e1) { + throw new RuntimeException (e1); + } + cp.setColumn_family(columnFamily); + ColumnOrSuperColumn result = null; + try { + result = this.client.get(keyspace, key ,cp, ConsistencyLevel.QUORUM); + } catch (InvalidRequestException e) { + throw new RuntimeException (e); + } catch (NotFoundException e) { + throw new RuntimeException (e); + } catch (UnavailableException e) { + throw new RuntimeException (e); + } catch (TimedOutException e) { + throw new RuntimeException (e); + } catch (TException e) { + throw new RuntimeException (e); + } + return new String(result.column.value); + } + + private void ensureConnection(String host,int port, int timeout){ + if (transport == null || (transport!=null && !transport.isOpen())){ + transport = new TSocket(host, port,timeout); + proto = new TBinaryProtocol(transport); + try { + transport.open(); + this.client = new Cassandra.Client(proto); + } catch (TTransportException e) { + throw new RuntimeException (e); + } + } + } + + @Override + public void finalize(){ + if (transport != null){ + if (transport.isOpen()){ + transport.close(); + } + } + } +} Index: cassandra-handler/build.xml =================================================================== --- cassandra-handler/build.xml (revision 0) +++ cassandra-handler/build.xml (revision 0) @@ -0,0 +1,155 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Tests failed! + + + + + + Index: build-common.xml =================================================================== --- build-common.xml (revision 981263) +++ build-common.xml (working copy) @@ -429,6 +429,7 @@ +
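Note for reviewers (not part of the patch): the handler is driven entirely by the cassandra.columns.mapping table property. parseColumnMapping() and initCassandraSerDeParameters() treat the key-column marker (CassandraSerDe.CASSANDRA_KEY_COLUMN) as the row key, any entry ending in ":" as a whole column family exposed as a Hive map<string,string>, and any other entry as a single column exposed as a string; the key entry is prepended automatically when it is not listed. The standalone sketch below only restates that convention so the derived Hive schema is easy to see. The ":key" marker value, the "Standard1" column family, and the mapping string are assumptions made for illustration, not values taken from the patch.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone illustration of the cassandra.columns.mapping convention.
// ":key" is assumed as the key marker for this example; the patch reads it from
// CassandraSerDe.CASSANDRA_KEY_COLUMN, whose literal value is defined elsewhere.
public class ColumnMappingExample {
  static final String KEY_COLUMN = ":key"; // assumed marker, for illustration only

  public static void main(String[] args) {
    // hypothetical mapping for a table backed by column family "Standard1"
    String mapping = ":key,Standard1:,Standard1:col1";

    // mirror parseColumnMapping(): split and prepend the key if it is implicit
    List<String> columns = new ArrayList<String>(Arrays.asList(mapping.split(",")));
    if (!columns.contains(KEY_COLUMN)) {
      columns.add(0, KEY_COLUMN);
    }

    // mirror initCassandraSerDeParameters(): derive the Hive column types
    StringBuilder types = new StringBuilder();
    for (String col : columns) {
      if (types.length() > 0) {
        types.append(":");
      }
      if (col.equals(KEY_COLUMN)) {
        types.append("string");             // the row key becomes a string
      } else if (col.endsWith(":")) {
        types.append("map<string,string>"); // a whole column family becomes a map
      } else {
        types.append("string");             // an individual column becomes a string
      }
    }

    // prints: string:map<string,string>:string
    System.out.println(types);
  }
}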