Index: build.xml =================================================================== --- build.xml (revision 1078995) +++ build.xml (working copy) @@ -96,7 +96,7 @@ - + @@ -118,7 +118,7 @@ - + @@ -129,7 +129,7 @@ - + @@ -362,6 +362,7 @@ + Index: ivy/libraries.properties =================================================================== --- ivy/libraries.properties (revision 1078995) +++ ivy/libraries.properties (working copy) @@ -26,6 +26,7 @@ datanucleus-core.version=2.0.3 datanucleus-enhancer.version=2.0.3 datanucleus-rdbms.version=2.0.3 +cassandra.version=0.7.2 checkstyle.version=5.0 commons-cli.version=1.2 commons-codec.version=1.3 Index: cassandra-handler/conf/cassandra.yaml =================================================================== --- cassandra-handler/conf/cassandra.yaml (revision 0) +++ cassandra-handler/conf/cassandra.yaml (revision 0) @@ -0,0 +1,456 @@ +# Cassandra storage config YAML + +# NOTE: +# See http://wiki.apache.org/cassandra/StorageConfiguration for +# full explanations of configuration directives +# /NOTE + +# The name of the cluster. This is mainly used to prevent machines in +# one logical cluster from joining another. +cluster_name: 'Test Cluster' + +# You should always specify InitialToken when setting up a production +# cluster for the first time, and often when adding capacity later. +# The principle is that each node should be given an equal slice of +# the token ring; see http://wiki.apache.org/cassandra/Operations +# for more details. +# +# If blank, Cassandra will request a token bisecting the range of +# the heaviest-loaded existing node. If there is no load information +# available, such as is the case with a new cluster, it will pick +# a random token, which will lead to hot spots. +initial_token: + +# Set to true to make new [non-seed] nodes automatically migrate data +# to themselves from the pre-existing nodes in the cluster. Defaults +# to false because you can only bootstrap N machines at a time from +# an existing cluster of N, so if you are bringing up a cluster of +# 10 machines with 3 seeds you would have to do it in stages. Leaving +# this off for the initial start simplifies that. +auto_bootstrap: false + +# See http://wiki.apache.org/cassandra/HintedHandoff +hinted_handoff_enabled: true + +# authentication backend, implementing IAuthenticator; used to identify users +authenticator: org.apache.cassandra.auth.AllowAllAuthenticator + +# authorization backend, implementing IAuthority; used to limit access/provide permissions +authority: org.apache.cassandra.auth.AllowAllAuthority + +# any IPartitioner may be used, including your own as long as it is on +# the classpath. Out of the box, Cassandra provides +# org.apache.cassandra.dht.RandomPartitioner +# org.apache.cassandra.dht.ByteOrderedPartitioner, +# org.apache.cassandra.dht.OrderPreservingPartitioner, and +# org.apache.cassandra.dht.CollatingOrderPreservingPartitioner. +# (CollatingOPP colates according to EN,US rules, not naive byte +# ordering. Use this as an example if you need locale-aware collation.) +partitioner: org.apache.cassandra.dht.RandomPartitioner + +# directories where Cassandra should store data on disk. 
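The initial_token guidance above has a simple closed form for RandomPartitioner: node i of an N-node ring gets token i * 2^127 / N. The standalone helper below is an editorial illustration of that arithmetic only (the class name InitialTokens is invented; it is not part of this patch):

    import java.math.BigInteger;

    /** Print evenly spaced initial_token values for an N-node RandomPartitioner
     *  ring; RandomPartitioner's token space runs from 0 to 2^127. */
    public class InitialTokens {
      public static void main(String[] args) {
        int nodes = args.length > 0 ? Integer.parseInt(args[0]) : 3;
        BigInteger ringSize = BigInteger.valueOf(2).pow(127);
        for (int i = 0; i < nodes; i++) {
          // token_i = i * 2^127 / N gives every node an equal slice of the ring.
          BigInteger token = ringSize.multiply(BigInteger.valueOf(i))
                                     .divide(BigInteger.valueOf(nodes));
          System.out.println("node " + i + ": initial_token: " + token);
        }
      }
    }

For the single-node test configuration shipped with this patch, leaving initial_token blank is fine.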
+data_file_directories: + - /tmp/data + +# commit log +commitlog_directory: /tmp/commit + +# saved caches +saved_caches_directory: /tmp/caches + +# Size to allow commitlog to grow to before creating a new segment +commitlog_rotation_threshold_in_mb: 128 + +# commitlog_sync supports the following modes: +# +# batch: +# In batch mode, Cassandra won't ack writes until the commit log +# has been fsynced to disk. But fsyncing each write at once is +# performance-prohibitive, so instead Cassandra will wait up to +# commitlog_sync_batch_window_in_ms milliseconds for other writes, before +# syncing that "batch" at once. This causes a performance penalty +# of about 15% when the commitlog is on a separate device, and much more +# when it shares the same device as the data files. +# +# periodic: +# Writes may be acked immediately (without waiting for the commitlog +# append) and the CommitLog is simply synced every +# commitlog_sync_period_in_ms milliseconds. +# +# periodic_without_flush: +# Like periodic, but the commitlog write buffer is only flushed +# before the sync, so any interruption to the process can be +# expected to lose some writes. This is the old 0.6 periodic +# behavior and will be removed in future versions if testing +# continues to show no performance benefit over normal periodic. +commitlog_sync: periodic +commitlog_sync_period_in_ms: 10000 +# commitlog_sync: batch +# commitlog_sync_batch_window_in_ms: 10 + +# Addresses of hosts that are deemed contact points. +# Cassandra nodes use this list of hosts to find each other and learn +# the topology of the ring. You must change this if you are running +# multiple nodes! +seeds: + - 127.0.0.1 + +# Access mode. mmapped i/o is substantially faster, but only practical on +# a 64bit machine (which notably does not include EC2 "small" instances) +# or relatively small datasets. "auto", the safe choice, will enable +# mmapping on a 64bit JVM. Other values are "mmap", "mmap_index_only" +# (which may allow you to get part of the benefits of mmap on a 32bit +# machine by mmapping only index files) and "standard". +# (The buffer size settings that follow only apply to standard, +# non-mmapped i/o.) +disk_access_mode: auto + +# Unlike most systems, in Cassandra writes are faster than reads, so +# you can afford more of those in parallel. A good rule of thumb is 2 +# concurrent reads per processor core. Increase ConcurrentWrites to +# the number of clients writing at once if you enable CommitLogSync + +# CommitLogSyncDelay. --> +concurrent_reads: 8 +concurrent_writes: 32 + +# This sets the amount of memtable flush writer threads. These will +# be blocked by disk io, and each one will hold a memtable in memory +# while blocked. If you have a large heap and many data directories, +# you can increase this value for better flush performance. +# By default this will be set to the amount of data directories defined. +#memtable_flush_writers: 1 + +# Buffer size to use when performing contiguous column slices. +# Increase this to the size of the column slices you typically perform +sliced_buffer_size_in_kb: 64 + +# TCP port, for commands and data +storage_port: 7000 + +# Address to bind to and tell other Cassandra nodes to connect to. You +# _must_ change this if you want multiple nodes to be able to +# communicate! +# +# Leaving it blank leaves it up to InetAddress.getLocalHost(). 
This +# will always do the Right Thing *if* the node is properly configured +# (hostname, name resolution, etc), and the Right Thing is to use the +# address associated with the hostname (it might not be). +# +# Setting this to 0.0.0.0 is always wrong. +listen_address: 127.0.0.1 + +# The address to bind the Thrift RPC service to -- clients connect +# here. Unlike ListenAddress above, you *can* specify 0.0.0.0 here if +# you want Thrift to listen on all interfaces. +# +# Leaving this blank has the same effect it does for ListenAddress, +# (i.e. it will be based on the configured hostname of the node). +rpc_address: 127.0.0.1 +# port for Thrift to listen for clients on +rpc_port: 9170 + +# enable or disable keepalive on rpc connections +rpc_keepalive: true + +# uncomment to set socket buffer sizes on rpc connections +# rpc_send_buff_size_in_bytes: +# rpc_recv_buff_size_in_bytes: + +# Frame size for thrift (maximum field length). +# 0 disables TFramedTransport in favor of TSocket. This option +# is deprecated; we strongly recommend using Framed mode. +thrift_framed_transport_size_in_mb: 15 + +# The max length of a thrift message, including all fields and +# internal thrift overhead. +thrift_max_message_length_in_mb: 16 + +# Whether or not to take a snapshot before each compaction. Be +# careful using this option, since Cassandra won't clean up the +# snapshots for you. Mostly useful if you're paranoid when there +# is a data format change. +snapshot_before_compaction: false + +# change this to increase the compaction thread's priority. In java, 1 is the +# lowest priority and that is our default. +# compaction_thread_priority: 1 + +# The threshold size in megabytes the binary memtable must grow to, +# before it's submitted for flushing to disk. +binary_memtable_throughput_in_mb: 256 + +# Add column indexes to a row after its contents reach this size. +# Increase if your column values are large, or if you have a very large +# number of columns. The competing causes are, Cassandra has to +# deserialize this much of the row to read a single column, so you want +# it to be small - at least if you do many partial-row reads - but all +# the index data is read for each access, so you don't want to generate +# that wastefully either. +column_index_size_in_kb: 64 + +# Size limit for rows being compacted in memory. Larger rows will spill +# over to disk and use a slower two-pass compaction process. A message +# will be logged specifying the row key. +in_memory_compaction_limit_in_mb: 64 + +# Time to wait for a reply from other nodes before failing the command +rpc_timeout_in_ms: 10000 + +# phi value that must be reached for a host to be marked down. +# most users should never need to adjust this. +# phi_convict_threshold: 8 + +# endpoint_snitch -- Set this to a class that implements +# IEndpointSnitch, which will let Cassandra know enough +# about your network topology to route requests efficiently. +# Out of the box, Cassandra provides +# - org.apache.cassandra.locator.SimpleSnitch: +# Treats Strategy order as proximity. This improves cache locality +# when disabling read repair, which can further improve throughput. +# - org.apache.cassandra.locator.RackInferringSnitch: +# Proximity is determined by rack and data center, which are +# assumed to correspond to the 3rd and 2nd octet of each node's +# IP address, respectively +# org.apache.cassandra.locator.PropertyFileSnitch: +# - Proximity is determined by rack and data center, which are +# explicitly configured in cassandra-topology.properties. 
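The rpc_address/rpc_port pair above (127.0.0.1:9170 in this test configuration) together with the framed Thrift transport is what clients connect to; the FramedConnWrapper used by CassandraTestSetup later in this patch does the same wiring. A minimal standalone sketch, assuming the Cassandra 0.7 Thrift bindings and libthrift are on the classpath (the class name ThriftPing is invented for illustration):

    import org.apache.cassandra.thrift.Cassandra;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class ThriftPing {
      public static void main(String[] args) throws Exception {
        // Framed transport matches thrift_framed_transport_size_in_mb being non-zero.
        TSocket socket = new TSocket("127.0.0.1", 9170, 5000);
        TTransport transport = new TFramedTransport(socket);
        Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
        transport.open();
        try {
          System.out.println("cluster: " + client.describe_cluster_name());
          System.out.println("thrift api: " + client.describe_version());
        } finally {
          transport.close();
        }
      }
    }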
+endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch + +# dynamic_snitch -- This boolean controls whether the above snitch is +# wrapped with a dynamic snitch, which will monitor read latencies +# and avoid reading from hosts that have slowed (due to compaction, +# for instance) +dynamic_snitch: true +# controls how often to perform the more expensive part of host score +# calculation +dynamic_snitch_update_interval_in_ms: 100 +# controls how often to reset all host scores, allowing a bad host to +# possibly recover +dynamic_snitch_reset_interval_in_ms: 600000 +# if set greater than zero and read_repair_chance is < 1.0, this will allow +# 'pinning' of replicas to hosts in order to increase cache capacity. +# The badness threshold will control how much worse the pinned host has to be +# before the dynamic snitch will prefer other replicas over it. This is +# expressed as a double which represents a percentage. Thus, a value of +# 0.2 means Cassandra would continue to prefer the static snitch values +# until the pinned host was 20% worse than the fastest. +dynamic_snitch_badness_threshold: 0.0 + +# request_scheduler -- Set this to a class that implements +# RequestScheduler, which will schedule incoming client requests +# according to the specific policy. This is useful for multi-tenancy +# with a single Cassandra cluster. +# NOTE: This is specifically for requests from the client and does +# not affect inter node communication. +# org.apache.cassandra.scheduler.NoScheduler - No scheduling takes place +# org.apache.cassandra.scheduler.RoundRobinScheduler - Round robin of +# client requests to a node with a separate queue for each +# request_scheduler_id. The scheduler is further customized by +# request_scheduler_options as described below. +request_scheduler: org.apache.cassandra.scheduler.NoScheduler + +# Scheduler Options vary based on the type of scheduler +# NoScheduler - Has no options +# RoundRobin +# - throttle_limit -- The throttle_limit is the number of in-flight +# requests per client. Requests beyond +# that limit are queued up until +# running requests can complete. +# The value of 80 here is twice the number of +# concurrent_reads + concurrent_writes. +# - default_weight -- default_weight is optional and allows for +# overriding the default which is 1. +# - weights -- Weights are optional and will default to 1 or the +# overridden default_weight. The weight translates into how +# many requests are handled during each turn of the +# RoundRobin, based on the scheduler id. +# +# request_scheduler_options: +# throttle_limit: 80 +# default_weight: 5 +# weights: +# Keyspace1: 1 +# Keyspace2: 5 + +# request_scheduler_id -- An identifer based on which to perform +# the request scheduling. Currently the only valid option is keyspace. +# request_scheduler_id: keyspace + +# The Index Interval determines how large the sampling of row keys +# is for a given SSTable. The larger the sampling, the more effective +# the index is at the cost of space. +index_interval: 128 + +# Keyspaces have ColumnFamilies. (Usually 1 KS per application.) +# ColumnFamilies have Rows. (Dozens of CFs per KS.) +# Rows contain Columns. (Many per CF.) +# Columns contain name:value:timestamp. (Many per Row.) +# +# A KS is most similar to a schema, and a CF is most similar to a relational table. +# +# Keyspaces, ColumnFamilies, and Columns may carry additional +# metadata that change their behavior. 
These are as follows: +# +# Keyspace required parameters: +# - name: name of the keyspace; "system" is +# reserved for Cassandra Internals. +# - replica_placement_strategy: the class that determines how replicas +# are distributed among nodes. Contains both the class as well as +# configuration information. Must extend AbstractReplicationStrategy. +# Out of the box, Cassandra provides +# * org.apache.cassandra.locator.SimpleStrategy +# * org.apache.cassandra.locator.NetworkTopologyStrategy +# * org.apache.cassandra.locator.OldNetworkTopologyStrategy +# +# SimpleStrategy merely places the first +# replica at the node whose token is closest to the key (as determined +# by the Partitioner), and additional replicas on subsequent nodes +# along the ring in increasing Token order. +# +# With NetworkTopologyStrategy, +# for each datacenter, you can specify how many replicas you want +# on a per-keyspace basis. Replicas are placed on different racks +# within each DC, if possible. This strategy also requires rack aware +# snitch, such as RackInferringSnitch or PropertyFileSnitch. +# An example: +# - name: Keyspace1 +# replica_placement_strategy: org.apache.cassandra.locator.NetworkTopologyStrategy +# strategy_options: +# DC1 : 3 +# DC2 : 2 +# DC3 : 1 +# +# OldNetworkToplogyStrategy [formerly RackAwareStrategy] +# places one replica in each of two datacenters, and the third on a +# different rack in in the first. Additional datacenters are not +# guaranteed to get a replica. Additional replicas after three are placed +# in ring order after the third without regard to rack or datacenter. +# - replication_factor: Number of replicas of each row +# Keyspace optional paramaters: +# - strategy_options: Additional information for the replication strategy. +# - column_families: +# ColumnFamily required parameters: +# - name: name of the ColumnFamily. Must not contain the character "-". +# - compare_with: tells Cassandra how to sort the columns for slicing +# operations. The default is BytesType, which is a straightforward +# lexical comparison of the bytes in each column. Other options are +# AsciiType, UTF8Type, LexicalUUIDType, TimeUUIDType, LongType, +# and IntegerType (a generic variable-length integer type). +# You can also specify the fully-qualified class name to a class of +# your choice extending org.apache.cassandra.db.marshal.AbstractType. +# +# ColumnFamily optional parameters: +# - keys_cached: specifies the number of keys per sstable whose +# locations we keep in memory in "mostly LRU" order. (JUST the key +# locations, NOT any column values.) Specify a fraction (value less +# than 1) or an absolute number of keys to cache. Defaults to 200000 +# keys. +# - rows_cached: specifies the number of rows whose entire contents we +# cache in memory. Do not use this on ColumnFamilies with large rows, +# or ColumnFamilies with high write:read ratios. Specify a fraction +# (value less than 1) or an absolute number of rows to cache. +# Defaults to 0. (i.e. row caching is off by default) +# - comment: used to attach additional human-readable information about +# the column family to its definition. +# - read_repair_chance: specifies the probability with which read +# repairs should be invoked on non-quorum reads. must be between 0 +# and 1. defaults to 1.0 (always read repair). +# - gc_grace_seconds: specifies the time to wait before garbage +# collecting tombstones (deletion markers). defaults to 864000 (10 +# days). 
See http://wiki.apache.org/cassandra/DistributedDeletes +# - default_validation_class: specifies a validator class to use for +# validating all the column values in the CF. +# NOTE: +# min_ must be less than max_compaction_threshold! +# - min_compaction_threshold: the minimum number of SSTables needed +# to start a minor compaction. increasing this will cause minor +# compactions to start less frequently and be more intensive. setting +# this to 0 disables minor compactions. defaults to 4. +# - max_compaction_threshold: the maximum number of SSTables allowed +# before a minor compaction is forced. decreasing this will cause +# minor compactions to start more frequently and be less intensive. +# setting this to 0 disables minor compactions. defaults to 32. +# /NOTE +# - row_cache_save_period_in_seconds: number of seconds between saving +# row caches. The row caches can be saved periodically and if one +# exists on startup it will be loaded. +# - key_cache_save_period_in_seconds: number of seconds between saving +# key caches. The key caches can be saved periodically and if one +# exists on startup it will be loaded. +# - memtable_flush_after_mins: The maximum time to leave a dirty table +# unflushed. This should be large enough that it won't cause a flush +# storm of all memtables during periods of inactivity. +# - memtable_throughput_in_mb: The maximum size of the memtable before +# it is flushed. If undefined, 1/8 * heapsize will be used. +# - memtable_operations_in_millions: Number of operations in millions +# before the memtable is flushed. If undefined, throughput / 64 * 0.3 +# will be used. +# - column_metadata: +# Column required parameters: +# - name: binds a validator (and optionally an indexer) to columns +# with this name in any row of the enclosing column family. +# - validator: like cf.compare_with, an AbstractType that checks +# that the value of the column is well-defined. +# Column optional parameters: +# NOTE: +# index_name cannot be set if index_type is not also set! +# - index_name: User-friendly name for the index. +# - index_type: The type of index to be created. Currently only +# KEYS is supported. +# /NOTE +# +# NOTE: +# this keyspace definition is for demonstration purposes only. +# Cassandra will not load these definitions during startup. See +# http://wiki.apache.org/cassandra/FAQ#no_keyspaces for an explanation. 
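The keyspace definitions that follow are, per the note above, demonstration-only and not loaded at startup; the test harness in this patch creates Keyspace1/Standard1 over Thrift instead. Once the keyspace exists, the name:value:timestamp column model described earlier can be exercised directly. A hypothetical round trip against the 0.7 Thrift API (the class name, row key, and sample values are invented for illustration; this code is not part of the patch):

    import java.nio.ByteBuffer;

    import org.apache.cassandra.thrift.Cassandra;
    import org.apache.cassandra.thrift.Column;
    import org.apache.cassandra.thrift.ColumnOrSuperColumn;
    import org.apache.cassandra.thrift.ColumnParent;
    import org.apache.cassandra.thrift.ColumnPath;
    import org.apache.cassandra.thrift.ConsistencyLevel;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class StandardColumnRoundTrip {
      private static ByteBuffer utf8(String s) throws Exception {
        return ByteBuffer.wrap(s.getBytes("UTF-8"));
      }

      public static void main(String[] args) throws Exception {
        TTransport transport = new TFramedTransport(new TSocket("127.0.0.1", 9170));
        Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
        transport.open();
        try {
          client.set_keyspace("Keyspace1");

          // Write one column into Standard1: row "row1", column "value" -> "val_0".
          Column col = new Column();
          col.setName(utf8("value"));
          col.setValue(utf8("val_0"));
          col.setTimestamp(System.currentTimeMillis() * 1000); // microseconds by convention
          client.insert(utf8("row1"), new ColumnParent("Standard1"), col,
              ConsistencyLevel.ONE);

          // Read it back; a column is just a name:value:timestamp triple.
          ColumnPath path = new ColumnPath("Standard1");
          path.setColumn(utf8("value"));
          ColumnOrSuperColumn cosc = client.get(utf8("row1"), path, ConsistencyLevel.ONE);
          ByteBuffer value = cosc.column.value.duplicate();
          byte[] raw = new byte[value.remaining()];
          value.get(raw);
          System.out.println(new String(raw, "UTF-8") + " @ " + cosc.column.timestamp);
        } finally {
          transport.close();
        }
      }
    }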
+# /NOTE +keyspaces: + - name: Keyspace1 + replica_placement_strategy: org.apache.cassandra.locator.SimpleStrategy + replication_factor: 1 + column_families: + - name: Standard1 + compare_with: BytesType + keys_cached: 10000 + rows_cached: 1000 + row_cache_save_period_in_seconds: 0 + key_cache_save_period_in_seconds: 3600 + memtable_flush_after_mins: 59 + memtable_throughput_in_mb: 255 + memtable_operations_in_millions: 0.29 + + - name: Standard2 + compare_with: UTF8Type + read_repair_chance: 0.1 + keys_cached: 100 + gc_grace_seconds: 0 + min_compaction_threshold: 5 + max_compaction_threshold: 31 + + - name: StandardByUUID1 + compare_with: TimeUUIDType + + - name: Super1 + column_type: Super + compare_with: BytesType + compare_subcolumns_with: BytesType + + - name: Super2 + column_type: Super + compare_subcolumns_with: UTF8Type + rows_cached: 10000 + keys_cached: 50 + comment: 'A column family with supercolumns, whose column and subcolumn names are UTF8 strings' + + - name: Super3 + column_type: Super + compare_with: LongType + comment: 'A column family with supercolumns, whose column names are Longs (8 bytes)' + + - name: Indexed1 + default_validation_class: LongType + column_metadata: + - name: birthdate + validator_class: LongType + index_name: birthdate_idx + index_type: KEYS Index: cassandra-handler/conf/cassandra-topology.properties =================================================================== --- cassandra-handler/conf/cassandra-topology.properties (revision 0) +++ cassandra-handler/conf/cassandra-topology.properties (revision 0) @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Cassandra Node IP=Data Center:Rack +192.168.1.100=DC1:RAC1 +192.168.2.200=DC2:RAC2 + +10.0.0.10=DC1:RAC1 +10.0.0.11=DC1:RAC1 +10.0.0.12=DC1:RAC2 + +10.20.114.10=DC2:RAC1 +10.20.114.11=DC2:RAC1 + +10.21.119.13=DC3:RAC1 +10.21.119.10=DC3:RAC1 + +10.0.0.13=DC1:RAC2 +10.21.119.14=DC3:RAC2 +10.20.114.15=DC2:RAC2 + +# default for unknown nodes +default=DC1:r1 + Index: cassandra-handler/conf/log4j-tools.properties.old =================================================================== --- cassandra-handler/conf/log4j-tools.properties.old (revision 0) +++ cassandra-handler/conf/log4j-tools.properties.old (revision 0) @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# for production, you should probably set the root to INFO +# and the pattern to %c instead of %l. (%l is slower.) + +# output messages into a rolling log file as well as stdout +log4j.rootLogger=WARN,stderr + +# stderr +log4j.appender.stderr=org.apache.log4j.ConsoleAppender +log4j.appender.stderr.target=System.err +log4j.appender.stderr.layout=org.apache.log4j.PatternLayout +log4j.appender.stderr.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n Index: cassandra-handler/conf/README.txt =================================================================== --- cassandra-handler/conf/README.txt (revision 0) +++ cassandra-handler/conf/README.txt (revision 0) @@ -0,0 +1,13 @@ +Required configuration files +============================ + +cassandra.yaml: main Cassandra configuration file +log4j-server.proprties: log4j configuration file for Cassandra server + + +Optional configuration files +============================ + +access.properties: used for authorization +passwd.properties: used for authentication +cassandra-topology.properties: used by PropertyFileSnitch Index: cassandra-handler/conf/log4j-server.properties.old =================================================================== --- cassandra-handler/conf/log4j-server.properties.old (revision 0) +++ cassandra-handler/conf/log4j-server.properties.old (revision 0) @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# for production, you should probably set pattern to %c instead of %l. +# (%l is slower.) 
+ +# output messages into a rolling log file as well as stdout +log4j.rootLogger=INFO,stdout,R + +# stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n + +# rolling log file +log4j.appender.R=org.apache.log4j.RollingFileAppender +log4j.appender.R.maxFileSize=20MB +log4j.appender.R.maxBackupIndex=50 +log4j.appender.R.layout=org.apache.log4j.PatternLayout +log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n +# Edit the next line to point to your logs directory +log4j.appender.R.File=/var/log/cassandra/system.log + +# Application logging options +#log4j.logger.com.facebook=DEBUG +#log4j.logger.com.facebook.infrastructure.gms=DEBUG +#log4j.logger.com.facebook.infrastructure.db=DEBUG Index: cassandra-handler/conf/passwd.properties =================================================================== --- cassandra-handler/conf/passwd.properties (revision 0) +++ cassandra-handler/conf/passwd.properties (revision 0) @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This is a sample password file for SimpleAuthenticator. The format of +# this file is username=password. If -Dpasswd.mode=MD5 then the password +# is represented as an md5 digest, otherwise it is cleartext (keep this +# in mind when setting file mode and ownership). +jsmith=havebadpass +Elvis\ Presley=graceland4evar +dilbert=nomoovertime Index: cassandra-handler/conf/access.properties =================================================================== --- cassandra-handler/conf/access.properties (revision 0) +++ cassandra-handler/conf/access.properties (revision 0) @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This is a sample access file for SimpleAuthority. The format of this file +# is KEYSPACE[.COLUMNFAMILY].PERMISSION=USERS, where: +# +# * KEYSPACE is the keyspace name. +# * COLUMNFAMILY is the column family name. 
+# * PERMISSION is one of or for read-only or read-write respectively. +# * USERS is a comma delimited list of users from passwd.properties. +# +# See below for example entries. + +# NOTE: This file contains potentially sensitive information, please keep +# this in mind when setting its mode and ownership. + +# The magical '' property lists users who can modify the +# list of keyspaces: all users will be able to view the list of keyspaces. +=jsmith + +# Access to Keyspace1 (add/remove column families, etc). +Keyspace1.=jsmith,Elvis Presley +Keyspace1.=dilbert + +# Access to Standard1 (keyspace Keyspace1) +Keyspace1.Standard1.=jsmith,Elvis Presley,dilbert Index: cassandra-handler/conf/cassandra-env.sh =================================================================== --- cassandra-handler/conf/cassandra-env.sh (revision 0) +++ cassandra-handler/conf/cassandra-env.sh (revision 0) @@ -0,0 +1,123 @@ +MAX_HEAP_SIZE="1G" +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +calculate_heap_size() +{ + case "`uname`" in + Linux) + system_memory_in_mb=`free -m | awk '/Mem:/ {print $2}'` + MAX_HEAP_SIZE=$((system_memory_in_mb / 2))M + return 0 + ;; + FreeBSD) + system_memory_in_bytes=`sysctl hw.physmem | awk '{print $2}'` + MAX_HEAP_SIZE=$((system_memory_in_bytes / 1024 / 1024 / 2))M + return 0 + ;; + *) + MAX_HEAP_SIZE=1024M + return 1 + ;; + esac +} + +# The amount of memory to allocate to the JVM at startup, you almost +# certainly want to adjust this for your environment. If left commented +# out, the heap size will be automatically determined by calculate_heap_size +# MAX_HEAP_SIZE="4G" + +if [ "x$MAX_HEAP_SIZE" = "x" ]; then + calculate_heap_size +fi + +# Specifies the default port over which Cassandra will be available for +# JMX connections. +JMX_PORT="8001" + +# To use mx4j, an HTML interface for JMX, add mx4j-tools.jar to the lib/ directory. +# By default mx4j listens on 0.0.0.0:8081. Uncomment the following lines to control +# its listen address and port. +#MX4J_ADDRESS="-Dmx4jaddress=0.0.0.0" +#MX4J_PORT="-Dmx4jport=8081" + + +# Here we create the arguments that will get passed to the jvm when +# starting cassandra. + +# enable assertions. disabling this in production will give a modest +# performance benefit (around 5%). +JVM_OPTS="$JVM_OPTS -ea" + +# enable thread priorities, primarily so we can give periodic tasks +# a lower priority to avoid interfering with client workload +JVM_OPTS="$JVM_OPTS -XX:+UseThreadPriorities" +# allows lowering thread priority without being root. 
see +# http://tech.stolsvik.com/2010/01/linux-java-thread-priorities-workaround.html +JVM_OPTS="$JVM_OPTS -XX:ThreadPriorityPolicy=42" + +# min and max heap sizes should be set to the same value to avoid +# stop-the-world GC pauses during resize, and so that we can lock the +# heap in memory on startup to prevent any of it from being swapped +# out. +JVM_OPTS="$JVM_OPTS -Xms$MAX_HEAP_SIZE" +JVM_OPTS="$JVM_OPTS -Xmx$MAX_HEAP_SIZE" +JVM_OPTS="$JVM_OPTS -XX:+HeapDumpOnOutOfMemoryError" + +if [ "`uname`" = "Linux" ] ; then + # reduce the per-thread stack size to minimize the impact of Thrift + # thread-per-client. (Best practice is for client connections to + # be pooled anyway.) Only do so on Linux where it is known to be + # supported. + JVM_OPTS="$JVM_OPTS -Xss128k" +fi + +# GC tuning options +JVM_OPTS="$JVM_OPTS -XX:+UseParNewGC" +JVM_OPTS="$JVM_OPTS -XX:+UseConcMarkSweepGC" +JVM_OPTS="$JVM_OPTS -XX:+CMSParallelRemarkEnabled" +JVM_OPTS="$JVM_OPTS -XX:SurvivorRatio=8" +JVM_OPTS="$JVM_OPTS -XX:MaxTenuringThreshold=1" +JVM_OPTS="$JVM_OPTS -XX:CMSInitiatingOccupancyFraction=75" +JVM_OPTS="$JVM_OPTS -XX:+UseCMSInitiatingOccupancyOnly" + +# GC logging options -- uncomment to enable +# JVM_OPTS="$JVM_OPTS -XX:+PrintGCDetails" +# JVM_OPTS="$JVM_OPTS -XX:+PrintGCTimeStamps" +# JVM_OPTS="$JVM_OPTS -XX:+PrintClassHistogram" +# JVM_OPTS="$JVM_OPTS -XX:+PrintTenuringDistribution" +# JVM_OPTS="$JVM_OPTS -XX:+PrintGCApplicationStoppedTime" +# JVM_OPTS="$JVM_OPTS -Xloggc:/var/log/cassandra/gc.log" + +# Prefer binding to IPv4 network intefaces (when net.ipv6.bindv6only=1). See +# http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6342561 (short version: +# comment out this entry to enable IPv6 support). +JVM_OPTS="$JVM_OPTS -Djava.net.preferIPv4Stack=true" + +# jmx: metrics and administration interface +# +# add this if you're having trouble connecting: +# JVM_OPTS="$JVM_OPTS -Djava.rmi.server.hostname=" +# +# see +# http://blogs.sun.com/jmxetc/entry/troubleshooting_connection_problems_in_jconsole +# for more on configuring JMX through firewalls, etc. (Short version: +# get it working with no firewall first.) 
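With the jmxremote options appended just below, the server exposes JMX on $JMX_PORT (8001 here) with SSL and authentication disabled, so a plain RMI connector can attach without credentials. An illustrative client (the class name JmxPing is invented; not part of the patch):

    import javax.management.MBeanServerConnection;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class JmxPing {
      public static void main(String[] args) throws Exception {
        // Same URL form nodetool uses; port must match JMX_PORT in cassandra-env.sh.
        JMXServiceURL url =
            new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:8001/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
          MBeanServerConnection mbeans = connector.getMBeanServerConnection();
          System.out.println("MBeans registered: " + mbeans.getMBeanCount());
        } finally {
          connector.close();
        }
      }
    }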
+JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT" +JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl=false" +JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=false" +JVM_OPTS="$JVM_OPTS $MX4J_ADDRESS" +JVM_OPTS="$JVM_OPTS $MX4J_PORT" Index: cassandra-handler/ivy.xml =================================================================== --- cassandra-handler/ivy.xml (revision 0) +++ cassandra-handler/ivy.xml (revision 0) @@ -0,0 +1,8 @@ + + + + + + + + Index: cassandra-handler/lib/readme.txt =================================================================== --- cassandra-handler/lib/readme.txt (revision 0) +++ cassandra-handler/lib/readme.txt (revision 0) @@ -0,0 +1 @@ +Required jars are gathered by ../ql/ivy.xml Index: cassandra-handler/src/test/results/cassandra_queries.q.out =================================================================== --- cassandra-handler/src/test/results/cassandra_queries.q.out (revision 0) +++ cassandra-handler/src/test/results/cassandra_queries.q.out (revision 0) @@ -0,0 +1,450 @@ +PREHOOK: query: CREATE EXTERNAL TABLE IF NOT EXISTS +cassandra_keyspace1_standard1(key int, value string) +STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' +WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" , "cassandra.cf.name" = "Standard1" , "cassandra.host" = "127.0.0.1" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1" , "cassandra.partitioner" = "org.apache.cassandra.dht.RandomPartitioner" ) +TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1") +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE EXTERNAL TABLE IF NOT EXISTS +cassandra_keyspace1_standard1(key int, value string) +STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' +WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" , "cassandra.cf.name" = "Standard1" , "cassandra.host" = "127.0.0.1" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1" , "cassandra.partitioner" = "org.apache.cassandra.dht.RandomPartitioner" ) +TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@cassandra_keyspace1_standard1 +PREHOOK: query: describe cassandra_keyspace1_standard1 +PREHOOK: type: DESCTABLE +POSTHOOK: query: describe cassandra_keyspace1_standard1 +POSTHOOK: type: DESCTABLE +key int from deserializer +value string from deserializer +PREHOOK: query: EXPLAIN +FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 ORDER BY key +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 ORDER BY key +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB (TOK_TABNAME cassandra_keyspace1_standard1))) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (= (% (TOK_TABLE_OR_COL key) 7) 0)) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL key))))) + +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Filter Operator + predicate: + expr: ((key % 7) = 0) + type: boolean + Filter Operator + predicate: + expr: ((key % 7) = 0) + type: boolean + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + outputColumnNames: _col0, _col1 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + sort order: + + tag: -1 
+ value expressions: + expr: _col0 + type: string + expr: _col1 + type: string + Reduce Operator Tree: + Extract + Select Operator + expressions: + expr: UDFToInteger(_col0) + type: int + expr: _col1 + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 1 + table: + input format: org.apache.hadoop.hive.cassandra.input.HiveCassandraStandardColumnInputFormat + output format: org.apache.hadoop.hive.cassandra.output.HiveCassandraOutputFormat + serde: org.apache.hadoop.hive.cassandra.serde.StandardColumnSerDe + name: default.cassandra_keyspace1_standard1 + + +PREHOOK: query: FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 ORDER BY key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@cassandra_keyspace1_standard1 +POSTHOOK: query: FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 ORDER BY key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@cassandra_keyspace1_standard1 +PREHOOK: query: EXPLAIN +select * from cassandra_keyspace1_standard1 ORDER BY key +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +select * from cassandra_keyspace1_standard1 ORDER BY key +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME cassandra_keyspace1_standard1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL key))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + cassandra_keyspace1_standard1 + TableScan + alias: cassandra_keyspace1_standard1 + Select Operator + expressions: + expr: key + type: int + expr: value + type: string + outputColumnNames: _col0, _col1 + Reduce Output Operator + key expressions: + expr: _col0 + type: int + sort order: + + tag: -1 + value expressions: + expr: _col0 + type: int + expr: _col1 + type: string + Reduce Operator Tree: + Extract + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select * from cassandra_keyspace1_standard1 ORDER BY key +PREHOOK: type: QUERY +PREHOOK: Input: default@cassandra_keyspace1_standard1 +PREHOOK: Output: file:/tmp/edward/hive_2011-03-07_14-13-48_603_3517947258898162165/-mr-10000 +POSTHOOK: query: select * from cassandra_keyspace1_standard1 ORDER BY key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@cassandra_keyspace1_standard1 +POSTHOOK: Output: file:/tmp/edward/hive_2011-03-07_14-13-48_603_3517947258898162165/-mr-10000 +79 val_0 +105 val_105 +119 val_119 +126 val_126 +133 val_133 +168 val_168 +175 val_175 +189 val_189 +196 val_196 +203 val_203 +217 val_217 +224 val_224 +238 val_238 +252 val_252 +266 val_266 +273 val_273 +280 val_280 +287 val_287 +289 val_28 +308 val_308 +315 val_315 +322 val_322 +336 val_336 +357 val_35 +364 val_364 +378 val_378 +392 val_392 +399 val_399 +406 val_406 +413 val_413 +427 val_427 +429 val_42 +448 val_448 +455 val_455 +462 val_462 +469 val_469 +483 val_483 +490 val_490 +497 val_497 +708 val_70 +779 val_77 +846 val_84 +983 val_98 +PREHOOK: query: EXPLAIN +select value from cassandra_keyspace1_standard1 ORDER BY VALUE +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +select value from 
cassandra_keyspace1_standard1 ORDER BY VALUE +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME cassandra_keyspace1_standard1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL value))) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL VALUE))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + cassandra_keyspace1_standard1 + TableScan + alias: cassandra_keyspace1_standard1 + Select Operator + expressions: + expr: value + type: string + outputColumnNames: _col0 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + sort order: + + tag: -1 + value expressions: + expr: _col0 + type: string + Reduce Operator Tree: + Extract + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select value from cassandra_keyspace1_standard1 ORDER BY VALUE +PREHOOK: type: QUERY +PREHOOK: Input: default@cassandra_keyspace1_standard1 +PREHOOK: Output: file:/tmp/edward/hive_2011-03-07_14-13-52_764_5259521194449628162/-mr-10000 +POSTHOOK: query: select value from cassandra_keyspace1_standard1 ORDER BY VALUE +POSTHOOK: type: QUERY +POSTHOOK: Input: default@cassandra_keyspace1_standard1 +POSTHOOK: Output: file:/tmp/edward/hive_2011-03-07_14-13-52_764_5259521194449628162/-mr-10000 +val_0 +val_105 +val_119 +val_126 +val_133 +val_168 +val_175 +val_189 +val_196 +val_203 +val_217 +val_224 +val_238 +val_252 +val_266 +val_273 +val_28 +val_280 +val_287 +val_308 +val_315 +val_322 +val_336 +val_35 +val_364 +val_378 +val_392 +val_399 +val_406 +val_413 +val_42 +val_427 +val_448 +val_455 +val_462 +val_469 +val_483 +val_490 +val_497 +val_70 +val_77 +val_84 +val_98 +PREHOOK: query: EXPLAIN +select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key ORDER BY a.key +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key ORDER BY a.key +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME cassandra_keyspace1_standard1) a) (TOK_TABREF (TOK_TABNAME cassandra_keyspace1_standard1) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) key)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) value)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL b) value))) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (. 
(TOK_TABLE_OR_COL a) key))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Reduce Output Operator + key expressions: + expr: key + type: int + sort order: + + Map-reduce partition columns: + expr: key + type: int + tag: 0 + value expressions: + expr: key + type: int + expr: value + type: string + b + TableScan + alias: b + Reduce Output Operator + key expressions: + expr: key + type: int + sort order: + + Map-reduce partition columns: + expr: key + type: int + tag: 1 + value expressions: + expr: value + type: string + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {VALUE._col0} {VALUE._col1} + 1 {VALUE._col1} + handleSkewJoin: false + outputColumnNames: _col0, _col1, _col5 + Select Operator + expressions: + expr: _col0 + type: int + expr: _col1 + type: string + expr: _col5 + type: string + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: + file:/tmp/edward/hive_2011-03-07_14-13-56_906_8208402642844752554/-mr-10002 + Reduce Output Operator + key expressions: + expr: _col0 + type: int + sort order: + + tag: -1 + value expressions: + expr: _col0 + type: int + expr: _col1 + type: string + expr: _col2 + type: string + Reduce Operator Tree: + Extract + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key ORDER BY a.key +PREHOOK: type: QUERY +PREHOOK: Input: default@cassandra_keyspace1_standard1 +PREHOOK: Output: file:/tmp/edward/hive_2011-03-07_14-13-57_016_5914164914507392118/-mr-10000 +POSTHOOK: query: select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key ORDER BY a.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@cassandra_keyspace1_standard1 +POSTHOOK: Output: file:/tmp/edward/hive_2011-03-07_14-13-57_016_5914164914507392118/-mr-10000 +79 val_0 val_0 +105 val_105 val_105 +119 val_119 val_119 +126 val_126 val_126 +133 val_133 val_133 +168 val_168 val_168 +175 val_175 val_175 +189 val_189 val_189 +196 val_196 val_196 +203 val_203 val_203 +217 val_217 val_217 +224 val_224 val_224 +238 val_238 val_238 +252 val_252 val_252 +266 val_266 val_266 +273 val_273 val_273 +280 val_280 val_280 +287 val_287 val_287 +289 val_28 val_28 +308 val_308 val_308 +315 val_315 val_315 +322 val_322 val_322 +336 val_336 val_336 +357 val_35 val_35 +364 val_364 val_364 +378 val_378 val_378 +392 val_392 val_392 +399 val_399 val_399 +406 val_406 val_406 +413 val_413 val_413 +427 val_427 val_427 +429 val_42 val_42 +448 val_448 val_448 +455 val_455 val_455 +462 val_462 val_462 +469 val_469 val_469 +483 val_483 val_483 +490 val_490 val_490 +497 val_497 val_497 +708 val_70 val_70 +779 val_77 val_77 +846 val_84 val_84 +983 val_98 val_98 Index: cassandra-handler/src/test/results/cassandra_queries 
=================================================================== --- cassandra-handler/src/test/results/cassandra_queries (revision 0) +++ cassandra-handler/src/test/results/cassandra_queries (revision 0) @@ -0,0 +1,365 @@ +PREHOOK: query: CREATE EXTERNAL TABLE IF NOT EXISTS +cassandra_keyspace1_standard1(key int, value string) +STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' +WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" , "cassandra.cf.name" = "Standard1" , "cassandra.host" = "127.0.0.1" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1" , "cassandra.partitioner" = "org.apache.cassandra.dht.RandomPartitioner" ) +TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1") +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE EXTERNAL TABLE IF NOT EXISTS +cassandra_keyspace1_standard1(key int, value string) +STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' +WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" , "cassandra.cf.name" = "Standard1" , "cassandra.host" = "127.0.0.1" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1" , "cassandra.partitioner" = "org.apache.cassandra.dht.RandomPartitioner" ) +TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@cassandra_keyspace1_standard1 +PREHOOK: query: describe cassandra_keyspace1_standard1 +PREHOOK: type: DESCTABLE +POSTHOOK: query: describe cassandra_keyspace1_standard1 +POSTHOOK: type: DESCTABLE +key int from deserializer +value string from deserializer +PREHOOK: query: EXPLAIN +FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB (TOK_TABNAME cassandra_keyspace1_standard1))) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (= (% (TOK_TABLE_OR_COL key) 7) 0)))) + +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Filter Operator + predicate: + expr: ((key % 7) = 0) + type: boolean + Filter Operator + predicate: + expr: ((key % 7) = 0) + type: boolean + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: UDFToInteger(_col0) + type: int + expr: _col1 + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 1 + table: + input format: org.apache.hadoop.hive.cassandra.input.HiveCassandraStandardColumnInputFormat + output format: org.apache.hadoop.hive.cassandra.output.HiveCassandraOutputFormat + serde: org.apache.hadoop.hive.cassandra.serde.StandardColumnSerDe + name: default.cassandra_keyspace1_standard1 + + +PREHOOK: query: FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@cassandra_keyspace1_standard1 +POSTHOOK: query: FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@cassandra_keyspace1_standard1 +PREHOOK: query: EXPLAIN +select * from cassandra_keyspace1_standard1 +PREHOOK: type: QUERY 
+POSTHOOK: query: EXPLAIN +select * from cassandra_keyspace1_standard1 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME cassandra_keyspace1_standard1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)))) + +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select * from cassandra_keyspace1_standard1 +PREHOOK: type: QUERY +PREHOOK: Input: default@cassandra_keyspace1_standard1 +PREHOOK: Output: file:/tmp/edward/hive_2011-02-25_16-36-32_494_7286511593311887975/-mr-10000 +POSTHOOK: query: select * from cassandra_keyspace1_standard1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@cassandra_keyspace1_standard1 +POSTHOOK: Output: file:/tmp/edward/hive_2011-02-25_16-36-32_494_7286511593311887975/-mr-10000 +252 val_252 +126 val_126 +119 val_119 +196 val_196 +266 val_266 +392 val_392 +413 val_413 +983 val_98 +224 val_224 +427 val_427 +357 val_35 +203 val_203 +483 val_483 +469 val_469 +779 val_77 +79 val_0 +289 val_28 +399 val_399 +490 val_490 +364 val_364 +273 val_273 +462 val_462 +315 val_315 +238 val_238 +308 val_308 +322 val_322 +189 val_189 +429 val_42 +133 val_133 +217 val_217 +448 val_448 +105 val_105 +336 val_336 +846 val_84 +280 val_280 +287 val_287 +406 val_406 +497 val_497 +378 val_378 +708 val_70 +455 val_455 +175 val_175 +168 val_168 +PREHOOK: query: EXPLAIN +select value from cassandra_keyspace1_standard1 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +select value from cassandra_keyspace1_standard1 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME cassandra_keyspace1_standard1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL value))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + cassandra_keyspace1_standard1 + TableScan + alias: cassandra_keyspace1_standard1 + Select Operator + expressions: + expr: value + type: string + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select value from cassandra_keyspace1_standard1 +PREHOOK: type: QUERY +PREHOOK: Input: default@cassandra_keyspace1_standard1 +PREHOOK: Output: file:/tmp/edward/hive_2011-02-25_16-36-32_885_3161823916748354795/-mr-10000 +POSTHOOK: query: select value from cassandra_keyspace1_standard1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@cassandra_keyspace1_standard1 +POSTHOOK: Output: file:/tmp/edward/hive_2011-02-25_16-36-32_885_3161823916748354795/-mr-10000 +val_252 +val_126 +val_119 +val_196 +val_266 +val_392 +val_413 +val_98 +val_224 +val_427 +val_35 +val_203 +val_483 +val_469 +val_77 +val_0 +val_28 +val_399 +val_490 +val_364 +val_273 +val_462 +val_315 +val_238 +val_308 +val_322 +val_189 +val_42 +val_133 +val_217 +val_448 +val_105 +val_336 +val_84 +val_280 +val_287 +val_406 +val_497 +val_378 +val_70 +val_455 +val_175 +val_168 +PREHOOK: query: EXPLAIN +select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN 
cassandra_keyspace1_standard1 b on a.key=b.key +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME cassandra_keyspace1_standard1) a) (TOK_TABREF (TOK_TABNAME cassandra_keyspace1_standard1) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) key)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) value)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL b) value))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Reduce Output Operator + key expressions: + expr: key + type: int + sort order: + + Map-reduce partition columns: + expr: key + type: int + tag: 0 + value expressions: + expr: key + type: int + expr: value + type: string + b + TableScan + alias: b + Reduce Output Operator + key expressions: + expr: key + type: int + sort order: + + Map-reduce partition columns: + expr: key + type: int + tag: 1 + value expressions: + expr: value + type: string + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {VALUE._col0} {VALUE._col1} + 1 {VALUE._col1} + handleSkewJoin: false + outputColumnNames: _col0, _col1, _col5 + Select Operator + expressions: + expr: _col0 + type: int + expr: _col1 + type: string + expr: _col5 + type: string + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@cassandra_keyspace1_standard1 +PREHOOK: Output: file:/tmp/edward/hive_2011-02-25_16-36-37_515_951258717592195422/-mr-10000 +POSTHOOK: query: select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@cassandra_keyspace1_standard1 +POSTHOOK: Output: file:/tmp/edward/hive_2011-02-25_16-36-37_515_951258717592195422/-mr-10000 +79 val_0 val_0 +105 val_105 val_105 +119 val_119 val_119 +126 val_126 val_126 +133 val_133 val_133 +168 val_168 val_168 +175 val_175 val_175 +189 val_189 val_189 +196 val_196 val_196 +203 val_203 val_203 +217 val_217 val_217 +224 val_224 val_224 +238 val_238 val_238 +252 val_252 val_252 +266 val_266 val_266 +273 val_273 val_273 +280 val_280 val_280 +287 val_287 val_287 +289 val_28 val_28 +308 val_308 val_308 +315 val_315 val_315 +322 val_322 val_322 +336 val_336 val_336 +357 val_35 val_35 +364 val_364 val_364 +378 val_378 val_378 +392 val_392 val_392 +399 val_399 val_399 +406 val_406 val_406 +413 val_413 val_413 +427 val_427 val_427 +429 val_42 val_42 +448 val_448 val_448 +455 val_455 val_455 +462 val_462 val_462 +469 val_469 val_469 +483 val_483 val_483 +490 val_490 val_490 +497 val_497 val_497 +708 val_70 val_70 +779 val_77 val_77 +846 val_84 val_84 +983 val_98 val_98 Index: cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java =================================================================== --- cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java 
(revision 0) +++ cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java (revision 0) @@ -0,0 +1,76 @@ +package org.apache.cassandra.contrib.utils.service; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.util.FileUtils; + +public class CassandraServiceDataCleaner { + + /** + * Creates all data dirs if they don't exist and cleans them. + * @throws IOException + */ + public void prepare() throws IOException { + makeDirsIfNotExist(); + cleanupDataDirectories(); + } + + /** + * Deletes all data from the Cassandra data directories, including the commit log. + * @throws IOException in case of permissions error etc. + */ + public void cleanupDataDirectories() throws IOException { + for (String s: getDataDirs()) { + cleanDir(s); + } + } + /** + * Creates the data directories if they don't exist. + * @throws IOException if directories cannot be created (permissions etc). + */ + public void makeDirsIfNotExist() throws IOException { + for (String s: getDataDirs()) { + mkdir(s); + } + } + + /** + * Collects all data dirs and returns a set of String paths on the file system. + * + * @return + */ + private Set getDataDirs() { + Set dirs = new HashSet(); + for (String s : DatabaseDescriptor.getAllDataFileLocations()) { + dirs.add(s); + } + dirs.add(DatabaseDescriptor.getCommitLogLocation()); + return dirs; + } + /** + * Creates a directory. + * + * @param dir + * @throws IOException + */ + private void mkdir(String dir) throws IOException { + FileUtils.createDirectory(dir); + } + + /** + * Removes all directory content from the file system. + * + * @param dir + * @throws IOException + */ + private void cleanDir(String dir) throws IOException { + File dirFile = new File(dir); + if (dirFile.exists() && dirFile.isDirectory()) { + FileUtils.delete(dirFile.listFiles()); + } + } +} Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java =================================================================== --- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java (revision 0) +++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java (revision 0) @@ -0,0 +1,86 @@ +package org.apache.hadoop.hive.cassandra; + +import java.io.IOException; + +import junit.extensions.TestSetup; +import junit.framework.Test; + +import org.apache.cassandra.contrib.utils.service.CassandraServiceDataCleaner; +import org.apache.cassandra.service.EmbeddedCassandraService; +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.CfDef; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.KsDef; +import org.apache.cassandra.thrift.NotFoundException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.cassandra.serde.StandardColumnSerDe; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.mapred.JobConf; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; + +public class CassandraTestSetup extends TestSetup{ + + static final Log LOG = LogFactory.getLog(CassandraTestSetup.class); + private EmbeddedCassandraService cassandra; + + public CassandraTestSetup(Test test){ + super(test); + } + + @SuppressWarnings("deprecation") + void preTest(HiveConf conf) throws IOException, 
TTransportException, TException { + if (cassandra==null){ + CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner(); + cleaner.prepare(); + cassandra = new EmbeddedCassandraService(); + cassandra.start(); + } + + FramedConnWrapper wrap = new FramedConnWrapper("127.0.0.1",9170,5000); + wrap.open(); + KsDef ks = new KsDef(); + ks.setName("Keyspace1"); + ks.setReplication_factor(1); + ks.setStrategy_class("org.apache.cassandra.locator.SimpleStrategy"); + CfDef cf = new CfDef(); + cf.setName("Standard1"); + cf.setKeyspace("Keyspace1"); + ks.addToCf_defs(cf); + Cassandra.Client client = wrap.getClient(); + try { + try { + KsDef exists = client.describe_keyspace("Keyspace1"); + } catch (NotFoundException ex){ + client.system_add_keyspace(ks); + System.out.println("ks added"); + try { + Thread.sleep(2000); + } catch (Exception ex2){} + } + } catch (InvalidRequestException e){ + throw new RuntimeException(e); + } + wrap.close(); + + String auxJars = conf.getAuxJars(); + auxJars = ((auxJars == null) ? "" : (auxJars + ",")) + "file://" + + new JobConf(conf, Cassandra.Client.class).getJar(); + auxJars += ",file://" + new JobConf(conf, StandardColumnSerDe.class).getJar(); + auxJars += ",file://" + new JobConf(conf, org.apache.thrift.transport.TSocket.class).getJar(); + auxJars += ",file://" + new JobConf(conf, com.google.common.collect.AbstractIterator.class).getJar(); + auxJars += ",file://" + new JobConf(conf, org.apache.commons.lang.ArrayUtils.class).getJar(); + conf.setAuxJars(auxJars); + + } + + @Override + protected void tearDown() throws Exception { + //do we need this? + CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner(); + cleaner.prepare(); + } + +} + Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java =================================================================== --- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java (revision 0) +++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java (revision 0) @@ -0,0 +1,22 @@ +package org.apache.hadoop.hive.cassandra; + +import org.apache.hadoop.hive.ql.QTestUtil; + +/** + * HBaseQTestUtil initializes HBase-specific test fixtures. 
+ */ +public class CassandraQTestUtil extends QTestUtil { + public CassandraQTestUtil( + String outDir, String logDir, boolean miniMr, CassandraTestSetup setup) + throws Exception { + + super(outDir, logDir, miniMr, null); + setup.preTest(conf); + super.init(); + } + + @Override + public void init() throws Exception { + // defer + } +} Index: cassandra-handler/src/test/queries/cassandra_queries.q =================================================================== --- cassandra-handler/src/test/queries/cassandra_queries.q (revision 0) +++ cassandra-handler/src/test/queries/cassandra_queries.q (revision 0) @@ -0,0 +1,29 @@ +SET hive.support.concurrency=false; +add file conf/cassandra.yaml; + +CREATE EXTERNAL TABLE IF NOT EXISTS +cassandra_keyspace1_standard1(key int, value string) +STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' +WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" , "cassandra.cf.name" = "Standard1" , "cassandra.host" = "127.0.0.1" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1" , "cassandra.partitioner" = "org.apache.cassandra.dht.RandomPartitioner" ) +TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1"); + +describe cassandra_keyspace1_standard1; + +EXPLAIN +FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 ORDER BY key; + +FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 ORDER BY key; + +EXPLAIN +select * from cassandra_keyspace1_standard1 ORDER BY key; +select * from cassandra_keyspace1_standard1 ORDER BY key; + +EXPLAIN +select value from cassandra_keyspace1_standard1 ORDER BY VALUE; + +select value from cassandra_keyspace1_standard1 ORDER BY VALUE; + +EXPLAIN +select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key ORDER BY a.key; + +select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key ORDER BY a.key; Index: cassandra-handler/src/test/templates/TestCassandraCliDriver.vm =================================================================== --- cassandra-handler/src/test/templates/TestCassandraCliDriver.vm (revision 0) +++ cassandra-handler/src/test/templates/TestCassandraCliDriver.vm (revision 0) @@ -0,0 +1,126 @@ +package org.apache.hadoop.hive.cli; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +import java.io.*; +import java.util.*; + +import org.apache.hadoop.hive.cassandra.CassandraQTestUtil; +import org.apache.hadoop.hive.cassandra.CassandraTestSetup; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.history.HiveHistoryViewer; +import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo; +import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; +import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo; +import org.apache.hadoop.hive.ql.session.SessionState; + +import org.antlr.runtime.*; +import org.antlr.runtime.tree.*; + +public class $className extends TestCase { + + private CassandraQTestUtil qt; + private CassandraTestSetup setup; + + public $className(String name, CassandraTestSetup setup) { + super(name); + qt = null; + this.setup = setup; + } + + @Override + protected void setUp() { + try { + boolean miniMR = false; + if ("$clusterMode".equals("miniMR")) { + miniMR = true; + } + + qt = new CassandraQTestUtil( + "$resultsDir.getCanonicalPath()", + "$logDir.getCanonicalPath()", miniMR, setup); + +#foreach ($qf in 
$qfiles) + qt.addFile("$qf.getCanonicalPath()"); +#end + } catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in setup"); + } + } + + @Override + protected void tearDown() { + try { + qt.shutdown(); + } + catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in tearDown"); + } + } + + public static Test suite() { + TestSuite suite = new TestSuite(); + CassandraTestSetup setup = new CassandraTestSetup(suite); +#foreach ($qf in $qfiles) + #set ($fname = $qf.getName()) + #set ($eidx = $fname.length() - 2) + #set ($tname = $fname.substring(0, $eidx)) + suite.addTest(new $className("testCliDriver_$tname", setup)); +#end + return setup; + } + + #foreach ($qf in $qfiles) + #set ($fname = $qf.getName()) + #set ($eidx = $fname.length() - 2) + #set ($tname = $fname.substring(0, $eidx)) + public void testCliDriver_$tname() throws Exception { + try { + System.out.println("Begin query: " + "$fname"); + qt.cliInit("$fname"); + int ecode = qt.executeClient("$fname"); + if (ecode != 0) { + fail("Client Execution failed with error code = " + ecode); + } + if (SessionState.get() != null) { + HiveHistoryViewer hv = new HiveHistoryViewer(SessionState.get() + .getHiveHistory().getHistFileName()); + Map jobInfoMap = hv.getJobInfoMap(); + Map taskInfoMap = hv.getTaskInfoMap(); + + if (jobInfoMap.size() != 0) { + String cmd = (String)jobInfoMap.keySet().toArray()[0]; + QueryInfo ji = jobInfoMap.get(cmd); + + if (!ji.hm.get(Keys.QUERY_RET_CODE.name()).equals("0")) { + fail("Wrong return code in hive history"); + } + } + } + + ecode = qt.checkCliDriverResults("$fname"); + if (ecode != 0) { + fail("Client execution results failed with error code = " + ecode); + } + } catch (Throwable e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception"); + } + + System.out.println("Done query: " + "$fname"); + assertTrue("Test passed", true); + } + +#end +} + Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveIColumn.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveIColumn.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveIColumn.java (revision 0) @@ -0,0 +1,174 @@ +package org.apache.hadoop.hive.cassandra.input; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.MessageDigest; +import java.util.Collection; + +import org.apache.cassandra.db.IColumn; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.hadoop.io.Writable; + +public class HiveIColumn implements IColumn, Writable { + + private byte [] name; + private byte [] value; + private long timestamp; + + public HiveIColumn(){ + + } + + @Override + public ByteBuffer name() { + return ByteBuffer.wrap(name); + } + + @Override + public long timestamp() { + return timestamp; + } + + @Override + public ByteBuffer value() { + return ByteBuffer.wrap(value); + } + + @Override + public void readFields(DataInput in) throws IOException { + name = new byte[in.readInt()]; + in.readFully(name); + + value= new byte[in.readInt()]; + in.readFully(value); + + timestamp = in.readLong(); + } + + @Override + public void write(DataOutput out) throws IOException { 
+ out.writeInt(name.length); + out.write(name); + + out.writeInt(value.length); + out.write(value); + + out.writeLong(timestamp); + } + + //bean patterns + + public byte[] getName() { + return name; + } + + public void setName(byte[] name) { + this.name = name; + } + + public byte[] getValue() { + return value; + } + + public void setValue(byte[] value) { + this.value = value; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + @Override + public String toString(){ + StringBuilder sb = new StringBuilder(); + sb.append("HiveIColumn["); + sb.append("name "+new String(this.name)+" "); + sb.append("value "+new String(this.value)+" "); + sb.append("timestamp "+ this.timestamp+" "); + return sb.toString(); + } + + //not needed for current integration + + @Override + public int size() { + throw new UnsupportedOperationException(); + } + + @Override + public void addColumn(IColumn arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public IColumn diff(IColumn arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public int getLocalDeletionTime() { + throw new UnsupportedOperationException(); + } + + @Override + public long getMarkedForDeleteAt() { + throw new UnsupportedOperationException(); + } + + @Override + public String getString(AbstractType arg0) { + throw new UnsupportedOperationException(); + } + + + @Override + public Collection getSubColumns() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isMarkedForDelete() { + throw new UnsupportedOperationException(); + } + + @Override + public long mostRecentLiveChangeAt() { + throw new UnsupportedOperationException(); + } + + @Override + public int serializedSize() { + throw new UnsupportedOperationException(); + } + + @Override + public void updateDigest(MessageDigest arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public IColumn deepCopy() { + throw new UnsupportedOperationException(); + } + + @Override + public IColumn getSubColumn(ByteBuffer arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isLive() { + throw new UnsupportedOperationException(); + } + + @Override + public IColumn reconcile(IColumn arg0) { + throw new UnsupportedOperationException(); + } +} \ No newline at end of file Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardRowResult.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardRowResult.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardRowResult.java (revision 0) @@ -0,0 +1,66 @@ +package org.apache.hadoop.hive.cassandra.input; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +/** + * + * HiveCassandraStandardRowResult. 
Used as the value side of + * the InputFormat + * + */ +public class HiveCassandraStandardRowResult implements Writable{ + + private Text key; + private MapWritable value; + + public HiveCassandraStandardRowResult() { + } + + @Override + public void readFields(DataInput in) throws IOException { + key =new Text(); + key.readFields(in); + value = new MapWritable(); + value.readFields(in); + } + + @Override + public void write(DataOutput out) throws IOException { + key.write(out); + value.write(out); + } + + public Text getKey() { + return key; + } + + public void setKey(Text key) { + this.key = key; + } + + public MapWritable getValue() { + return value; + } + + public void setValue(MapWritable value) { + this.value = value; + } + + @Override + public String toString(){ + StringBuffer sb = new StringBuffer(); + sb.append("RowResult key:"+key ); + for (Map.Entry entry : value.entrySet()){ + sb.append( "entry key:"+entry.getKey()+" " ); + sb.append( "entry value:"+entry.getValue()+" " ); + } + return sb.toString(); + } + +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardColumnInputFormat.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardColumnInputFormat.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardColumnInputFormat.java (revision 0) @@ -0,0 +1,216 @@ +package org.apache.hadoop.hive.cassandra.input; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; + +import org.apache.cassandra.db.IColumn; +import org.apache.cassandra.hadoop.ColumnFamilyInputFormat; +import org.apache.cassandra.hadoop.ColumnFamilySplit; +import org.apache.cassandra.hadoop.ConfigHelper; +import org.apache.cassandra.thrift.SlicePredicate; +import org.apache.cassandra.thrift.SliceRange; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.cassandra.serde.StandardColumnSerDe; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; + +@SuppressWarnings("deprecation") +public class HiveCassandraStandardColumnInputFormat extends +ColumnFamilyInputFormat implements InputFormat { + + static final Log LOG = LogFactory.getLog(HiveCassandraStandardColumnInputFormat.class); + @Override + public RecordReader getRecordReader + (InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException { + HiveCassandraStandardSplit cassandraSplit = (HiveCassandraStandardSplit) split; + List columns = StandardColumnSerDe.parseColumnMapping(cassandraSplit.getColumnMapping()); + List readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf); + if (columns.size() < readColIDs.size()) { + 
throw new IOException("Cannot read more columns than the given table contains."); + } + org.apache.cassandra.hadoop.ColumnFamilySplit cfSplit= cassandraSplit.getSplit(); + Job job=new Job(jobConf); + + TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(),new TaskAttemptID() ){ + @Override + public void progress(){ + reporter.progress(); + } + }; + + SlicePredicate predicate = new SlicePredicate(); + SliceRange range = new SliceRange(); + range.setStart(new byte[0]); + range.setFinish(new byte[0]); + range.setReversed(false); + range.setCount(cassandraSplit.getSlicePredicateSize()); + predicate.setSlice_range(range); + + final org.apache.hadoop.mapreduce.RecordReader> + recordReader = createRecordReader(cfSplit, tac); + + try { + ConfigHelper.setInputColumnFamily(tac.getConfiguration(), + cassandraSplit.getKeyspace(), cassandraSplit.getColumnFamily()); + + ConfigHelper.setInputSlicePredicate(tac.getConfiguration(), predicate); + ConfigHelper.setRangeBatchSize(tac.getConfiguration(), cassandraSplit.getRangeBatchSize()); + ConfigHelper.setRpcPort(tac.getConfiguration(), cassandraSplit.getPort()+"" ); + ConfigHelper.setInitialAddress(tac.getConfiguration(), cassandraSplit.getHost()); + ConfigHelper.setPartitioner(tac.getConfiguration(),cassandraSplit.getPartitioner()); + + recordReader.initialize(cfSplit, tac); + } catch (InterruptedException ie){ + throw new IOException(ie); + } catch (Exception ie){ + throw new IOException(ie); + } + return new RecordReader(){ + + @Override + public void close() throws IOException { + recordReader.close(); + } + + @Override + public Text createKey() { + return new Text(); + } + + @Override + public HiveCassandraStandardRowResult createValue() { + return new HiveCassandraStandardRowResult(); + } + + @Override + public long getPos() throws IOException { + return 0l; + } + + @Override + public float getProgress() throws IOException { + float progress = 0.0F; + try { + progress = recordReader.getProgress(); + } catch (InterruptedException e) { + throw new IOException(e); + } + return progress; + } + + @Override + public boolean next(Text rowKey, HiveCassandraStandardRowResult value) throws IOException { + boolean next = false; + try { + next = recordReader.nextKeyValue(); + if (next){ + rowKey.set( ByteBufferUtil.getArray(recordReader.getCurrentKey()) ); + MapWritable theMap = new MapWritable(); + for (Map.Entry entry : recordReader.getCurrentValue().entrySet()) { + HiveIColumn hic = new HiveIColumn(); + hic.setName( ByteBufferUtil.getArray(entry.getValue().name()) ); + hic.setValue( ByteBufferUtil.getArray(entry.getValue().value()) ); + hic.setTimestamp(entry.getValue().timestamp()); + theMap.put(new BytesWritable( ByteBufferUtil.getArray(entry.getValue().name())), hic ); + } + value.setKey(rowKey); + value.setValue(theMap); + } + } catch (InterruptedException e) { + throw new IOException(e); + } + return next; + } + }; + + } + + /** The Cassandra record Reader throws InteruptedException, + * we overlay here to throw IOException instead. 
*/ + @Override + public org.apache.hadoop.mapreduce.RecordReader> + createRecordReader(org.apache.hadoop.mapreduce.InputSplit inputSplit, + TaskAttemptContext taskAttemptContext) + throws IOException{ + org.apache.hadoop.mapreduce.RecordReader> result = null; + try { + result = super.createRecordReader(inputSplit, taskAttemptContext); + } catch (InterruptedException e) { + throw new IOException(e); + } catch (Exception ex){ + throw new IOException(ex); + } + return result; + } + + @Override + public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException { + String ks = jobConf.get(StandardColumnSerDe.CASSANDRA_KEYSPACE_NAME); + String cf = jobConf.get(StandardColumnSerDe.CASSANDRA_CF_NAME); + int slicePredicateSize = jobConf.getInt(StandardColumnSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, 1000); + int sliceRangeSize = jobConf.getInt(StandardColumnSerDe.CASSANDRA_RANGE_BATCH_SIZE, 1000); + String cassandraColumnMapping = jobConf.get(StandardColumnSerDe.CASSANDRA_COL_MAPPING); + int rpcPort = jobConf.getInt(StandardColumnSerDe.CASSANDRA_PORT,9160); + String host = jobConf.get(StandardColumnSerDe.CASSANDRA_HOST); + String partitioner = jobConf.get(StandardColumnSerDe.CASSANDRA_PARTITIONER); + + if (cassandraColumnMapping == null){ + throw new IOException("cassandra.columns.mapping required for Cassandra Table."); + } + + SliceRange range = new SliceRange(); + range.setStart(new byte[0]); + range.setFinish(new byte[0]); + range.setReversed(false); + range.setCount(slicePredicateSize); + SlicePredicate predicate = new SlicePredicate(); + predicate.setSlice_range(range); + + ConfigHelper.setRpcPort(jobConf, ""+rpcPort); + ConfigHelper.setInitialAddress(jobConf, host); + ConfigHelper.setPartitioner(jobConf, partitioner); + ConfigHelper.setInputSlicePredicate(jobConf, predicate); + ConfigHelper.setInputColumnFamily(jobConf, ks, cf); + ConfigHelper.setRangeBatchSize(jobConf, sliceRangeSize); + + Job job = new Job(jobConf); + JobContext jobContext = new JobContext(job.getConfiguration(), job.getJobID()); + + Path [] tablePaths = FileInputFormat.getInputPaths(jobContext); + List splits = getSplits(jobContext); + InputSplit [] results = new InputSplit[splits.size()]; + + for (int i =0; i < splits.size(); ++i){ + HiveCassandraStandardSplit csplit = new HiveCassandraStandardSplit( + (ColumnFamilySplit)splits.get(i), cassandraColumnMapping, tablePaths[0] ); + csplit.setKeyspace(ks); + csplit.setColumnFamily(cf); + csplit.setRangeBatchSize(sliceRangeSize); + csplit.setHost(host); + csplit.setPort(rpcPort); + csplit.setSlicePredicateSize(slicePredicateSize); + csplit.setPartitioner(partitioner); + csplit.setColumnMapping(cassandraColumnMapping); + results[i] = csplit; + } + return results; + } + +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraRow.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraRow.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraRow.java (revision 0) @@ -0,0 +1,121 @@ +package org.apache.hadoop.hive.cassandra.input; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hive.cassandra.serde.StandardColumnSerDe; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import 
org.apache.hadoop.hive.serde2.lazy.LazyStruct; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; + +public class LazyCassandraRow extends LazyStruct{ + private List cassandraColumns; + private HiveCassandraStandardRowResult rowResult; + private ArrayList cachedList; + + public LazyCassandraRow(LazySimpleStructObjectInspector oi) { + super(oi); + } + + public void init(HiveCassandraStandardRowResult crr, List cassandraColumns, List cassandraColumnsBytes) { + this.rowResult = crr; + this.cassandraColumns = cassandraColumns; + setParsed(false); + } + + private void parse(){ + if (getFields() == null) { + List fieldRefs = ((StructObjectInspector)getInspector()).getAllStructFieldRefs(); + setFields(new LazyObject[fieldRefs.size()]); + for (int i = 0; i < getFields().length; i++) { + String cassandraColumn = this.cassandraColumns.get(i); + if (cassandraColumn.endsWith(":")) { + // want all columns as a map + getFields()[i] = new LazyCassandraCellMap( (LazyMapObjectInspector) + fieldRefs.get(i).getFieldObjectInspector()); + } else { + // otherwise only interested in a single column + getFields()[i] = LazyFactory.createLazyObject( + fieldRefs.get(i).getFieldObjectInspector()); + } + } + setFieldInited(new boolean[getFields().length]); + } + Arrays.fill(getFieldInited(), false); + setParsed(true); + } + + @Override + public Object getField(int fieldID) { + if (!getParsed()) { + parse(); + } + return uncheckedGetField(fieldID); + } + + private Object uncheckedGetField(int fieldID) { + if (!getFieldInited()[fieldID]) { + getFieldInited()[fieldID] = true; + ByteArrayRef ref = null; + String columnName = cassandraColumns.get(fieldID); + String cfName=columnName.split(":")[0]; + String colName=columnName.split(":")[1]; + if (columnName.equals(StandardColumnSerDe.CASSANDRA_KEY_COLUMN)) { + //user is asking for key column + ref = new ByteArrayRef(); + ref.setData( rowResult.getKey().getBytes() ); + } else if (columnName.endsWith(":")) { + //user wants all columns as a map + //TODO this into a LazyCassandraCellMap + return null; + } else { + //user wants the value of a single column + Writable res = rowResult.getValue().get( new BytesWritable(colName.getBytes()) ) ; + HiveIColumn hiveIColumn = (HiveIColumn) res; + if ( hiveIColumn != null ) { + ref = new ByteArrayRef(); + ref.setData( hiveIColumn.value().array() ); + } else { + return null; + } + } + if (ref != null) { + getFields()[fieldID].init(ref, 0, ref.getData().length); + } + } + return getFields()[fieldID].getObject(); + } + + /** + * Get the values of the fields as an ArrayList. + * @return The values of the fields as an ArrayList. 
+ */ + @Override + public ArrayList getFieldsAsList() { + if (!getParsed()) { + parse(); + } + if (cachedList == null) { + cachedList = new ArrayList(); + } else { + cachedList.clear(); + } + for (int i = 0; i < getFields().length; i++) { + cachedList.add(uncheckedGetField(i)); + } + return cachedList; + } + + @Override + public Object getObject() { + return this; + } +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardSplit.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardSplit.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardSplit.java (revision 0) @@ -0,0 +1,146 @@ +package org.apache.hadoop.hive.cassandra.input; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.cassandra.hadoop.ColumnFamilySplit; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; + +@SuppressWarnings("deprecation") +public class HiveCassandraStandardSplit extends FileSplit implements InputSplit{ + private final ColumnFamilySplit split; + private String columnMapping; + private String keyspace; + private String columnFamily; + private int rangeBatchSize; + private int slicePredicateSize; + //added for 7.0 + private String partitioner; + private int port; + private String host; + + public HiveCassandraStandardSplit() { + super((Path) null, 0, 0, (String[]) null); + columnMapping = ""; + split = new ColumnFamilySplit(null,null,null); + } + + public HiveCassandraStandardSplit(ColumnFamilySplit split, String columnsMapping, Path dummyPath) { + super(dummyPath, 0, 0, (String[]) null); + this.split = split; + columnMapping = columnsMapping; + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + columnMapping = in.readUTF(); + keyspace = in.readUTF(); + columnFamily = in.readUTF(); + rangeBatchSize = in.readInt(); + slicePredicateSize = in.readInt(); + partitioner = in.readUTF(); + port = in.readInt(); + host = in.readUTF(); + split.readFields(in); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeUTF(columnMapping); + out.writeUTF(keyspace); + out.writeUTF(columnFamily); + out.writeInt(rangeBatchSize); + out.writeInt(slicePredicateSize); + out.writeUTF(partitioner); + out.writeInt(port); + out.writeUTF(host); + split.write(out); + } + + @Override + public String[] getLocations() throws IOException { + return split.getLocations(); + } + + @Override + public long getLength() { + return split.getLength(); + } + + public String getKeyspace() { + return keyspace; + } + + public void setKeyspace(String keyspace) { + this.keyspace = keyspace; + } + + public String getColumnFamily() { + return columnFamily; + } + + public void setColumnFamily(String columnFamily) { + this.columnFamily = columnFamily; + } + + public int getRangeBatchSize() { + return rangeBatchSize; + } + + public void setRangeBatchSize(int rangeBatchSize) { + this.rangeBatchSize = rangeBatchSize; + } + + public int getSlicePredicateSize() { + return slicePredicateSize; + } + + public void setSlicePredicateSize(int slicePredicateSize) { + this.slicePredicateSize = slicePredicateSize; + } + + public ColumnFamilySplit getSplit() { + return split; + } + + public String getColumnMapping() { + return columnMapping; + } + + 
public void setColumnMapping(String mapping){ + this.columnMapping=mapping; + } + + public void setPartitioner(String part){ + partitioner = part; + } + + public String getPartitioner(){ + return partitioner; + } + + public int getPort(){ + return port; + } + + public void setPort(int port){ + this.port = port; + } + + public String getHost(){ + return host; + } + + public void setHost(String host){ + this.host = host; + } + + public String toString(){ + return this.host+" "+this.port+" "+this.partitioner; + } +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraCellMap.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraCellMap.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraCellMap.java (revision 0) @@ -0,0 +1,82 @@ +package org.apache.hadoop.hive.cassandra.input; + +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.hadoop.hive.serde2.lazy.LazyMap; +import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector; + +public class LazyCassandraCellMap extends LazyMap{ + + private HiveCassandraStandardRowResult rowResult; + private String cassandraColumnFamily; + + protected LazyCassandraCellMap(LazyMapObjectInspector oi) { + super(oi); + } + + public void init(HiveCassandraStandardRowResult rr, String columnFamily) { + rowResult = rr; + cassandraColumnFamily = columnFamily; + setParsed(false); + } + + private void parse() { + if (cachedMap == null) { + cachedMap = new LinkedHashMap(); + } else { + cachedMap.clear(); + } + + //NavigableMap familyMap = new TreeMap(); + } + + /** + * Get the value in the map for the given key. + * + * @param key + * @return + */ + + @Override + public Object getMapValueElement(Object key) { + if (!getParsed()) { + parse(); + } + + for (Map.Entry entry : cachedMap.entrySet()) { + LazyPrimitive lazyKeyI = (LazyPrimitive) entry.getKey(); + // getWritableObject() will convert LazyPrimitive to actual primitive + // writable objects. + Object keyI = lazyKeyI.getWritableObject(); + if (keyI != null) { + if (keyI.equals(key)) { + // Got a match, return the value + LazyObject v = (LazyObject) entry.getValue(); + return v == null ? 
v : v.getObject(); + } + } + } + + return null; + } + + @Override + public Map getMap() { + if (!getParsed()) { + parse(); + } + return cachedMap; + } + + @Override + public int getMapSize() { + if (!getParsed()) { + parse(); + } + return cachedMap.size(); + } + +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/HiveCassandraOutputFormat.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/HiveCassandraOutputFormat.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/HiveCassandraOutputFormat.java (revision 0) @@ -0,0 +1,111 @@ +package org.apache.hadoop.hive.cassandra.output; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Properties; + +import org.apache.cassandra.thrift.Column; +import org.apache.cassandra.thrift.ColumnParent; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.TimedOutException; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.cassandra.FramedConnWrapper; +import org.apache.hadoop.hive.cassandra.serde.StandardColumnSerDe; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.util.Progressable; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; + +@SuppressWarnings("deprecation") +public class HiveCassandraOutputFormat implements HiveOutputFormat, + OutputFormat { + + static final Log LOG = LogFactory.getLog(HiveCassandraOutputFormat.class); + + @Override + public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, + Class valueClass, boolean isCompressed, Properties tableProperties, + Progressable progress) throws IOException { + + final String cassandraKeySpace = jc.get(StandardColumnSerDe.CASSANDRA_KEYSPACE_NAME); + final String cassandraHost = jc.get(StandardColumnSerDe.CASSANDRA_HOST); + final int cassandraPort = Integer.parseInt(jc.get(StandardColumnSerDe.CASSANDRA_PORT)); + final String consistencyLevel = jc.get(StandardColumnSerDe.CASSANDRA_CONSISTENCY_LEVEL, "ONE"); + ConsistencyLevel level = null; + if (consistencyLevel.equalsIgnoreCase("QUORUM")) { + level = ConsistencyLevel.QUORUM; + } else if (consistencyLevel.equalsIgnoreCase("ALL")) { + level = ConsistencyLevel.ALL; + } else { + level = ConsistencyLevel.ONE; + } + final ConsistencyLevel fLevel = level; + + return new RecordWriter() { + private FramedConnWrapper wrap; + + @Override + public void close(boolean abort) throws IOException { + if (wrap != null) { + wrap.close(); + } + } + + @Override + public void write(Writable w) throws IOException { + CassandraPut put = (CassandraPut) w; + + for (CassandraColumn c : put.getColumns()) { + ColumnParent parent = new ColumnParent(); + parent.setColumn_family(c.getColumnFamily()); + try { + ensureConnection(cassandraHost, cassandraPort); + wrap.getClient().set_keyspace(cassandraKeySpace); + Column col = new Column(); + col.setValue(c.getValue()); + 
col.setTimestamp(c.getTimeStamp()); + col.setName(c.getColumn()); + wrap.getClient().insert(ByteBuffer.wrap(put.getKey().getBytes()), parent, col, fLevel); + } catch (InvalidRequestException e) { + throw new IOException(e); + } catch (UnavailableException e) { + throw new IOException(e); + } catch (TimedOutException e) { + throw new IOException(e); + } catch (TException e) { + throw new IOException(e); + } + } + } // end write + + public void ensureConnection(String host, int port) throws TTransportException { + if (wrap == null) { + wrap = new FramedConnWrapper(host, port, 5000); + wrap.open(); + } + } + + }; + } + + @Override + public void checkOutputSpecs(FileSystem arg0, JobConf jc) throws IOException { + + } + + @Override + public org.apache.hadoop.mapred.RecordWriter getRecordWriter(FileSystem arg0, + JobConf arg1, String arg2, Progressable arg3) throws IOException { + throw new RuntimeException("Error: Hive should not invoke this method."); + } +} Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraPut.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraPut.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraPut.java (revision 0) @@ -0,0 +1,86 @@ +package org.apache.hadoop.hive.cassandra.output; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.io.Writable; + +public class CassandraPut implements Writable{ + + private String key; + //private ConsistencyLevel level; + private List columns; + + public CassandraPut(){ + columns = new ArrayList(); + } + + public CassandraPut(String key){ + this(); + setKey(key); + } + + @Override + public void readFields(DataInput in) throws IOException { + key = in.readUTF(); + int ilevel = in.readInt(); + int cols = in.readInt(); + for (int i =0;i getColumns() { + return columns; + } + + public void setColumns(List columns) { + this.columns = columns; + } + + @Override + public String toString(){ + StringBuilder sb = new StringBuilder(); + sb.append("key: "); + sb.append(this.key); + for (CassandraColumn col:this.columns){ + sb.append( "column : [" ); + sb.append( col.toString() ); + sb.append( "]" ); + } + return sb.toString(); + } +} \ No newline at end of file Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraColumn.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraColumn.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraColumn.java (revision 0) @@ -0,0 +1,78 @@ +package org.apache.hadoop.hive.cassandra.output; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +public class CassandraColumn implements Writable{ + + private String columnFamily; + private long timeStamp; + private byte [] column; + private byte [] value; + + @Override + public void readFields(DataInput din) throws IOException { + columnFamily = din.readUTF(); + timeStamp = din.readLong(); + int clength= din.readInt(); + column = new byte[clength]; + din.readFully(column, 0, clength); + int vlength = din.readInt(); + value = new byte [vlength ]; + din.readFully( value, 0 , vlength); + } + + @Override + public void write(DataOutput out) 
throws IOException { + out.writeUTF(columnFamily); + out.writeLong(timeStamp); + out.writeInt( column.length ); + out.write(column); + out.writeInt( value.length ); + out.write(value); + } + + public String getColumnFamily() { + return columnFamily; + } + + public void setColumnFamily(String columnFamily) { + this.columnFamily = columnFamily; + } + + public byte[] getColumn() { + return column; + } + + public void setColumn(byte[] column) { + this.column = column; + } + + public byte[] getValue() { + return value; + } + + public void setValue(byte[] value) { + this.value = value; + } + + public long getTimeStamp() { + return timeStamp; + } + + public void setTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } + + @Override + public String toString(){ + StringBuilder sb = new StringBuilder(); + sb.append( "cf:"+ this.columnFamily); + sb.append( "column:"+ new String (this.column)); + sb.append( "value:"+ new String (this.value)); + return sb.toString(); + } +} \ No newline at end of file Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java (revision 0) @@ -0,0 +1,225 @@ +package org.apache.hadoop.hive.cassandra; + +import java.util.Map; +import java.util.Properties; + +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.KsDef; +import org.apache.cassandra.thrift.NotFoundException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.cassandra.input.HiveCassandraStandardColumnInputFormat; +import org.apache.hadoop.hive.cassandra.output.HiveCassandraOutputFormat; +import org.apache.hadoop.hive.cassandra.serde.StandardColumnSerDe; +import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.Constants; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; + +public class CassandraStorageHandler implements HiveStorageHandler, HiveMetaHook { + + //private TSocket socket; + //private TTransport transport; + //private TProtocol proto; + //private Cassandra.Client client; + + private FramedConnWrapper wrap; + + private Configuration configuration; + + @Override + public void configureTableJobProperties(TableDesc tableDesc, Map jobProperties) { + Properties tableProperties = tableDesc.getProperties(); + String keyspaceName = tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_KEYSPACE_NAME); + if (keyspaceName == null) { + keyspaceName = tableProperties.getProperty(Constants.META_TABLE_NAME); + } + + jobProperties.put(StandardColumnSerDe.CASSANDRA_KEYSPACE_NAME, keyspaceName); + jobProperties.put(StandardColumnSerDe.CASSANDRA_CF_NAME, + tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_CF_NAME)); + jobProperties.put(StandardColumnSerDe.CASSANDRA_COL_MAPPING, + 
tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_COL_MAPPING)); + jobProperties.put(StandardColumnSerDe.CASSANDRA_HOST, + tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_HOST)); + jobProperties.put(StandardColumnSerDe.CASSANDRA_PORT, + tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_PORT)); + jobProperties.put(StandardColumnSerDe.CASSANDRA_PARTITIONER, + tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_PARTITIONER)); + + if (tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_THRIFT_MODE) != null){ + jobProperties.put(StandardColumnSerDe.CASSANDRA_THRIFT_MODE, + tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_THRIFT_MODE)); + } else { + jobProperties.put(StandardColumnSerDe.CASSANDRA_THRIFT_MODE, "FRAMED" ); + } + + if (tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_CONSISTENCY_LEVEL) != null){ + jobProperties.put(StandardColumnSerDe.CASSANDRA_CONSISTENCY_LEVEL, + tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_CONSISTENCY_LEVEL)); + } else { + jobProperties.put(StandardColumnSerDe.CASSANDRA_CONSISTENCY_LEVEL, "ONE" ); + } + + if (tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_RANGE_BATCH_SIZE) != null){ + jobProperties.put(StandardColumnSerDe.CASSANDRA_RANGE_BATCH_SIZE, + tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_RANGE_BATCH_SIZE)); + } else { + jobProperties.put(StandardColumnSerDe.CASSANDRA_RANGE_BATCH_SIZE, "1000" ); + } + + if (tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_SLICE_PREDICATE_SIZE) != null){ + jobProperties.put(StandardColumnSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, + tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_SLICE_PREDICATE_SIZE)); + } else { + jobProperties.put(StandardColumnSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, "1000" ); + } + } + + @Override + public Class getInputFormatClass() { + return HiveCassandraStandardColumnInputFormat.class; + } + + @Override + public HiveMetaHook getMetaHook() { + return this; + } + + @Override + public Class getOutputFormatClass() { + return HiveCassandraOutputFormat.class; + } + + @Override + public Class getSerDeClass() { + return StandardColumnSerDe.class; + } + + @Override + public Configuration getConf() { + return this.configuration; + } + + @Override + public void setConf(Configuration arg0) { + this.configuration=arg0; + } + + @Override + public void preCreateTable(Table tbl) throws MetaException { + boolean isExternal = MetaStoreUtils.isExternalTable(tbl); + if (!isExternal) { + throw new MetaException("Cassandra tables must be external."); + } + if (tbl.getSd().getLocation() != null) { + throw new MetaException("LOCATION may not be specified for Cassandra."); + } + String keyspaceName = this.getCassandraKeyspaceName(tbl); + + Map serdeParam = tbl.getSd().getSerdeInfo().getParameters(); + if (serdeParam.get(StandardColumnSerDe.CASSANDRA_COL_MAPPING) == null) { + throw new MetaException("The column mapping must be defined as " + + StandardColumnSerDe.CASSANDRA_COL_MAPPING ); + } + if (serdeParam.get(StandardColumnSerDe.CASSANDRA_CF_NAME) == null) { + throw new MetaException("The column family name must be defined as " + + StandardColumnSerDe.CASSANDRA_CF_NAME); + } + if (serdeParam.get(StandardColumnSerDe.CASSANDRA_HOST) == null) { + throw new MetaException("Initial host to determine splits must be supplied " + + StandardColumnSerDe.CASSANDRA_HOST); + } + if (serdeParam.get(StandardColumnSerDe.CASSANDRA_PORT) == null) { + throw new MetaException("The RPC port must be defined (typically 9160) " + + 
StandardColumnSerDe.CASSANDRA_PORT); + } + if (serdeParam.get(StandardColumnSerDe.CASSANDRA_PARTITIONER) == null) { + throw new MetaException("The partitioner must be defined (typically " + + "org.apache.cassandra.dht.RandomPartitioner) " + + StandardColumnSerDe.CASSANDRA_PARTITIONER); + } + + String cassandraHost = serdeParam.get(StandardColumnSerDe.CASSANDRA_HOST); + int cassandraPort = Integer.parseInt(serdeParam.get(StandardColumnSerDe.CASSANDRA_PORT)); + + try { + this.ensureConnection(cassandraHost, cassandraPort); + try { + KsDef ks = wrap.getClient().describe_keyspace(keyspaceName); + } catch (NotFoundException ex){ + throw new MetaException( keyspaceName +" not found. The storage handler will not create it. "); + } + } catch (TException e) { + throw new MetaException("An internal exception prevented this action from taking place." + e.getMessage() ); + } catch (InvalidRequestException e) { + throw new MetaException("An internal exception prevented this action from taking place." + e.getMessage() ); + } + } + + private String getCassandraKeyspaceName(Table tbl) { + String tableName = tbl.getParameters().get(StandardColumnSerDe.CASSANDRA_KEYSPACE_NAME); + if (tableName == null) { + tableName = tbl.getSd().getSerdeInfo().getParameters().get( + StandardColumnSerDe.CASSANDRA_KEYSPACE_NAME); + } + if (tableName == null) { + tableName = tbl.getTableName(); + } + return tableName; + } +/* + private void ensureConnection(String host,int port, int timeout){ + //tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_THRIFT_MODE) + if (transport == null || (transport!=null && !transport.isOpen())){ + socket = new TSocket(host,port,5000); + transport = new TFramedTransport(socket); + proto = new TBinaryProtocol(transport); + try { + transport.open(); + this.client = new Cassandra.Client(proto); + } catch (TTransportException e) { + throw new RuntimeException ("ahh" +e); + } + } + } +*/ + @Override + public void commitCreateTable(Table table) throws MetaException { + //No work needed + } + + @Override + public void commitDropTable(Table table, boolean deleteData) throws MetaException { + //No work needed + } + + @Override + public void preDropTable(Table table) throws MetaException { + // nothing to do + } + + @Override + public void rollbackCreateTable(Table table) throws MetaException { + // nothing to do + } + + @Override + public void rollbackDropTable(Table table) throws MetaException { + // nothing to do + } + + public void ensureConnection(String host, int port) throws TTransportException { + if (wrap == null) { + wrap = new FramedConnWrapper(host, port, 5000); + wrap.open(); + } + } +} \ No newline at end of file Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/serde/StandardColumnSerDe.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/serde/StandardColumnSerDe.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/serde/StandardColumnSerDe.java (revision 0) @@ -0,0 +1,431 @@ +package org.apache.hadoop.hive.cassandra.serde; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import 
org.apache.hadoop.hive.cassandra.input.HiveCassandraStandardRowResult; +import org.apache.hadoop.hive.cassandra.input.LazyCassandraRow; +import org.apache.hadoop.hive.cassandra.output.CassandraColumn; +import org.apache.hadoop.hive.cassandra.output.CassandraPut; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; +import org.apache.hadoop.hive.serde2.lazy.LazyUtils; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.io.Writable; + +public class StandardColumnSerDe implements SerDe { + + public static final Log LOG = LogFactory.getLog(StandardColumnSerDe.class.getName()); + public static final String CASSANDRA_KEYSPACE_NAME = "cassandra.ks.name"; //keyspace + public static final String CASSANDRA_CF_NAME = "cassandra.cf.name"; //column family + public static final String CASSANDRA_RANGE_BATCH_SIZE = "cassandra.range.size"; + public static final String CASSANDRA_SLICE_PREDICATE_SIZE = "cassandra.slice.predicate.size"; + public static final String CASSANDRA_HOST = "cassandra.host"; //initialHost + public static final String CASSANDRA_PORT = "cassandra.port"; //rcpPort + public static final String CASSANDRA_PARTITIONER = "cassandra.partitioner"; //partitioner + public static final String CASSANDRA_COL_MAPPING = "cassandra.columns.mapping"; + public static final String CASSANDRA_KEY_COLUMN = ":key"; + public static final String CASSANDRA_CONSISTENCY_LEVEL = "cassandra.consistency.level"; + public static final String CASSANDRA_THRIFT_MODE = "cassandra.thrift.mode"; + + /* names of columns from SerdeParameters */ + private List cassandraColumnNames; + /* index of key column in results */ + private int iKey; + + private ObjectInspector cachedObjectInspector; + private SerDeParameters serdeParams; + private LazyCassandraRow cachedCassandraRow; + private List cassandraColumnNamesBytes; + private final ByteStream.Output serializeStream = new ByteStream.Output(); + private boolean useJSONSerialize; + + private byte[] separators; // the separators array + private boolean escaped; // whether we need to escape the data when writing out + private byte escapeChar; // which char to use as the escape char, e.g. '\\' + private boolean[] needsEscape; // which chars need to be escaped. This array should have size + + // of 128. 
Negative byte values (or byte values >= 128) are + // never escaped. + + @Override + public void initialize(Configuration conf, Properties tbl) throws SerDeException { + initCassandraSerDeParameters(conf, tbl, getClass().getName()); + cachedObjectInspector = LazyFactory.createLazyStructInspector( + serdeParams.getColumnNames(), + serdeParams.getColumnTypes(), + serdeParams.getSeparators(), + serdeParams.getNullSequence(), + serdeParams.isLastColumnTakesRest(), + serdeParams.isEscaped(), + serdeParams.getEscapeChar()); + cachedCassandraRow = new LazyCassandraRow( + (LazySimpleStructObjectInspector) cachedObjectInspector); + + if (LOG.isDebugEnabled()) { + LOG.debug("CassandraSerDe initialized with : columnNames = " + + StringUtils.join(serdeParams.getColumnNames(),",") + + " columnTypes = " + + StringUtils.join(serdeParams.getColumnTypes(), ",") + + " cassandraColumnMapping = " + + cassandraColumnNames); + } + } + + private void initCassandraSerDeParameters(Configuration job, Properties tbl, String serdeName) + throws SerDeException { + String cassandraColumnNameProperty = + tbl.getProperty(StandardColumnSerDe.CASSANDRA_COL_MAPPING); + String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES); + cassandraColumnNames = parseColumnMapping(cassandraColumnNameProperty); + iKey = cassandraColumnNames.indexOf(StandardColumnSerDe.CASSANDRA_KEY_COLUMN); + + cassandraColumnNamesBytes = initColumnNamesBytes(cassandraColumnNames); + + // Build the type property string if not supplied + if (columnTypeProperty == null) { + StringBuilder sb = new StringBuilder(); + + for (int i = 0; i < cassandraColumnNames.size(); i++) { + if (sb.length() > 0) { + sb.append(":"); + } + String colName = cassandraColumnNames.get(i); + if (isSpecialColumn(colName)) { + // a special column becomes a STRING + sb.append(Constants.STRING_TYPE_NAME); + } else if (colName.endsWith(":")) { + // the cf name becomes a MAP + sb.append( + Constants.MAP_TYPE_NAME + "<" + + Constants.STRING_TYPE_NAME + + "," + Constants.STRING_TYPE_NAME + ">"); + } else { + // an individual column becomes a STRING + sb.append(Constants.STRING_TYPE_NAME); + } + } + tbl.setProperty(Constants.LIST_COLUMN_TYPES, sb.toString()); + } + serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName); + + if (cassandraColumnNames.size() != serdeParams.getColumnNames().size()) { + throw new SerDeException(serdeName + ": columns has " + + serdeParams.getColumnNames().size() + + " elements while cassandra.columns.mapping has " + + cassandraColumnNames.size() + " elements" + + " (counting the key if implicit)"); + } + + separators = serdeParams.getSeparators(); + escaped = serdeParams.isEscaped(); + escapeChar = serdeParams.getEscapeChar(); + needsEscape = serdeParams.getNeedsEscape(); + + // we can just make sure that "StandardColumn:" is mapped to MAP + for (int i = 0; i < cassandraColumnNames.size(); i++) { + String cassandraColName = cassandraColumnNames.get(i); + if (cassandraColName.endsWith(":")) { + TypeInfo typeInfo = serdeParams.getColumnTypes().get(i); + if ((typeInfo.getCategory() != Category.MAP) || + (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getTypeName() + != Constants.STRING_TYPE_NAME)) { + + throw new SerDeException( + serdeName + ": Cassandra column family '" + + cassandraColName + + "' should be mapped to map but is mapped to " + + typeInfo.getTypeName()); + } + } + } + } + + /* + * + * @see org.apache.hadoop.hive.serde2.Deserializer#deserialize(org.apache.hadoop.io.Writable) + * Turns a Cassandra row into a Hive row. 
+
+  /*
+   * Turns a Cassandra row into a Hive row.
+   *
+   * @see org.apache.hadoop.hive.serde2.Deserializer#deserialize(org.apache.hadoop.io.Writable)
+   */
+  @Override
+  public Object deserialize(Writable w) throws SerDeException {
+    if (!(w instanceof HiveCassandraStandardRowResult)) {
+      throw new SerDeException(getClass().getName() + ": expects Cassandra Row Result");
+    }
+    HiveCassandraStandardRowResult crr = (HiveCassandraStandardRowResult) w;
+    this.cachedCassandraRow.init(crr, this.cassandraColumnNames, cassandraColumnNamesBytes);
+    return cachedCassandraRow;
+  }
+
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return this.cachedObjectInspector;
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return CassandraPut.class;
+  }
+
+  /*
+   * Turns obj (a Hive row) into a CassandraPut, i.e. a row key plus the
+   * columns (name/value pairs) to write.
+   */
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+    if (objInspector.getCategory() != Category.STRUCT) {
+      throw new SerDeException(getClass().toString()
+          + " can only serialize struct types, but we got: "
+          + objInspector.getTypeName());
+    }
+    // Prepare the field ObjectInspectors
+    StructObjectInspector soi = (StructObjectInspector) objInspector;
+    List<? extends StructField> fields = soi.getAllStructFieldRefs();
+    List<Object> list = soi.getStructFieldsDataAsList(obj);
+    List<? extends StructField> declaredFields =
+        (serdeParams.getRowTypeInfo() != null &&
+            ((StructTypeInfo) serdeParams.getRowTypeInfo())
+                .getAllStructFieldNames().size() > 0) ?
+        ((StructObjectInspector) getObjectInspector()).getAllStructFieldRefs()
+        : null;
+    CassandraPut put = null;
+    try {
+      String key = serializeField(iKey, null, fields, list, declaredFields);
+      if (key == null) {
+        throw new SerDeException("Cassandra row key cannot be NULL");
+      }
+      put = new CassandraPut(key);
+      // Serialize each field except the key (done already)
+      for (int i = 0; i < fields.size(); i++) {
+        if (i != iKey) {
+          serializeField(i, put, fields, list, declaredFields);
+        }
+      }
+    } catch (IOException e) {
+      throw new SerDeException(e);
+    }
+    return put;
+  }
+
+  /*
+   * Serializes field i. For the key column this returns the row key as a
+   * String (put may be null in that case); for every other column it appends
+   * a CassandraColumn to put and returns null.
+   */
+  private String serializeField(int i, CassandraPut put, List<? extends StructField> fields,
+      List<Object> list, List<? extends StructField> declaredFields) throws IOException {
+
+    // column name
+    String cassandraColumn = cassandraColumnNames.get(i);
+
+    // Get the field objectInspector and the field object.
+    ObjectInspector foi = fields.get(i).getFieldObjectInspector();
+    Object f = (list == null ? null : list.get(i));
+    if (f == null) {
+      return null;
+    }
+
+    // If the field corresponds to a column family in cassandra
+    if (cassandraColumn.endsWith(":")) {
+      MapObjectInspector moi = (MapObjectInspector) foi;
+      ObjectInspector koi = moi.getMapKeyObjectInspector();
+      ObjectInspector voi = moi.getMapValueObjectInspector();
+
+      Map<?, ?> map = moi.getMap(f);
+      if (map == null) {
+        return null;
+      } else {
+        for (Map.Entry<?, ?> entry : map.entrySet()) {
+          // Get the Key
+          serializeStream.reset();
+          serialize(entry.getKey(), koi, 3);
+
+          // Get the column-qualifier
+          byte[] columnQualifier = new byte[serializeStream.getCount()];
+          System.arraycopy(serializeStream.getData(), 0, columnQualifier, 0,
+              serializeStream.getCount());
+
+          // Get the Value
+          serializeStream.reset();
+
+          boolean isNotNull = serialize(entry.getValue(), voi, 3);
+          if (!isNotNull) {
+            continue;
+          }
+          byte[] value = new byte[serializeStream.getCount()];
+          System.arraycopy(serializeStream.getData(), 0, value, 0, serializeStream.getCount());
+
+          CassandraColumn cc = new CassandraColumn();
+          cc.setTimeStamp(System.currentTimeMillis());
+          cc.setColumnFamily(cassandraColumn);
+          cc.setColumn(columnQualifier);
+          cc.setValue(value);
+          put.getColumns().add(cc);
+        }
+      }
+    } else {
+
+      // If the field that is passed in is NOT a primitive, and either the
+      // field is not declared (no schema was given at initialization), or
+      // the field is declared as a primitive in initialization, serialize
+      // the data to a JSON string. Otherwise serialize the data in the
+      // delimited way.
+      serializeStream.reset();
+      boolean isNotNull;
+      if (!foi.getCategory().equals(Category.PRIMITIVE)
+          && (declaredFields == null ||
+              declaredFields.get(i).getFieldObjectInspector().getCategory()
+                  .equals(Category.PRIMITIVE) || useJSONSerialize)) {
+        isNotNull = serialize(SerDeUtils.getJSONString(f, foi),
+            PrimitiveObjectInspectorFactory.javaStringObjectInspector, 1);
+      } else {
+        isNotNull = serialize(f, foi, 1);
+      }
+      if (!isNotNull) {
+        return null;
+      }
+      byte[] key = new byte[serializeStream.getCount()];
+      System.arraycopy(serializeStream.getData(), 0, key, 0, serializeStream.getCount());
+      if (i == iKey) {
+        return new String(key);
+      }
+      CassandraColumn cc = new CassandraColumn();
+      cc.setTimeStamp(System.currentTimeMillis());
+      cc.setColumnFamily(this.cassandraColumnNames.get(i).split(":")[0]);
+      cc.setColumn(this.cassandraColumnNames.get(i).split(":")[1].getBytes());
+      cc.setValue(key);
+      put.getColumns().add(cc);
+    }
+    return null;
+  }
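+
+  /*
+   * Sketch of the delimited encoding produced by serialize() below (the actual
+   * separator bytes come from the table's SerDeParameters): a list value
+   * ["a", "b"] written at level 1 becomes a<sep1>b, and a map {k1=v1, k2=v2}
+   * becomes k1<sep2>v1<sep1>k2<sep2>v2, where <sepN> is separators[N]. Values
+   * that are not primitives and have no declared complex type are instead
+   * emitted as a JSON string by serializeField() above.
+   */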
+
+  private boolean serialize(Object obj, ObjectInspector objInspector, int level)
+      throws IOException {
+
+    switch (objInspector.getCategory()) {
+    case PRIMITIVE: {
+      LazyUtils.writePrimitiveUTF8(
+          serializeStream, obj,
+          (PrimitiveObjectInspector) objInspector,
+          escaped, escapeChar, needsEscape);
+      return true;
+    }
+    case LIST: {
+      char separator = (char) separators[level];
+      ListObjectInspector loi = (ListObjectInspector) objInspector;
+      List<?> list = loi.getList(obj);
+      ObjectInspector eoi = loi.getListElementObjectInspector();
+      if (list == null) {
+        return false;
+      } else {
+        for (int i = 0; i < list.size(); i++) {
+          if (i > 0) {
+            serializeStream.write(separator);
+          }
+          serialize(list.get(i), eoi, level + 1);
+        }
+      }
+      return true;
+    }
+    case MAP: {
+      char separator = (char) separators[level];
+      char keyValueSeparator = (char) separators[level + 1];
+      MapObjectInspector moi = (MapObjectInspector) objInspector;
+      ObjectInspector koi = moi.getMapKeyObjectInspector();
+      ObjectInspector voi = moi.getMapValueObjectInspector();
+
+      Map<?, ?> map = moi.getMap(obj);
+      if (map == null) {
+        return false;
+      } else {
+        boolean first = true;
+        for (Map.Entry<?, ?> entry : map.entrySet()) {
+          if (first) {
+            first = false;
+          } else {
+            serializeStream.write(separator);
+          }
+          serialize(entry.getKey(), koi, level + 2);
+          serializeStream.write(keyValueSeparator);
+          serialize(entry.getValue(), voi, level + 2);
+        }
+      }
+      return true;
+    }
+    case STRUCT: {
+      char separator = (char) separators[level];
+      StructObjectInspector soi = (StructObjectInspector) objInspector;
+      List<? extends StructField> fields = soi.getAllStructFieldRefs();
+      List<Object> list = soi.getStructFieldsDataAsList(obj);
+      if (list == null) {
+        return false;
+      } else {
+        for (int i = 0; i < list.size(); i++) {
+          if (i > 0) {
+            serializeStream.write(separator);
+          }
+          serialize(list.get(i), fields.get(i).getFieldObjectInspector(), level + 1);
+        }
+      }
+      return true;
+    }
+    }
+    throw new RuntimeException("Unknown category type: " + objInspector.getCategory());
+  }
+
+  public static List<String> parseColumnMapping(String columnMapping) {
+    String[] columnArray = columnMapping.split(",");
+    List<String> columnList = Arrays.asList(columnArray);
+    int iKey = columnList.indexOf(CASSANDRA_KEY_COLUMN);
+    if (iKey == -1) {
+      columnList = new ArrayList<String>(columnList);
+      columnList.add(0, CASSANDRA_KEY_COLUMN);
+    }
+    return columnList;
+  }
+
+  public static List<byte[]> initColumnNamesBytes(List<String> columnNames) {
+    List<byte[]> columnBytes = new ArrayList<byte[]>();
+    String column = null;
+    for (int i = 0; i < columnNames.size(); i++) {
+      column = columnNames.get(i);
+      try {
+        if (column.endsWith(":")) {
+          columnBytes.add((column.split(":")[0]).getBytes("UTF-8"));
+        } else {
+          columnBytes.add(column.getBytes("UTF-8"));
+        }
+      } catch (UnsupportedEncodingException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+    return columnBytes;
+  }
+
+  public static boolean isSpecialColumn(String columnName) {
+    return columnName.equals(CASSANDRA_KEY_COLUMN);
+  }
+
+}
Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/FramedConnWrapper.java
===================================================================
--- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/FramedConnWrapper.java (revision 0)
+++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/FramedConnWrapper.java (revision 0)
@@ -0,0 +1,31 @@
+package org.apache.hadoop.hive.cassandra;
+
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+public class FramedConnWrapper {
+  private final TTransport transport;
+  private final TProtocol proto;
+  private final TSocket socket;
+  public FramedConnWrapper(String host, int port, int timeout) {
+    socket = new TSocket(host, port, timeout);
+    transport = new TFramedTransport(socket);
+    proto = new TBinaryProtocol(transport);
+  }
+  public void open() throws TTransportException {
+    transport.open();
+  }
+  public void close() {
+    transport.close();
+    socket.close();
+  }
+  public Cassandra.Client getClient() {
+    Cassandra.Client client = new Cassandra.Client(proto);
+    return client;
+  }
+}
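A quick usage sketch of the FramedConnWrapper added above (the host, port and timeout values are placeholders -- 9160 is only the usual Cassandra RPC port -- and describe_version() is just a convenient smoke test):

import org.apache.cassandra.thrift.Cassandra;
import org.apache.hadoop.hive.cassandra.FramedConnWrapper;

public class FramedConnWrapperExample {
  public static void main(String[] args) throws Exception {
    // placeholder endpoint; in the handler these come from the
    // cassandra.host / cassandra.port table properties
    FramedConnWrapper conn = new FramedConnWrapper("127.0.0.1", 9160, 5000);
    conn.open();
    try {
      Cassandra.Client client = conn.getClient();
      System.out.println("Cassandra Thrift API version: " + client.describe_version());
    } finally {
      conn.close();
    }
  }
}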
Index: cassandra-handler/build.xml
===================================================================
--- cassandra-handler/build.xml (revision 0)
+++ cassandra-handler/build.xml (revision 0)
@@ -0,0 +1,95 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Index: ql/ivy.xml
===================================================================
--- ql/ivy.xml (revision 1078995)
+++ ql/ivy.xml (working copy)
@@ -28,6 +28,11 @@
+
+
+
+
+
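For reviewers, a minimal sketch of how the new SerDe is driven by table properties. The keyspace, column family and Hive column names are invented for the example, and the package of StandardColumnSerDe is assumed here (its package declaration sits above this excerpt of the patch):

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.Constants;
// package assumed for this sketch
import org.apache.hadoop.hive.cassandra.serde.StandardColumnSerDe;

public class StandardColumnSerDeExample {
  public static void main(String[] args) throws Exception {
    Properties tbl = new Properties();
    // invented keyspace / column family names, purely illustrative
    tbl.setProperty(StandardColumnSerDe.CASSANDRA_KEYSPACE_NAME, "Keyspace1");
    tbl.setProperty(StandardColumnSerDe.CASSANDRA_CF_NAME, "users");
    tbl.setProperty(StandardColumnSerDe.CASSANDRA_COL_MAPPING, ":key,users:email,users:");
    // Hive column names; the column types are derived from the mapping
    // because LIST_COLUMN_TYPES is left unset
    tbl.setProperty(Constants.LIST_COLUMNS, "row_key,email,everything_else");

    StandardColumnSerDe serde = new StandardColumnSerDe();
    serde.initialize(new Configuration(), tbl);
    // expected to print something like
    // struct<row_key:string,email:string,everything_else:map<string,string>>
    System.out.println(serde.getObjectInspector().getTypeName());
  }
}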