Index: ant/build.xml =================================================================== --- ant/build.xml (revision 1182878) +++ ant/build.xml (working copy) @@ -39,7 +39,8 @@ includes="**/*.java" destdir="${build.classes}" debug="${javac.debug}" - deprecation="${javac.deprecation}"> + deprecation="${javac.deprecation}" + includeantruntime="true"> Index: shims/build.xml =================================================================== --- shims/build.xml (revision 1182878) +++ shims/build.xml (working copy) @@ -51,7 +51,8 @@ excludes="**/Proxy*.java" destdir="${build.classes}" debug="${javac.debug}" - deprecation="${javac.deprecation}"> + deprecation="${javac.deprecation}" + includeantruntime="false"> @@ -74,7 +75,8 @@ includes="**/Proxy*.java" destdir="${build.classes}" debug="${javac.debug}" - deprecation="${javac.deprecation}"> + deprecation="${javac.deprecation}" + includeantruntime="false"> @@ -96,7 +98,8 @@ optimize="${javac.optimize}" target="${javac.version}" source="${javac.version}" - deprecation="${javac.deprecation}"> + deprecation="${javac.deprecation}" + includeantruntime="false"> @@ -109,7 +112,8 @@ optimize="${javac.optimize}" target="${javac.version}" source="${javac.version}" - deprecation="${javac.deprecation}"> + deprecation="${javac.deprecation}" + includeantruntime="false"> Index: build.xml =================================================================== --- build.xml (revision 1182878) +++ build.xml (working copy) @@ -107,7 +107,7 @@ - + @@ -129,7 +129,7 @@ - + @@ -140,7 +140,7 @@ - + @@ -461,6 +461,7 @@ + @@ -791,6 +792,8 @@ todir="${mvn.jar.dir}" /> + + + + + Index: jdbc/build.xml =================================================================== --- jdbc/build.xml (revision 1182878) +++ jdbc/build.xml (working copy) @@ -49,7 +49,7 @@ destdir="${build.classes}" debug="${javac.debug}" deprecation="${javac.deprecation}" - > + includeantruntime="false"> Index: metastore/build.xml =================================================================== --- metastore/build.xml (revision 1182878) +++ metastore/build.xml (working copy) @@ -60,7 +60,7 @@ destdir="${build.classes}" debug="${javac.debug}" deprecation="${javac.deprecation}" - > + includeantruntime="false"> Index: ivy/libraries.properties =================================================================== --- ivy/libraries.properties (revision 1182878) +++ ivy/libraries.properties (working copy) @@ -26,6 +26,7 @@ datanucleus-core.version=2.0.3 datanucleus-enhancer.version=2.0.3 datanucleus-rdbms.version=2.0.3 +cassandra.version=0.8.7 checkstyle.version=5.0 commons-cli.version=1.2 commons-codec.version=1.3 Index: cassandra-handler/conf/cassandra.yaml =================================================================== --- cassandra-handler/conf/cassandra.yaml (revision 0) +++ cassandra-handler/conf/cassandra.yaml (revision 0) @@ -0,0 +1,410 @@ +# Cassandra storage config YAML + +# NOTE: +# See http://wiki.apache.org/cassandra/StorageConfiguration for +# full explanations of configuration directives +# /NOTE + +# The name of the cluster. This is mainly used to prevent machines in +# one logical cluster from joining another. +cluster_name: 'Test Cluster' + +# You should always specify InitialToken when setting up a production +# cluster for the first time, and often when adding capacity later. +# The principle is that each node should be given an equal slice of +# the token ring; see http://wiki.apache.org/cassandra/Operations +# for more details. 
+# +# If blank, Cassandra will request a token bisecting the range of +# the heaviest-loaded existing node. If there is no load information +# available, such as is the case with a new cluster, it will pick +# a random token, which will lead to hot spots. +initial_token: + +# Set to true to make new [non-seed] nodes automatically migrate data +# to themselves from the pre-existing nodes in the cluster. Defaults +# to false because you can only bootstrap N machines at a time from +# an existing cluster of N, so if you are bringing up a cluster of +# 10 machines with 3 seeds you would have to do it in stages. Leaving +# this off for the initial start simplifies that. +auto_bootstrap: false + +# See http://wiki.apache.org/cassandra/HintedHandoff +hinted_handoff_enabled: true +# this defines the maximum amount of time a dead host will have hints +# generated. After it has been dead this long, hints will be dropped. +max_hint_window_in_ms: 3600000 # one hour +# Sleep this long after delivering each row or row fragment +hinted_handoff_throttle_delay_in_ms: 50 + +# authentication backend, implementing IAuthenticator; used to identify users +authenticator: org.apache.cassandra.auth.AllowAllAuthenticator + +# authorization backend, implementing IAuthority; used to limit access/provide permissions +authority: org.apache.cassandra.auth.AllowAllAuthority + +# The partitioner is responsible for distributing rows (by key) across +# nodes in the cluster. Any IPartitioner may be used, including your +# own as long as it is on the classpath. Out of the box, Cassandra +# provides org.apache.cassandra.dht.RandomPartitioner +# org.apache.cassandra.dht.ByteOrderedPartitioner, +# org.apache.cassandra.dht.OrderPreservingPartitioner (deprecated), +# and org.apache.cassandra.dht.CollatingOrderPreservingPartitioner +# (deprecated). +# +# - RandomPartitioner distributes rows across the cluster evenly by md5. +# When in doubt, this is the best option. +# - ByteOrderedPartitioner orders rows lexically by key bytes. BOP allows +# scanning rows in key order, but the ordering can generate hot spots +# for sequential insertion workloads. +# - OrderPreservingPartitioner is an obsolete form of BOP, that stores +# - keys in a less-efficient format and only works with keys that are +# UTF8-encoded Strings. +# - CollatingOPP colates according to EN,US rules rather than lexical byte +# ordering. Use this as an example if you need custom collation. +# +# See http://wiki.apache.org/cassandra/Operations for more on +# partitioners and token selection. +partitioner: org.apache.cassandra.dht.RandomPartitioner + +# directories where Cassandra should store data on disk. +data_file_directories: + - /tmp/hive-cassandra-handler-test/data + +# commit log +commitlog_directory: /tmp/hive-cassandra-handler-test/commitlog + +# saved caches +saved_caches_directory: /tmp/hive-cassandra-handler-test/saved_caches + +# Size to allow commitlog to grow to before creating a new segment +commitlog_rotation_threshold_in_mb: 128 + +# commitlog_sync may be either "periodic" or "batch." +# When in batch mode, Cassandra won't ack writes until the commit log +# has been fsynced to disk. It will wait up to +# commitlog_sync_batch_window_in_ms milliseconds for other writes, before +# performing the sync. +# +# commitlog_sync: batch +# commitlog_sync_batch_window_in_ms: 50 +# +# the other option is "periodic" where writes may be acked immediately +# and the CommitLog is simply synced every commitlog_sync_period_in_ms +# milliseconds. 
+commitlog_sync: periodic +commitlog_sync_period_in_ms: 10000 + +# any class that implements the SeedProvider interface and has a constructor that takes a Map of +# parameters will do. +seed_provider: + # Addresses of hosts that are deemed contact points. + # Cassandra nodes use this list of hosts to find each other and learn + # the topology of the ring. You must change this if you are running + # multiple nodes! + - class_name: org.apache.cassandra.locator.SimpleSeedProvider + parameters: + # seeds is actually a comma-delimited list of addresses. + # Ex: ",," + - seeds: "127.0.0.1" + +# emergency pressure valve: each time heap usage after a full (CMS) +# garbage collection is above this fraction of the max, Cassandra will +# flush the largest memtables. +# +# Set to 1.0 to disable. Setting this lower than +# CMSInitiatingOccupancyFraction is not likely to be useful. +# +# RELYING ON THIS AS YOUR PRIMARY TUNING MECHANISM WILL WORK POORLY: +# it is most effective under light to moderate load, or read-heavy +# workloads; under truly massive write load, it will often be too +# little, too late. +flush_largest_memtables_at: 0.75 + +# emergency pressure valve #2: the first time heap usage after a full +# (CMS) garbage collection is above this fraction of the max, +# Cassandra will reduce cache maximum _capacity_ to the given fraction +# of the current _size_. Should usually be set substantially above +# flush_largest_memtables_at, since that will have less long-term +# impact on the system. +# +# Set to 1.0 to disable. Setting this lower than +# CMSInitiatingOccupancyFraction is not likely to be useful. +reduce_cache_sizes_at: 0.85 +reduce_cache_capacity_to: 0.6 + +# For workloads with more data than can fit in memory, Cassandra's +# bottleneck will be reads that need to fetch data from +# disk. "concurrent_reads" should be set to (16 * number_of_drives) in +# order to allow the operations to enqueue low enough in the stack +# that the OS and drives can reorder them. +# +# On the other hand, since writes are almost never IO bound, the ideal +# number of "concurrent_writes" is dependent on the number of cores in +# your system; (8 * number_of_cores) is a good rule of thumb. +concurrent_reads: 32 +concurrent_writes: 32 + +# Total memory to use for memtables. Cassandra will flush the largest +# memtable when this much memory is used. Prefer using this to +# the older, per-ColumnFamily memtable flush thresholds. +# If omitted, Cassandra will set it to 1/3 of the heap. +# If set to 0, only the old flush thresholds are used. +# memtable_total_space_in_mb: 2048 + +# This sets the amount of memtable flush writer threads. These will +# be blocked by disk io, and each one will hold a memtable in memory +# while blocked. If you have a large heap and many data directories, +# you can increase this value for better flush performance. +# By default this will be set to the amount of data directories defined. +#memtable_flush_writers: 1 + +# the number of full memtables to allow pending flush, that is, +# waiting for a writer thread. At a minimum, this should be set to +# the maximum number of secondary indexes created on a single CF. +memtable_flush_queue_size: 4 + +# Buffer size to use when performing contiguous column slices. +# Increase this to the size of the column slices you typically perform +sliced_buffer_size_in_kb: 64 + +# TCP port, for commands and data +storage_port: 7000 + +# Address to bind to and tell other Cassandra nodes to connect to. 
You +# _must_ change this if you want multiple nodes to be able to +# communicate! +# +# Leaving it blank leaves it up to InetAddress.getLocalHost(). This +# will always do the Right Thing *if* the node is properly configured +# (hostname, name resolution, etc), and the Right Thing is to use the +# address associated with the hostname (it might not be). +# +# Setting this to 0.0.0.0 is always wrong. +listen_address: localhost + +# The address to bind the Thrift RPC service to -- clients connect +# here. Unlike ListenAddress above, you *can* specify 0.0.0.0 here if +# you want Thrift to listen on all interfaces. +# +# Leaving this blank has the same effect it does for ListenAddress, +# (i.e. it will be based on the configured hostname of the node). +rpc_address: localhost +# port for Thrift to listen for clients on +rpc_port: 9170 + +# enable or disable keepalive on rpc connections +rpc_keepalive: true + +# Cassandra provides you with a variety of options for RPC Server +# sync -> Creates one thread per connection but with a configurable number of +# threads. This can be expensive in memory used for thread stack for +# a large enough number of clients. (Hence, connection pooling is +# very, very strongly recommended.) +# +# async -> Nonblocking server implementation with one thread to serve +# rpc connections. This is not recommended for high throughput use +# cases. +# +# hsha -> half sync and half async implementation with configurable number +# of worker threads (For managing connections). IO Management is +# done by a set of threads currently equal to the number of +# processors in the system. The number of threads in the threadpool +# is configured via rpc_min_threads and rpc_max_threads. (Connection +# pooling is strongly recommended in this case too.) + +rpc_server_type: sync + +# Uncomment rpc_min|max|thread to set request pool size. +# You would primarily set max for the sync server to safeguard against +# misbehaved clients; if you do hit the max, Cassandra will block until one +# disconnects before accepting more. The defaults are min of 16 and max +# unlimited. +# +# For the Hsha server, you would set the max so that a fair amount of resources +# are provided to the other working threads on the server. +# +# This configuration is not used for the async server. +# +# rpc_min_threads: 16 +# rpc_max_threads: 2048 + +# uncomment to set socket buffer sizes on rpc connections +# rpc_send_buff_size_in_bytes: +# rpc_recv_buff_size_in_bytes: + +# Frame size for thrift (maximum field length). +# 0 disables TFramedTransport in favor of TSocket. This option +# is deprecated; we strongly recommend using Framed mode. +thrift_framed_transport_size_in_mb: 15 + +# The max length of a thrift message, including all fields and +# internal thrift overhead. +thrift_max_message_length_in_mb: 16 + +# Set to true to have Cassandra create a hard link to each sstable +# flushed or streamed locally in a backups/ subdirectory of the +# Keyspace data. Removing these links is the operator's +# responsibility. +incremental_backups: false + +# Whether or not to take a snapshot before each compaction. Be +# careful using this option, since Cassandra won't clean up the +# snapshots for you. Mostly useful if you're paranoid when there +# is a data format change. +snapshot_before_compaction: false + +# change this to increase the compaction thread's priority. In java, 1 is the +# lowest priority and that is our default. The highest allowed is 5. 
+# compaction_thread_priority: 1 + +# Add column indexes to a row after its contents reach this size. +# Increase if your column values are large, or if you have a very large +# number of columns. The competing causes are: Cassandra has to +# deserialize this much of the row to read a single column, so you want +# it to be small - at least if you do many partial-row reads - but all +# the index data is read for each access, so you don't want to generate +# that wastefully either. +column_index_size_in_kb: 64 + +# Size limit for rows being compacted in memory. Larger rows will spill +# over to disk and use a slower two-pass compaction process. A message +# will be logged specifying the row key. +in_memory_compaction_limit_in_mb: 64 + +# Number of simultaneous compactions to allow, NOT including +# validation "compactions" for anti-entropy repair. This defaults to +# the number of cores. This can help preserve read performance in a +# mixed read/write workload, by mitigating the tendency of small +# sstables to accumulate during a single long-running compaction. The +# default is usually fine and if you experience problems with +# compaction running too slowly or too fast, you should look at +# compaction_throughput_mb_per_sec first. +# +# Uncomment to make compaction mono-threaded. +#concurrent_compactors: 1 + +# Throttles compaction to the given total throughput across the entire +# system. The faster you insert data, the faster you need to compact in +# order to keep the sstable count down, but in general, setting this to +# 16 to 32 times the rate you are inserting data is more than sufficient. +# Setting this to 0 disables throttling. Note that this accounts for all types +# of compaction, including validation compaction. +compaction_throughput_mb_per_sec: 16 + +# Track cached row keys during compaction, and re-cache their new +# positions in the compacted sstable. Disable if you use really large +# key caches. +compaction_preheat_key_cache: true + +# Time to wait for a reply from other nodes before failing the command +rpc_timeout_in_ms: 10000 + +# phi value that must be reached for a host to be marked down. +# most users should never need to adjust this. +# phi_convict_threshold: 8 + +# endpoint_snitch -- Set this to a class that implements +# IEndpointSnitch, which will let Cassandra know enough +# about your network topology to route requests efficiently. +# Out of the box, Cassandra provides +# - org.apache.cassandra.locator.SimpleSnitch: +# Treats Strategy order as proximity. This improves cache locality +# when disabling read repair, which can further improve throughput. +# - org.apache.cassandra.locator.RackInferringSnitch: +# Proximity is determined by rack and data center, which are +# assumed to correspond to the 3rd and 2nd octet of each node's +# IP address, respectively +# - org.apache.cassandra.locator.PropertyFileSnitch: +# Proximity is determined by rack and data center, which are +# explicitly configured in cassandra-topology.properties. 
+endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch + +# dynamic_snitch -- This boolean controls whether the above snitch is +# wrapped with a dynamic snitch, which will monitor read latencies +# and avoid reading from hosts that have slowed (due to compaction, +# for instance) +dynamic_snitch: true +# controls how often to perform the more expensive part of host score +# calculation +dynamic_snitch_update_interval_in_ms: 100 +# controls how often to reset all host scores, allowing a bad host to +# possibly recover +dynamic_snitch_reset_interval_in_ms: 600000 +# if set greater than zero and read_repair_chance is < 1.0, this will allow +# 'pinning' of replicas to hosts in order to increase cache capacity. +# The badness threshold will control how much worse the pinned host has to be +# before the dynamic snitch will prefer other replicas over it. This is +# expressed as a double which represents a percentage. Thus, a value of +# 0.2 means Cassandra would continue to prefer the static snitch values +# until the pinned host was 20% worse than the fastest. +dynamic_snitch_badness_threshold: 0.0 + +# request_scheduler -- Set this to a class that implements +# RequestScheduler, which will schedule incoming client requests +# according to the specific policy. This is useful for multi-tenancy +# with a single Cassandra cluster. +# NOTE: This is specifically for requests from the client and does +# not affect inter node communication. +# org.apache.cassandra.scheduler.NoScheduler - No scheduling takes place +# org.apache.cassandra.scheduler.RoundRobinScheduler - Round robin of +# client requests to a node with a separate queue for each +# request_scheduler_id. The scheduler is further customized by +# request_scheduler_options as described below. +request_scheduler: org.apache.cassandra.scheduler.NoScheduler + +# Scheduler Options vary based on the type of scheduler +# NoScheduler - Has no options +# RoundRobin +# - throttle_limit -- The throttle_limit is the number of in-flight +# requests per client. Requests beyond +# that limit are queued up until +# running requests can complete. +# The value of 80 here is twice the number of +# concurrent_reads + concurrent_writes. +# - default_weight -- default_weight is optional and allows for +# overriding the default which is 1. +# - weights -- Weights are optional and will default to 1 or the +# overridden default_weight. The weight translates into how +# many requests are handled during each turn of the +# RoundRobin, based on the scheduler id. +# +# request_scheduler_options: +# throttle_limit: 80 +# default_weight: 5 +# weights: +# Keyspace1: 1 +# Keyspace2: 5 + +# request_scheduler_id -- An identifier based on which to perform +# the request scheduling. Currently the only valid option is keyspace. +# request_scheduler_id: keyspace + +# index_interval controls the sampling of entries from the primary +# row index in terms of space versus time. The larger the interval, +# the smaller and less effective the sampling will be. In technical +# terms, the interval corresponds to the number of index entries that +# are skipped between taking each sample. All the sampled entries +# must fit in memory. Generally, a value between 128 and 512 here +# coupled with a large key cache size on CFs results in the best trade +# offs. This value is not often changed; however, if you have many +# very small rows (many to an OS page), then increasing this will +# often lower memory usage without an impact on performance. 
+index_interval: 128 + +# Enable or disable inter-node encryption +# Default settings are TLS v1, RSA 1024-bit keys (it is imperative that +# users generate their own keys) TLS_RSA_WITH_AES_128_CBC_SHA as the cipher +# suite for authentication, key exchange and encryption of the actual data transfers. +# NOTE: No custom encryption options are enabled at the moment +# The available internode options are : all, none +# +# The passwords used in these options must match the passwords used when generating +# the keystore and truststore. For instructions on generating these files, see: +# http://download.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html#CreateKeystore +encryption_options: + internode_encryption: none + keystore: conf/.keystore + keystore_password: cassandra + truststore: conf/.truststore + truststore_password: cassandra Index: cassandra-handler/conf/cassandra-topology.properties =================================================================== --- cassandra-handler/conf/cassandra-topology.properties (revision 0) +++ cassandra-handler/conf/cassandra-topology.properties (revision 0) @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# Cassandra Node IP=Data Center:Rack +192.168.1.100=DC1:RAC1 +192.168.2.200=DC2:RAC2 + +10.0.0.10=DC1:RAC1 +10.0.0.11=DC1:RAC1 +10.0.0.12=DC1:RAC2 + +10.20.114.10=DC2:RAC1 +10.20.114.11=DC2:RAC1 + +10.21.119.13=DC3:RAC1 +10.21.119.10=DC3:RAC1 + +10.0.0.13=DC1:RAC2 +10.21.119.14=DC3:RAC2 +10.20.114.15=DC2:RAC2 + +# default for unknown nodes +default=DC1:r1 + +# Native IPv6 is supported, however you must escape the colon in the IPv6 Address +# Also be sure to comment out JVM_OPTS="$JVM_OPTS -Djava.net.preferIPv4Stack=true" +# in cassandra-env.sh +fe80\:0\:0\:0\:202\:b3ff\:fe1e\:8329=DC1:RAC3 Property changes on: cassandra-handler/conf/cassandra-topology.properties ___________________________________________________________________ Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/conf/README.txt =================================================================== --- cassandra-handler/conf/README.txt (revision 0) +++ cassandra-handler/conf/README.txt (revision 0) @@ -0,0 +1,13 @@ +Required configuration files +============================ + +cassandra.yaml: main Cassandra configuration file +log4j-server.proprties: log4j configuration file for Cassandra server + + +Optional configuration files +============================ + +access.properties: used for authorization +passwd.properties: used for authentication +cassandra-topology.properties: used by PropertyFileSnitch Property changes on: cassandra-handler/conf/README.txt ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/conf/access.properties =================================================================== --- cassandra-handler/conf/access.properties (revision 0) +++ cassandra-handler/conf/access.properties (revision 0) @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This is a sample access file for SimpleAuthority. The format of this file +# is KEYSPACE[.COLUMNFAMILY].PERMISSION=USERS, where: +# +# * KEYSPACE is the keyspace name. +# * COLUMNFAMILY is the column family name. +# * PERMISSION is one of or for read-only or read-write respectively. +# * USERS is a comma delimited list of users from passwd.properties. +# +# See below for example entries. + +# NOTE: This file contains potentially sensitive information, please keep +# this in mind when setting its mode and ownership. + +# The magical '' property lists users who can modify the +# list of keyspaces: all users will be able to view the list of keyspaces. +=jsmith + +# Access to Keyspace1 (add/remove column families, etc). 
+Keyspace1.=jsmith,Elvis Presley +Keyspace1.=dilbert + +# Access to Standard1 (keyspace Keyspace1) +Keyspace1.Standard1.=jsmith,Elvis Presley,dilbert Property changes on: cassandra-handler/conf/access.properties ___________________________________________________________________ Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/conf/passwd.properties =================================================================== --- cassandra-handler/conf/passwd.properties (revision 0) +++ cassandra-handler/conf/passwd.properties (revision 0) @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This is a sample password file for SimpleAuthenticator. The format of +# this file is username=password. If -Dpasswd.mode=MD5 then the password +# is represented as an md5 digest, otherwise it is cleartext (keep this +# in mind when setting file mode and ownership). +jsmith=havebadpass +Elvis\ Presley=graceland4evar +dilbert=nomoovertime Property changes on: cassandra-handler/conf/passwd.properties ___________________________________________________________________ Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/conf/schema-sample.txt =================================================================== --- cassandra-handler/conf/schema-sample.txt (revision 0) +++ cassandra-handler/conf/schema-sample.txt (revision 0) @@ -0,0 +1,70 @@ +/*This file contains an example Keyspace that can be created using the +cassandra-cli command line interface as follows. + +bin/cassandra-cli -host localhost --file conf/schema-sample.txt + +The cassandra-cli includes online help that explains the statements below. 
You can +accessed the help without connecting to a running cassandra instance by starting the +client and typing "help;" +*/ + +create keyspace Keyspace1 + with strategy_options=[{replication_factor:1}] + and placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy'; + +use Keyspace1; + +create column family Standard1 + with comparator = BytesType + and keys_cached = 10000 + and rows_cached = 1000 + and row_cache_save_period = 0 + and key_cache_save_period = 3600 + and memtable_flush_after = 59 + and memtable_throughput = 255 + and memtable_operations = 0.29; + +create column family Standard2 + with comparator = UTF8Type + and read_repair_chance = 0.1 + and keys_cached = 100 + and gc_grace = 0 + and min_compaction_threshold = 5 + and max_compaction_threshold = 31; + +create column family StandardByUUID1 + with comparator = TimeUUIDType; + +create column family Super1 + with column_type = Super + and comparator = BytesType + and subcomparator = BytesType; + +create column family Super2 + with column_type = Super + and subcomparator = UTF8Type + and rows_cached = 10000 + and keys_cached = 50 + and comment = 'A column family with supercolumns, whose column and subcolumn names are UTF8 strings'; + +create column family Super3 + with column_type = Super + and comparator = LongType + and comment = 'A column family with supercolumns, whose column names are Longs (8 bytes)'; + +create column family Indexed1 + with comparator = UTF8Type + and default_validation_class = LongType + and column_metadata = [{ + column_name : birthdate, + validation_class : LongType, + index_name : birthdate_idx, + index_type : 0} + ]; + +create column family Counter1 + with default_validation_class = CounterColumnType; + +create column family SuperCounter1 + with column_type = Super + and default_validation_class = CounterColumnType; Property changes on: cassandra-handler/conf/schema-sample.txt ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/conf/cassandra-env.sh =================================================================== --- cassandra-handler/conf/cassandra-env.sh (revision 0) +++ cassandra-handler/conf/cassandra-env.sh (revision 0) @@ -0,0 +1,171 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +calculate_heap_sizes() +{ + case "`uname`" in + Linux) + system_memory_in_mb=`free -m | awk '/Mem:/ {print $2}'` + system_cpu_cores=`egrep -c 'processor([[:space:]]+):.*' /proc/cpuinfo` + break + ;; + FreeBSD) + system_memory_in_bytes=`sysctl hw.physmem | awk '{print $2}'` + system_memory_in_mb=`expr $system_memory_in_bytes / 1024 / 1024` + system_cpu_cores=`sysctl hw.ncpu | awk '{print $2}'` + break + ;; + SunOS) + system_memory_in_mb=`prtconf | awk '/Memory size:/ {print $3}'` + system_cpu_cores=`psrinfo | wc -l` + break + ;; + *) + # assume reasonable defaults for e.g. a modern desktop or + # cheap server + system_memory_in_mb="2048" + system_cpu_cores="2" + ;; + esac + max_heap_size_in_mb=`expr $system_memory_in_mb / 2` + MAX_HEAP_SIZE="${max_heap_size_in_mb}M" + + # Young gen: min(max_sensible_per_modern_cpu_core * num_cores, 1/4 * heap size) + max_sensible_yg_per_core_in_mb="100" + max_sensible_yg_in_mb=`expr $max_sensible_yg_per_core_in_mb "*" $system_cpu_cores` + + desired_yg_in_mb=`expr $max_heap_size_in_mb / 4` + + if [ "$desired_yg_in_mb" -gt "$max_sensible_yg_in_mb" ] + then + HEAP_NEWSIZE="${max_sensible_yg_in_mb}M" + else + HEAP_NEWSIZE="${desired_yg_in_mb}M" + fi +} + +# Override these to set the amount of memory to allocate to the JVM at +# start-up. For production use you almost certainly want to adjust +# this for your environment. MAX_HEAP_SIZE is the total amount of +# memory dedicated to the Java heap; HEAP_NEWSIZE refers to the size +# of the young generation. Both MAX_HEAP_SIZE and HEAP_NEWSIZE should +# be either set or not (if you set one, set the other). +# +# The main trade-off for the young generation is that the larger it +# is, the longer GC pause times will be. The shorter it is, the more +# expensive GC will be (usually). +# +# The example HEAP_NEWSIZE assumes a modern 8-core+ machine for decent pause +# times. If in doubt, and if you do not particularly want to tweak, go with +# 100 MB per physical CPU core. + +#MAX_HEAP_SIZE="4G" +#HEAP_NEWSIZE="800M" + +if [ "x$MAX_HEAP_SIZE" = "x" ] && [ "x$HEAP_NEWSIZE" = "x" ]; then + calculate_heap_sizes +else + if [ "x$MAX_HEAP_SIZE" = "x" ] || [ "x$HEAP_NEWSIZE" = "x" ]; then + echo "please set or unset MAX_HEAP_SIZE and HEAP_NEWSIZE in pairs (see cassandra-env.sh)" + exit 1 + fi +fi + +# Specifies the default port over which Cassandra will be available for +# JMX connections. +JMX_PORT="7199" + + +# Here we create the arguments that will get passed to the jvm when +# starting cassandra. + +# enable assertions. disabling this in production will give a modest +# performance benefit (around 5%). +JVM_OPTS="$JVM_OPTS -ea" + +# add the jamm javaagent +check_openjdk=`"${JAVA:-java}" -version 2>&1 | awk '{if (NR == 2) {print $1}}'` +if [ "$check_openjdk" != "OpenJDK" ] +then + JVM_OPTS="$JVM_OPTS -javaagent:$CASSANDRA_HOME/lib/jamm-0.2.2.jar" +fi + +# enable thread priorities, primarily so we can give periodic tasks +# a lower priority to avoid interfering with client workload +JVM_OPTS="$JVM_OPTS -XX:+UseThreadPriorities" +# allows lowering thread priority without being root. see +# http://tech.stolsvik.com/2010/01/linux-java-thread-priorities-workaround.html +JVM_OPTS="$JVM_OPTS -XX:ThreadPriorityPolicy=42" + +# min and max heap sizes should be set to the same value to avoid +# stop-the-world GC pauses during resize, and so that we can lock the +# heap in memory on startup to prevent any of it from being swapped +# out. 
+JVM_OPTS="$JVM_OPTS -Xms${MAX_HEAP_SIZE}" +JVM_OPTS="$JVM_OPTS -Xmx${MAX_HEAP_SIZE}" +JVM_OPTS="$JVM_OPTS -Xmn${HEAP_NEWSIZE}" +JVM_OPTS="$JVM_OPTS -XX:+HeapDumpOnOutOfMemoryError" + +# set jvm HeapDumpPath with CASSANDRA_HEAPDUMP_DIR +if [ "x$CASSANDRA_HEAPDUMP_DIR" != "x" ]; then + JVM_OPTS="$JVM_OPTS -XX:HeapDumpPath=$CASSANDRA_HEAPDUMP_DIR/cassandra-`date +%s`-pid$$.hprof" +fi + +if [ "`uname`" = "Linux" ] ; then + # reduce the per-thread stack size to minimize the impact of Thrift + # thread-per-client. (Best practice is for client connections to + # be pooled anyway.) Only do so on Linux where it is known to be + # supported. + JVM_OPTS="$JVM_OPTS -Xss128k" +fi + +# GC tuning options +JVM_OPTS="$JVM_OPTS -XX:+UseParNewGC" +JVM_OPTS="$JVM_OPTS -XX:+UseConcMarkSweepGC" +JVM_OPTS="$JVM_OPTS -XX:+CMSParallelRemarkEnabled" +JVM_OPTS="$JVM_OPTS -XX:SurvivorRatio=8" +JVM_OPTS="$JVM_OPTS -XX:MaxTenuringThreshold=1" +JVM_OPTS="$JVM_OPTS -XX:CMSInitiatingOccupancyFraction=75" +JVM_OPTS="$JVM_OPTS -XX:+UseCMSInitiatingOccupancyOnly" + +# GC logging options -- uncomment to enable +# JVM_OPTS="$JVM_OPTS -XX:+PrintGCDetails" +# JVM_OPTS="$JVM_OPTS -XX:+PrintGCTimeStamps" +# JVM_OPTS="$JVM_OPTS -XX:+PrintClassHistogram" +# JVM_OPTS="$JVM_OPTS -XX:+PrintTenuringDistribution" +# JVM_OPTS="$JVM_OPTS -XX:+PrintGCApplicationStoppedTime" +# JVM_OPTS="$JVM_OPTS -Xloggc:/var/log/cassandra/gc-`date +%s`.log" + +# uncomment to have Cassandra JVM listen for remote debuggers/profilers on port 1414 +# JVM_OPTS="$JVM_OPTS -Xdebug -Xnoagent -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1414" + +# Prefer binding to IPv4 network intefaces (when net.ipv6.bindv6only=1). See +# http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6342561 (short version: +# comment out this entry to enable IPv6 support). +JVM_OPTS="$JVM_OPTS -Djava.net.preferIPv4Stack=true" + +# jmx: metrics and administration interface +# +# add this if you're having trouble connecting: +# JVM_OPTS="$JVM_OPTS -Djava.rmi.server.hostname=" +# +# see +# http://blogs.sun.com/jmxetc/entry/troubleshooting_connection_problems_in_jconsole +# for more on configuring JMX through firewalls, etc. (Short version: +# get it working with no firewall first.) 
+JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT" +JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl=false" +JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=false" Property changes on: cassandra-handler/conf/cassandra-env.sh ___________________________________________________________________ Added: svn:executable + * Added: svn:mime-type + text/plain Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/ivy.xml =================================================================== --- cassandra-handler/ivy.xml (revision 0) +++ cassandra-handler/ivy.xml (revision 0) @@ -0,0 +1,31 @@ + + + + + + + Hive is a data warehouse infrastructure built on top of Hadoop see + http://wiki.apache.org/hadoop/Hive + + + + + + + + Property changes on: cassandra-handler/ivy.xml ___________________________________________________________________ Added: svn:mime-type + text/xml Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/lib/readme.txt =================================================================== --- cassandra-handler/lib/readme.txt (revision 0) +++ cassandra-handler/lib/readme.txt (revision 0) @@ -0,0 +1 @@ +Required jars are gathered by ../ql/ivy.xml Property changes on: cassandra-handler/lib/readme.txt ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/src/test/results/cassandra_queries =================================================================== --- cassandra-handler/src/test/results/cassandra_queries (revision 0) +++ cassandra-handler/src/test/results/cassandra_queries (revision 0) @@ -0,0 +1,365 @@ +PREHOOK: query: CREATE EXTERNAL TABLE IF NOT EXISTS +cassandra_keyspace1_standard1(key int, value string) +STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' +WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" , "cassandra.cf.name" = "Standard1" , "cassandra.host" = "127.0.0.1" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1" , "cassandra.partitioner" = "org.apache.cassandra.dht.RandomPartitioner" ) +TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1") +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE EXTERNAL TABLE IF NOT EXISTS +cassandra_keyspace1_standard1(key int, value string) +STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' +WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" , "cassandra.cf.name" = "Standard1" , "cassandra.host" = "127.0.0.1" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1" , "cassandra.partitioner" = "org.apache.cassandra.dht.RandomPartitioner" ) +TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@cassandra_keyspace1_standard1 +PREHOOK: query: describe cassandra_keyspace1_standard1 +PREHOOK: type: DESCTABLE +POSTHOOK: query: describe cassandra_keyspace1_standard1 +POSTHOOK: type: DESCTABLE +key int from deserializer +value string from deserializer +PREHOOK: query: EXPLAIN +FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) 
(TOK_INSERT (TOK_DESTINATION (TOK_TAB (TOK_TABNAME cassandra_keyspace1_standard1))) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (= (% (TOK_TABLE_OR_COL key) 7) 0)))) + +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Filter Operator + predicate: + expr: ((key % 7) = 0) + type: boolean + Filter Operator + predicate: + expr: ((key % 7) = 0) + type: boolean + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: UDFToInteger(_col0) + type: int + expr: _col1 + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 1 + table: + input format: org.apache.hadoop.hive.cassandra.input.HiveCassandraStandardColumnInputFormat + output format: org.apache.hadoop.hive.cassandra.output.HiveCassandraOutputFormat + serde: org.apache.hadoop.hive.cassandra.serde.StandardColumnSerDe + name: default.cassandra_keyspace1_standard1 + + +PREHOOK: query: FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@cassandra_keyspace1_standard1 +POSTHOOK: query: FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@cassandra_keyspace1_standard1 +PREHOOK: query: EXPLAIN +select * from cassandra_keyspace1_standard1 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +select * from cassandra_keyspace1_standard1 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME cassandra_keyspace1_standard1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)))) + +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select * from cassandra_keyspace1_standard1 +PREHOOK: type: QUERY +PREHOOK: Input: default@cassandra_keyspace1_standard1 +PREHOOK: Output: file:/tmp/edward/hive_2011-02-25_16-36-32_494_7286511593311887975/-mr-10000 +POSTHOOK: query: select * from cassandra_keyspace1_standard1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@cassandra_keyspace1_standard1 +POSTHOOK: Output: file:/tmp/edward/hive_2011-02-25_16-36-32_494_7286511593311887975/-mr-10000 +252 val_252 +126 val_126 +119 val_119 +196 val_196 +266 val_266 +392 val_392 +413 val_413 +983 val_98 +224 val_224 +427 val_427 +357 val_35 +203 val_203 +483 val_483 +469 val_469 +779 val_77 +79 val_0 +289 val_28 +399 val_399 +490 val_490 +364 val_364 +273 val_273 +462 val_462 +315 val_315 +238 val_238 +308 val_308 +322 val_322 +189 val_189 +429 val_42 +133 val_133 +217 val_217 +448 val_448 +105 val_105 +336 val_336 +846 val_84 +280 val_280 +287 val_287 +406 val_406 +497 val_497 +378 val_378 +708 val_70 +455 val_455 +175 val_175 +168 val_168 +PREHOOK: query: EXPLAIN +select value from cassandra_keyspace1_standard1 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +select value from cassandra_keyspace1_standard1 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME cassandra_keyspace1_standard1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL value))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 
+ Map Reduce + Alias -> Map Operator Tree: + cassandra_keyspace1_standard1 + TableScan + alias: cassandra_keyspace1_standard1 + Select Operator + expressions: + expr: value + type: string + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select value from cassandra_keyspace1_standard1 +PREHOOK: type: QUERY +PREHOOK: Input: default@cassandra_keyspace1_standard1 +PREHOOK: Output: file:/tmp/edward/hive_2011-02-25_16-36-32_885_3161823916748354795/-mr-10000 +POSTHOOK: query: select value from cassandra_keyspace1_standard1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@cassandra_keyspace1_standard1 +POSTHOOK: Output: file:/tmp/edward/hive_2011-02-25_16-36-32_885_3161823916748354795/-mr-10000 +val_252 +val_126 +val_119 +val_196 +val_266 +val_392 +val_413 +val_98 +val_224 +val_427 +val_35 +val_203 +val_483 +val_469 +val_77 +val_0 +val_28 +val_399 +val_490 +val_364 +val_273 +val_462 +val_315 +val_238 +val_308 +val_322 +val_189 +val_42 +val_133 +val_217 +val_448 +val_105 +val_336 +val_84 +val_280 +val_287 +val_406 +val_497 +val_378 +val_70 +val_455 +val_175 +val_168 +PREHOOK: query: EXPLAIN +select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME cassandra_keyspace1_standard1) a) (TOK_TABREF (TOK_TABNAME cassandra_keyspace1_standard1) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) key)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) value)) (TOK_SELEXPR (. 
(TOK_TABLE_OR_COL b) value))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Reduce Output Operator + key expressions: + expr: key + type: int + sort order: + + Map-reduce partition columns: + expr: key + type: int + tag: 0 + value expressions: + expr: key + type: int + expr: value + type: string + b + TableScan + alias: b + Reduce Output Operator + key expressions: + expr: key + type: int + sort order: + + Map-reduce partition columns: + expr: key + type: int + tag: 1 + value expressions: + expr: value + type: string + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {VALUE._col0} {VALUE._col1} + 1 {VALUE._col1} + handleSkewJoin: false + outputColumnNames: _col0, _col1, _col5 + Select Operator + expressions: + expr: _col0 + type: int + expr: _col1 + type: string + expr: _col5 + type: string + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@cassandra_keyspace1_standard1 +PREHOOK: Output: file:/tmp/edward/hive_2011-02-25_16-36-37_515_951258717592195422/-mr-10000 +POSTHOOK: query: select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@cassandra_keyspace1_standard1 +POSTHOOK: Output: file:/tmp/edward/hive_2011-02-25_16-36-37_515_951258717592195422/-mr-10000 +79 val_0 val_0 +105 val_105 val_105 +119 val_119 val_119 +126 val_126 val_126 +133 val_133 val_133 +168 val_168 val_168 +175 val_175 val_175 +189 val_189 val_189 +196 val_196 val_196 +203 val_203 val_203 +217 val_217 val_217 +224 val_224 val_224 +238 val_238 val_238 +252 val_252 val_252 +266 val_266 val_266 +273 val_273 val_273 +280 val_280 val_280 +287 val_287 val_287 +289 val_28 val_28 +308 val_308 val_308 +315 val_315 val_315 +322 val_322 val_322 +336 val_336 val_336 +357 val_35 val_35 +364 val_364 val_364 +378 val_378 val_378 +392 val_392 val_392 +399 val_399 val_399 +406 val_406 val_406 +413 val_413 val_413 +427 val_427 val_427 +429 val_42 val_42 +448 val_448 val_448 +455 val_455 val_455 +462 val_462 val_462 +469 val_469 val_469 +483 val_483 val_483 +490 val_490 val_490 +497 val_497 val_497 +708 val_70 val_70 +779 val_77 val_77 +846 val_84 val_84 +983 val_98 val_98 Index: cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java =================================================================== --- cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java (revision 0) +++ cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java (revision 0) @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.contrib.utils.service; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.util.FileUtils; + +public class CassandraServiceDataCleaner { + + /** + * Creates all data dirs if they don't exist and cleans them + * + * @throws IOException + */ + public void prepare() throws IOException { + makeDirsIfNotExist(); + cleanupDataDirectories(); + } + + /** + * Deletes all data from cassandra data directories, including the commit log. + * + * @throws IOException + * in case of permissions error etc. + */ + public void cleanupDataDirectories() throws IOException { + for (String s : getDataDirs()) { + cleanDir(s); + } + } + + /** + * Creates the data directories, if they don't exist. + * + * @throws IOException + * if directories cannot be created (permissions etc). + */ + public void makeDirsIfNotExist() throws IOException { + for (String s : getDataDirs()) { + mkdir(s); + } + } + + /** + * Collects all data dirs and returns a set of String paths on the file system. + * + * @return + */ + private Set<String> getDataDirs() { + Set<String> dirs = new HashSet<String>(); + for (String s : DatabaseDescriptor.getAllDataFileLocations()) { + dirs.add(s); + } + dirs.add(DatabaseDescriptor.getCommitLogLocation()); + return dirs; + } + + /** + * Creates a directory + * + * @param dir + * @throws IOException + */ + private void mkdir(String dir) throws IOException { + FileUtils.createDirectory(dir); + } + + /** + * Removes all directory content from the file system + * + * @param dir + * @throws IOException + */ + private void cleanDir(String dir) throws IOException { + File dirFile = new File(dir); + if (dirFile.exists() && dirFile.isDirectory()) { + FileUtils.delete(dirFile.listFiles()); + } + } +} Property changes on: cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java =================================================================== --- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java (revision 0) +++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java (revision 0) @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.cassandra; + +import java.io.IOException; + +import junit.extensions.TestSetup; +import junit.framework.Test; + +import org.apache.cassandra.contrib.utils.service.CassandraServiceDataCleaner; +import org.apache.cassandra.service.EmbeddedCassandraService; +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.CfDef; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.KsDef; +import org.apache.cassandra.thrift.NotFoundException; +import org.apache.cassandra.thrift.SchemaDisagreementException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.cassandra.serde.StandardColumnSerDe; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.mapred.JobConf; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; + +public class CassandraTestSetup extends TestSetup { + + static final Log LOG = LogFactory.getLog(CassandraTestSetup.class); + private EmbeddedCassandraService cassandra; + + public CassandraTestSetup(Test test) { + super(test); + } + + @SuppressWarnings("deprecation") + void preTest(HiveConf conf) throws IOException, TTransportException, TException { + if (cassandra == null) { + CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner(); + cleaner.prepare(); + cassandra = new EmbeddedCassandraService(); + cassandra.start(); + } + + FramedConnWrapper wrap = new FramedConnWrapper("127.0.0.1", 9170, 5000); + wrap.open(); + KsDef ks = new KsDef(); + ks.setName("Keyspace1"); + ks.setReplication_factor(1); + ks.setStrategy_class("org.apache.cassandra.locator.SimpleStrategy"); + CfDef cf = new CfDef(); + cf.setName("Standard1"); + cf.setKeyspace("Keyspace1"); + ks.addToCf_defs(cf); + Cassandra.Client client = wrap.getClient(); + try { + try { + KsDef exists = client.describe_keyspace("Keyspace1"); + } catch (NotFoundException ex) { + client.system_add_keyspace(ks); + System.out.println("ks added"); + try { + Thread.sleep(2000); + } catch (Exception ex2) { + } + } + } catch (SchemaDisagreementException e) { + throw new RuntimeException(e); + } catch (InvalidRequestException e) { + throw new RuntimeException(e); + } + wrap.close(); + + String auxJars = conf.getAuxJars(); + auxJars = ((auxJars == null) ? "" : (auxJars + ",")) + "file://" + + new JobConf(conf, Cassandra.Client.class).getJar(); + auxJars += ",file://" + new JobConf(conf, StandardColumnSerDe.class).getJar(); + auxJars += ",file://" + new JobConf(conf, org.apache.thrift.transport.TSocket.class).getJar(); + // auxJars += ",file://" + new JobConf(conf, + // com.google.common.collect.AbstractIterator.class).getJar(); + auxJars += ",file://" + new JobConf(conf, org.apache.commons.lang.ArrayUtils.class).getJar(); + conf.setAuxJars(auxJars); + + } + + @Override + protected void tearDown() throws Exception { + // do we need this? 
+ CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner(); + cleaner.prepare(); + } + +} Property changes on: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java =================================================================== --- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java (revision 0) +++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java (revision 0) @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.cassandra; + +import org.apache.hadoop.hive.ql.QTestUtil; + +/** + * CassandraQTestUtil initializes Cassandra-specific test fixtures. + */ +public class CassandraQTestUtil extends QTestUtil { + public CassandraQTestUtil( + String outDir, String logDir, boolean miniMr, CassandraTestSetup setup) + throws Exception { + + super(outDir, logDir, miniMr, null); + setup.preTest(conf); + super.init(); + } + + @Override + public void init() throws Exception { + // defer + } +} Property changes on: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/src/test/queries/cassandra_queries.q =================================================================== --- cassandra-handler/src/test/queries/cassandra_queries.q (revision 0) +++ cassandra-handler/src/test/queries/cassandra_queries.q (revision 0) @@ -0,0 +1,29 @@ +SET hive.support.concurrency=false; +add file conf/cassandra.yaml; + +CREATE EXTERNAL TABLE IF NOT EXISTS +cassandra_keyspace1_standard1(key int, value string) +STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' +WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" , "cassandra.cf.name" = "Standard1" , "cassandra.host" = "127.0.0.1" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1" , "cassandra.partitioner" = "org.apache.cassandra.dht.RandomPartitioner" ) +TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1"); + +describe cassandra_keyspace1_standard1; + +EXPLAIN +FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 ORDER BY key; + +FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 ORDER BY key; + +EXPLAIN +select * from cassandra_keyspace1_standard1 
ORDER BY key; +select * from cassandra_keyspace1_standard1 ORDER BY key; + +EXPLAIN +select value from cassandra_keyspace1_standard1 ORDER BY VALUE; + +select value from cassandra_keyspace1_standard1 ORDER BY VALUE; + +EXPLAIN +select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key ORDER BY a.key; + +select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key ORDER BY a.key; Index: cassandra-handler/src/test/templates/TestCassandraCliDriver.vm =================================================================== --- cassandra-handler/src/test/templates/TestCassandraCliDriver.vm (revision 0) +++ cassandra-handler/src/test/templates/TestCassandraCliDriver.vm (revision 0) @@ -0,0 +1,126 @@ +package org.apache.hadoop.hive.cli; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +import java.io.*; +import java.util.*; + +import org.apache.hadoop.hive.cassandra.CassandraQTestUtil; +import org.apache.hadoop.hive.cassandra.CassandraTestSetup; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.history.HiveHistoryViewer; +import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo; +import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; +import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo; +import org.apache.hadoop.hive.ql.session.SessionState; + +import org.antlr.runtime.*; +import org.antlr.runtime.tree.*; + +public class $className extends TestCase { + + private CassandraQTestUtil qt; + private CassandraTestSetup setup; + + public $className(String name, CassandraTestSetup setup) { + super(name); + qt = null; + this.setup = setup; + } + + @Override + protected void setUp() { + try { + boolean miniMR = false; + if ("$clusterMode".equals("miniMR")) { + miniMR = true; + } + + qt = new CassandraQTestUtil( + "$resultsDir.getCanonicalPath()", + "$logDir.getCanonicalPath()", miniMR, setup); + +#foreach ($qf in $qfiles) + qt.addFile("$qf.getCanonicalPath()"); +#end + } catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in setup"); + } + } + + @Override + protected void tearDown() { + try { + qt.shutdown(); + } + catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in tearDown"); + } + } + + public static Test suite() { + TestSuite suite = new TestSuite(); + CassandraTestSetup setup = new CassandraTestSetup(suite); +#foreach ($qf in $qfiles) + #set ($fname = $qf.getName()) + #set ($eidx = $fname.length() - 2) + #set ($tname = $fname.substring(0, $eidx)) + suite.addTest(new $className("testCliDriver_$tname", setup)); +#end + return setup; + } + + #foreach ($qf in $qfiles) + #set ($fname = $qf.getName()) + #set ($eidx = $fname.length() - 2) + #set ($tname = $fname.substring(0, $eidx)) + public void testCliDriver_$tname() throws Exception { + try { + System.out.println("Begin query: " + "$fname"); + qt.cliInit("$fname"); + int ecode = qt.executeClient("$fname"); + if (ecode != 0) { + fail("Client Execution failed with error code = " + ecode); + } + if (SessionState.get() != null) { + HiveHistoryViewer hv = new HiveHistoryViewer(SessionState.get() + .getHiveHistory().getHistFileName()); + Map jobInfoMap = hv.getJobInfoMap(); + Map taskInfoMap = hv.getTaskInfoMap(); + + if (jobInfoMap.size() != 0) { + 
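+ // Inspect the first recorded query in the Hive history file and fail if its return code is non-zero.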
String cmd = (String)jobInfoMap.keySet().toArray()[0]; + QueryInfo ji = jobInfoMap.get(cmd); + + if (!ji.hm.get(Keys.QUERY_RET_CODE.name()).equals("0")) { + fail("Wrong return code in hive history"); + } + } + } + + ecode = qt.checkCliDriverResults("$fname"); + if (ecode != 0) { + fail("Client execution results failed with error code = " + ecode); + } + } catch (Throwable e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception"); + } + + System.out.println("Done query: " + "$fname"); + assertTrue("Test passed", true); + } + +#end +} + Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveIColumn.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveIColumn.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveIColumn.java (revision 0) @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.cassandra.input; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.MessageDigest; +import java.util.Collection; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.IColumn; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.MarshalException; +import org.apache.hadoop.io.Writable; + +public class HiveIColumn implements IColumn, Writable { + + private byte[] name; + private byte[] value; + private long timestamp; + + public HiveIColumn() { + + } + + @Override + public ByteBuffer name() { + return ByteBuffer.wrap(name); + } + + @Override + public long timestamp() { + return timestamp; + } + + @Override + public ByteBuffer value() { + return ByteBuffer.wrap(value); + } + + @Override + public void readFields(DataInput in) throws IOException { + name = new byte[in.readInt()]; + in.readFully(name); + + value = new byte[in.readInt()]; + in.readFully(value); + + timestamp = in.readLong(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(name.length); + out.write(name); + + out.writeInt(value.length); + out.write(value); + + out.writeLong(timestamp); + } + + // bean patterns + + public byte[] getName() { + return name; + } + + public void setName(byte[] name) { + this.name = name; + } + + public byte[] getValue() { + return value; + } + + public void setValue(byte[] value) { + this.value = value; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("HiveIColumn["); + sb.append("name " + new String(this.name) + " "); + sb.append("value " + new String(this.value) + " "); + sb.append("timestamp " + this.timestamp + " "); + return sb.toString(); + } + + // not needed for current integration + + @Override + public int size() { + throw new UnsupportedOperationException(); + } + + @Override + public void addColumn(IColumn arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public IColumn diff(IColumn arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public int getLocalDeletionTime() { + throw new UnsupportedOperationException(); + } + + @Override + public long getMarkedForDeleteAt() { + throw new UnsupportedOperationException(); + } + + @Override + public String getString(AbstractType arg0) { + throw new UnsupportedOperationException(); + } + + + @Override + public Collection getSubColumns() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isMarkedForDelete() { + throw new UnsupportedOperationException(); + } + + @Override + public long mostRecentLiveChangeAt() { + throw new UnsupportedOperationException(); + } + + @Override + public int serializedSize() { + throw new UnsupportedOperationException(); + } + + @Override + public void updateDigest(MessageDigest arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public IColumn getSubColumn(ByteBuffer arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isLive() { + throw new UnsupportedOperationException(); + } + + @Override + public IColumn reconcile(IColumn arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public IColumn 
localCopy(ColumnFamilyStore arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public int serializationFlags() { + throw new UnsupportedOperationException(); + } + + @Override + public void validateFields(CFMetaData arg0) throws MarshalException { + throw new UnsupportedOperationException(); + } +} Property changes on: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveIColumn.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardColumnInputFormat.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardColumnInputFormat.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardColumnInputFormat.java (revision 0) @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.cassandra.input; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; + +import org.apache.cassandra.db.IColumn; +import org.apache.cassandra.hadoop.ColumnFamilyInputFormat; +import org.apache.cassandra.hadoop.ColumnFamilySplit; +import org.apache.cassandra.hadoop.ConfigHelper; +import org.apache.cassandra.thrift.SlicePredicate; +import org.apache.cassandra.thrift.SliceRange; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.cassandra.serde.StandardColumnSerDe; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; + +@SuppressWarnings("deprecation") +public class HiveCassandraStandardColumnInputFormat extends + ColumnFamilyInputFormat implements InputFormat { + + static final Log LOG = LogFactory.getLog(HiveCassandraStandardColumnInputFormat.class); + + @Override + public RecordReader getRecordReader + (InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException { + HiveCassandraStandardSplit cassandraSplit = (HiveCassandraStandardSplit) split; + List columns = StandardColumnSerDe + .parseColumnMapping(cassandraSplit.getColumnMapping()); + List readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf); + if (columns.size() < readColIDs.size()) { + throw new IOException("Cannot read more columns than the given table contains."); + } + org.apache.cassandra.hadoop.ColumnFamilySplit cfSplit = cassandraSplit.getSplit(); + Job job = new Job(jobConf); + + TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()) { + @Override + public void progress() { + reporter.progress(); + } + }; + + SlicePredicate predicate = new SlicePredicate(); + SliceRange range = new SliceRange(); + range.setStart(new byte[0]); + range.setFinish(new byte[0]); + range.setReversed(false); + range.setCount(cassandraSplit.getSlicePredicateSize()); + predicate.setSlice_range(range); + + final org.apache.hadoop.mapreduce.RecordReader> recordReader = createRecordReader( + cfSplit, tac); + + try { + ConfigHelper.setInputColumnFamily(tac.getConfiguration(), + cassandraSplit.getKeyspace(), cassandraSplit.getColumnFamily()); + + ConfigHelper.setInputSlicePredicate(tac.getConfiguration(), predicate); + ConfigHelper.setRangeBatchSize(tac.getConfiguration(), cassandraSplit.getRangeBatchSize()); + ConfigHelper.setRpcPort(tac.getConfiguration(), cassandraSplit.getPort() + ""); + ConfigHelper.setInitialAddress(tac.getConfiguration(), cassandraSplit.getHost()); + ConfigHelper.setPartitioner(tac.getConfiguration(), cassandraSplit.getPartitioner()); + + recordReader.initialize(cfSplit, tac); + } catch (InterruptedException ie) { + throw new IOException(ie); + } catch (Exception ie) { + throw new IOException(ie); + } 
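+ // Wrap the mapreduce-API Cassandra reader in the mapred RecordReader interface Hive expects: each IColumn of the current row is copied into a HiveIColumn and collected in a MapWritable keyed by column name.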
+ return new RecordReader() { + + @Override + public void close() throws IOException { + recordReader.close(); + } + + @Override + public Text createKey() { + return new Text(); + } + + @Override + public HiveCassandraStandardRowResult createValue() { + return new HiveCassandraStandardRowResult(); + } + + @Override + public long getPos() throws IOException { + return 0l; + } + + @Override + public float getProgress() throws IOException { + float progress = 0.0F; + try { + progress = recordReader.getProgress(); + } catch (InterruptedException e) { + throw new IOException(e); + } + return progress; + } + + @Override + public boolean next(Text rowKey, HiveCassandraStandardRowResult value) throws IOException { + boolean next = false; + try { + next = recordReader.nextKeyValue(); + if (next) { + rowKey.set(ByteBufferUtil.getArray(recordReader.getCurrentKey())); + MapWritable theMap = new MapWritable(); + for (Map.Entry entry : recordReader.getCurrentValue().entrySet()) { + HiveIColumn hic = new HiveIColumn(); + hic.setName(ByteBufferUtil.getArray(entry.getValue().name())); + hic.setValue(ByteBufferUtil.getArray(entry.getValue().value())); + hic.setTimestamp(entry.getValue().timestamp()); + theMap.put(new BytesWritable(ByteBufferUtil.getArray(entry.getValue().name())), hic); + } + value.setKey(rowKey); + value.setValue(theMap); + } + } catch (InterruptedException e) { + throw new IOException(e); + } + return next; + } + }; + + } + + /** + * The Cassandra record Reader throws InteruptedException, + * we overlay here to throw IOException instead. + */ + @Override + public org.apache.hadoop.mapreduce.RecordReader> + createRecordReader(org.apache.hadoop.mapreduce.InputSplit inputSplit, + TaskAttemptContext taskAttemptContext) + throws IOException { + org.apache.hadoop.mapreduce.RecordReader> result = null; + try { + result = super.createRecordReader(inputSplit, taskAttemptContext); + } catch (InterruptedException e) { + throw new IOException(e); + } catch (Exception ex) { + throw new IOException(ex); + } + return result; + } + + @Override + public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException { + String ks = jobConf.get(StandardColumnSerDe.CASSANDRA_KEYSPACE_NAME); + String cf = jobConf.get(StandardColumnSerDe.CASSANDRA_CF_NAME); + int slicePredicateSize = jobConf.getInt(StandardColumnSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, + 1000); + int sliceRangeSize = jobConf.getInt(StandardColumnSerDe.CASSANDRA_RANGE_BATCH_SIZE, 1000); + String cassandraColumnMapping = jobConf.get(StandardColumnSerDe.CASSANDRA_COL_MAPPING); + int rpcPort = jobConf.getInt(StandardColumnSerDe.CASSANDRA_PORT, 9160); + String host = jobConf.get(StandardColumnSerDe.CASSANDRA_HOST); + String partitioner = jobConf.get(StandardColumnSerDe.CASSANDRA_PARTITIONER); + + if (cassandraColumnMapping == null) { + throw new IOException("cassandra.columns.mapping required for Cassandra Table."); + } + + SliceRange range = new SliceRange(); + range.setStart(new byte[0]); + range.setFinish(new byte[0]); + range.setReversed(false); + range.setCount(slicePredicateSize); + SlicePredicate predicate = new SlicePredicate(); + predicate.setSlice_range(range); + + ConfigHelper.setRpcPort(jobConf, "" + rpcPort); + ConfigHelper.setInitialAddress(jobConf, host); + ConfigHelper.setPartitioner(jobConf, partitioner); + ConfigHelper.setInputSlicePredicate(jobConf, predicate); + ConfigHelper.setInputColumnFamily(jobConf, ks, cf); + ConfigHelper.setRangeBatchSize(jobConf, sliceRangeSize); + + Job job = new Job(jobConf); + JobContext 
jobContext = new JobContext(job.getConfiguration(), job.getJobID()); + + Path[] tablePaths = FileInputFormat.getInputPaths(jobContext); + List splits = getSplits(jobContext); + InputSplit[] results = new InputSplit[splits.size()]; + + for (int i = 0; i < splits.size(); ++i) { + HiveCassandraStandardSplit csplit = new HiveCassandraStandardSplit( + (ColumnFamilySplit) splits.get(i), cassandraColumnMapping, tablePaths[0]); + csplit.setKeyspace(ks); + csplit.setColumnFamily(cf); + csplit.setRangeBatchSize(sliceRangeSize); + csplit.setHost(host); + csplit.setPort(rpcPort); + csplit.setSlicePredicateSize(slicePredicateSize); + csplit.setPartitioner(partitioner); + csplit.setColumnMapping(cassandraColumnMapping); + results[i] = csplit; + } + return results; + } + +} Property changes on: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardColumnInputFormat.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardRowResult.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardRowResult.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardRowResult.java (revision 0) @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.cassandra.input; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * + * HiveCassandraStandardRowResult. 
Used as the value side of + * the InputFormat + * + */ +public class HiveCassandraStandardRowResult implements Writable { + + private Text key; + private MapWritable value; + + public HiveCassandraStandardRowResult() { + } + + @Override + public void readFields(DataInput in) throws IOException { + key = new Text(); + key.readFields(in); + value = new MapWritable(); + value.readFields(in); + } + + @Override + public void write(DataOutput out) throws IOException { + key.write(out); + value.write(out); + } + + public Text getKey() { + return key; + } + + public void setKey(Text key) { + this.key = key; + } + + public MapWritable getValue() { + return value; + } + + public void setValue(MapWritable value) { + this.value = value; + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("RowResult key:" + key); + for (Map.Entry entry : value.entrySet()) { + sb.append("entry key:" + entry.getKey() + " "); + sb.append("entry value:" + entry.getValue() + " "); + } + return sb.toString(); + } + +} Property changes on: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardRowResult.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraRow.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraRow.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraRow.java (revision 0) @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.cassandra.input; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hive.cassandra.serde.StandardColumnSerDe; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.lazy.LazyStruct; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; + +public class LazyCassandraRow extends LazyStruct { + private List cassandraColumns; + private HiveCassandraStandardRowResult rowResult; + private ArrayList cachedList; + + public LazyCassandraRow(LazySimpleStructObjectInspector oi) { + super(oi); + } + + public void init(HiveCassandraStandardRowResult crr, List cassandraColumns, + List cassandraColumnsBytes) { + this.rowResult = crr; + this.cassandraColumns = cassandraColumns; + setParsed(false); + } + + private void parse() { + if (getFields() == null) { + List fieldRefs = ((StructObjectInspector) getInspector()) + .getAllStructFieldRefs(); + setFields(new LazyObject[fieldRefs.size()]); + for (int i = 0; i < getFields().length; i++) { + String cassandraColumn = this.cassandraColumns.get(i); + if (cassandraColumn.endsWith(":")) { + // want all columns as a map + getFields()[i] = new LazyCassandraCellMap((LazyMapObjectInspector) + fieldRefs.get(i).getFieldObjectInspector()); + } else { + // otherwise only interested in a single column + getFields()[i] = LazyFactory.createLazyObject( + fieldRefs.get(i).getFieldObjectInspector()); + } + } + setFieldInited(new boolean[getFields().length]); + } + Arrays.fill(getFieldInited(), false); + setParsed(true); + } + + @Override + public Object getField(int fieldID) { + if (!getParsed()) { + parse(); + } + return uncheckedGetField(fieldID); + } + + private Object uncheckedGetField(int fieldID) { + if (!getFieldInited()[fieldID]) { + getFieldInited()[fieldID] = true; + ByteArrayRef ref = null; + String columnName = cassandraColumns.get(fieldID); + String cfName = columnName.split(":")[0]; + String colName = columnName.split(":")[1]; + if (columnName.equals(StandardColumnSerDe.CASSANDRA_KEY_COLUMN)) { + // user is asking for key column + ref = new ByteArrayRef(); + ref.setData(rowResult.getKey().getBytes()); + } else if (columnName.endsWith(":")) { + // user wants all columns as a map + // TODO this into a LazyCassandraCellMap + return null; + } else { + // user wants the value of a single column + Writable res = rowResult.getValue().get(new BytesWritable(colName.getBytes())); + HiveIColumn hiveIColumn = (HiveIColumn) res; + if (hiveIColumn != null) { + ref = new ByteArrayRef(); + ref.setData(hiveIColumn.value().array()); + } else { + return null; + } + } + if (ref != null) { + getFields()[fieldID].init(ref, 0, ref.getData().length); + } + } + return getFields()[fieldID].getObject(); + } + + /** + * Get the values of the fields as an ArrayList. + * + * @return The values of the fields as an ArrayList. 
+ */ + @Override + public ArrayList getFieldsAsList() { + if (!getParsed()) { + parse(); + } + if (cachedList == null) { + cachedList = new ArrayList(); + } else { + cachedList.clear(); + } + for (int i = 0; i < getFields().length; i++) { + cachedList.add(uncheckedGetField(i)); + } + return cachedList; + } + + @Override + public Object getObject() { + return this; + } +} Property changes on: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraRow.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardSplit.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardSplit.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardSplit.java (revision 0) @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.cassandra.input; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.cassandra.hadoop.ColumnFamilySplit; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; + +@SuppressWarnings("deprecation") +public class HiveCassandraStandardSplit extends FileSplit implements InputSplit { + private final ColumnFamilySplit split; + private String columnMapping; + private String keyspace; + private String columnFamily; + private int rangeBatchSize; + private int slicePredicateSize; + // added for 7.0 + private String partitioner; + private int port; + private String host; + + public HiveCassandraStandardSplit() { + super((Path) null, 0, 0, (String[]) null); + columnMapping = ""; + split = new ColumnFamilySplit(null, null, null); + } + + public HiveCassandraStandardSplit(ColumnFamilySplit split, String columnsMapping, Path dummyPath) { + super(dummyPath, 0, 0, (String[]) null); + this.split = split; + columnMapping = columnsMapping; + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + columnMapping = in.readUTF(); + keyspace = in.readUTF(); + columnFamily = in.readUTF(); + rangeBatchSize = in.readInt(); + slicePredicateSize = in.readInt(); + partitioner = in.readUTF(); + port = in.readInt(); + host = in.readUTF(); + split.readFields(in); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeUTF(columnMapping); + out.writeUTF(keyspace); + out.writeUTF(columnFamily); + out.writeInt(rangeBatchSize); + out.writeInt(slicePredicateSize); + out.writeUTF(partitioner); + out.writeInt(port); + out.writeUTF(host); + split.write(out); + } + + @Override + public String[] getLocations() throws IOException { + return split.getLocations(); + } + + @Override + public long getLength() { + return split.getLength(); + } + + public String getKeyspace() { + return keyspace; + } + + public void setKeyspace(String keyspace) { + this.keyspace = keyspace; + } + + public String getColumnFamily() { + return columnFamily; + } + + public void setColumnFamily(String columnFamily) { + this.columnFamily = columnFamily; + } + + public int getRangeBatchSize() { + return rangeBatchSize; + } + + public void setRangeBatchSize(int rangeBatchSize) { + this.rangeBatchSize = rangeBatchSize; + } + + public int getSlicePredicateSize() { + return slicePredicateSize; + } + + public void setSlicePredicateSize(int slicePredicateSize) { + this.slicePredicateSize = slicePredicateSize; + } + + public ColumnFamilySplit getSplit() { + return split; + } + + public String getColumnMapping() { + return columnMapping; + } + + public void setColumnMapping(String mapping) { + this.columnMapping = mapping; + } + + public void setPartitioner(String part) { + partitioner = part; + } + + public String getPartitioner() { + return partitioner; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + @Override + public String toString() { + return this.host + " " + this.port + " " + this.partitioner; + } +} Property changes on: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardSplit.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords 
+ Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraCellMap.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraCellMap.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraCellMap.java (revision 0) @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.cassandra.input; + +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.hadoop.hive.serde2.lazy.LazyMap; +import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector; + +public class LazyCassandraCellMap extends LazyMap { + + private HiveCassandraStandardRowResult rowResult; + private String cassandraColumnFamily; + + protected LazyCassandraCellMap(LazyMapObjectInspector oi) { + super(oi); + } + + public void init(HiveCassandraStandardRowResult rr, String columnFamily) { + rowResult = rr; + cassandraColumnFamily = columnFamily; + setParsed(false); + } + + private void parse() { + if (cachedMap == null) { + cachedMap = new LinkedHashMap(); + } else { + cachedMap.clear(); + } + } + + /** + * Get the value in the map for the given key. + * + * @param key + * @return + */ + + @Override + public Object getMapValueElement(Object key) { + if (!getParsed()) { + parse(); + } + + for (Map.Entry entry : cachedMap.entrySet()) { + LazyPrimitive lazyKeyI = (LazyPrimitive) entry.getKey(); + // getWritableObject() will convert LazyPrimitive to actual primitive + // writable objects. + Object keyI = lazyKeyI.getWritableObject(); + if (keyI != null) { + if (keyI.equals(key)) { + // Got a match, return the value + LazyObject v = (LazyObject) entry.getValue(); + return v == null ? 
v : v.getObject(); + } + } + } + + return null; + } + + @Override + public Map getMap() { + if (!getParsed()) { + parse(); + } + return cachedMap; + } + + @Override + public int getMapSize() { + if (!getParsed()) { + parse(); + } + return cachedMap.size(); + } + +} Property changes on: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraCellMap.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/HiveCassandraOutputFormat.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/HiveCassandraOutputFormat.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/HiveCassandraOutputFormat.java (revision 0) @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.cassandra.output; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Properties; + +import org.apache.cassandra.thrift.Column; +import org.apache.cassandra.thrift.ColumnParent; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.TimedOutException; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.cassandra.FramedConnWrapper; +import org.apache.hadoop.hive.cassandra.serde.StandardColumnSerDe; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.util.Progressable; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; + +@SuppressWarnings("deprecation") +public class HiveCassandraOutputFormat implements HiveOutputFormat, + OutputFormat { + + static final Log LOG = LogFactory.getLog(HiveCassandraOutputFormat.class); + + @Override + public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, + Class valueClass, boolean isCompressed, Properties tableProperties, + Progressable progress) throws IOException { + + final String cassandraKeySpace = jc.get(StandardColumnSerDe.CASSANDRA_KEYSPACE_NAME); + final String cassandraHost = jc.get(StandardColumnSerDe.CASSANDRA_HOST); + final int cassandraPort = Integer.parseInt(jc.get(StandardColumnSerDe.CASSANDRA_PORT)); + final String consistencyLevel = jc.get(StandardColumnSerDe.CASSANDRA_CONSISTENCY_LEVEL, "ONE"); + ConsistencyLevel level = null; + if (consistencyLevel.equalsIgnoreCase("QUORUM")) { + level = ConsistencyLevel.QUORUM; + } else if (consistencyLevel.equalsIgnoreCase("ALL")) { + level = ConsistencyLevel.ALL; + } else { + level = ConsistencyLevel.ONE; + } + final ConsistencyLevel fLevel = level; + + return new RecordWriter() { + private FramedConnWrapper wrap; + + @Override + public void close(boolean abort) throws IOException { + if (wrap != null) { + wrap.close(); + } + } + + @Override + public void write(Writable w) throws IOException { + CassandraPut put = (CassandraPut) w; + + for (CassandraColumn c : put.getColumns()) { + ColumnParent parent = new ColumnParent(); + parent.setColumn_family(c.getColumnFamily()); + try { + ensureConnection(cassandraHost, cassandraPort); + wrap.getClient().set_keyspace(cassandraKeySpace); + Column col = new Column(); + col.setValue(c.getValue()); + col.setTimestamp(c.getTimeStamp()); + col.setName(c.getColumn()); + wrap.getClient().insert(ByteBuffer.wrap(put.getKey().getBytes()), parent, col, fLevel); + } catch (InvalidRequestException e) { + throw new IOException(e); + } catch (UnavailableException e) { + throw new IOException(e); + } catch (TimedOutException e) { + throw new IOException(e); + } catch (TException e) { + throw new IOException(e); + } + } + } // end write + + public void ensureConnection(String host, int port) throws TTransportException { + if (wrap == null) { + wrap = new FramedConnWrapper(host, port, 5000); + wrap.open(); + } + } + + }; + } + + @Override + public void checkOutputSpecs(FileSystem arg0, JobConf jc) throws 
IOException { + + } + + @Override + public org.apache.hadoop.mapred.RecordWriter getRecordWriter(FileSystem arg0, + JobConf arg1, String arg2, Progressable arg3) throws IOException { + throw new RuntimeException("Error: Hive should not invoke this method."); + } +} Property changes on: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/HiveCassandraOutputFormat.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraPut.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraPut.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraPut.java (revision 0) @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.cassandra.output; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.io.Writable; + +public class CassandraPut implements Writable { + + private String key; + private List columns; + + public CassandraPut() { + columns = new ArrayList(); + } + + public CassandraPut(String key) { + this(); + setKey(key); + } + + @Override + public void readFields(DataInput in) throws IOException { + key = in.readUTF(); + int ilevel = in.readInt(); + int cols = in.readInt(); + for (int i = 0; i < cols; i++) { + CassandraColumn cc = new CassandraColumn(); + cc.readFields(in); + columns.add(cc); + } + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(this.key); + out.writeInt(1); + out.writeInt(this.columns.size()); + for (CassandraColumn c : this.columns) { + c.write(out); + } + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public List getColumns() { + return columns; + } + + public void setColumns(List columns) { + this.columns = columns; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("key: "); + sb.append(this.key); + for (CassandraColumn col : this.columns) { + sb.append("column : ["); + sb.append(col.toString()); + sb.append("]"); + } + return sb.toString(); + } +} Property changes on: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraPut.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: 
cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraColumn.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraColumn.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraColumn.java (revision 0) @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.cassandra.output; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +public class CassandraColumn implements Writable { + + private String columnFamily; + private long timeStamp; + private byte[] column; + private byte[] value; + + @Override + public void readFields(DataInput din) throws IOException { + columnFamily = din.readUTF(); + timeStamp = din.readLong(); + int clength = din.readInt(); + column = new byte[clength]; + din.readFully(column, 0, clength); + int vlength = din.readInt(); + value = new byte[vlength]; + din.readFully(value, 0, vlength); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(columnFamily); + out.writeLong(timeStamp); + out.writeInt(column.length); + out.write(column); + out.writeInt(value.length); + out.write(value); + } + + public String getColumnFamily() { + return columnFamily; + } + + public void setColumnFamily(String columnFamily) { + this.columnFamily = columnFamily; + } + + public byte[] getColumn() { + return column; + } + + public void setColumn(byte[] column) { + this.column = column; + } + + public byte[] getValue() { + return value; + } + + public void setValue(byte[] value) { + this.value = value; + } + + public long getTimeStamp() { + return timeStamp; + } + + public void setTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("cf:" + this.columnFamily); + sb.append("column:" + new String(this.column)); + sb.append("value:" + new String(this.value)); + return sb.toString(); + } +} Property changes on: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/output/CassandraColumn.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java (revision 0) +++ 
cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java (revision 0) @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.cassandra; + +import java.util.Map; +import java.util.Properties; + +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.KsDef; +import org.apache.cassandra.thrift.NotFoundException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.cassandra.input.HiveCassandraStandardColumnInputFormat; +import org.apache.hadoop.hive.cassandra.output.HiveCassandraOutputFormat; +import org.apache.hadoop.hive.cassandra.serde.StandardColumnSerDe; +import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.Constants; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; + +public class CassandraStorageHandler implements HiveStorageHandler, HiveMetaHook { + + private FramedConnWrapper wrap; + + private Configuration configuration; + + @Override + public void configureTableJobProperties(TableDesc tableDesc, Map jobProperties) { + Properties tableProperties = tableDesc.getProperties(); + String keyspaceName = tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_KEYSPACE_NAME); + if (keyspaceName == null) { + keyspaceName = tableProperties.getProperty(Constants.META_TABLE_NAME); + } + + jobProperties.put(StandardColumnSerDe.CASSANDRA_KEYSPACE_NAME, keyspaceName); + jobProperties.put(StandardColumnSerDe.CASSANDRA_CF_NAME, + tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_CF_NAME)); + jobProperties.put(StandardColumnSerDe.CASSANDRA_COL_MAPPING, + tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_COL_MAPPING)); + jobProperties.put(StandardColumnSerDe.CASSANDRA_HOST, + tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_HOST)); + jobProperties.put(StandardColumnSerDe.CASSANDRA_PORT, + tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_PORT)); + jobProperties.put(StandardColumnSerDe.CASSANDRA_PARTITIONER, + tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_PARTITIONER)); + + if (tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_THRIFT_MODE) != null) { + jobProperties.put(StandardColumnSerDe.CASSANDRA_THRIFT_MODE, + 
tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_THRIFT_MODE)); + } else { + jobProperties.put(StandardColumnSerDe.CASSANDRA_THRIFT_MODE, "FRAMED"); + } + + if (tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_CONSISTENCY_LEVEL) != null) { + jobProperties.put(StandardColumnSerDe.CASSANDRA_CONSISTENCY_LEVEL, + tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_CONSISTENCY_LEVEL)); + } else { + jobProperties.put(StandardColumnSerDe.CASSANDRA_CONSISTENCY_LEVEL, "ONE"); + } + + if (tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_RANGE_BATCH_SIZE) != null) { + jobProperties.put(StandardColumnSerDe.CASSANDRA_RANGE_BATCH_SIZE, + tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_RANGE_BATCH_SIZE)); + } else { + jobProperties.put(StandardColumnSerDe.CASSANDRA_RANGE_BATCH_SIZE, "1000"); + } + + if (tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_SLICE_PREDICATE_SIZE) != null) { + jobProperties.put(StandardColumnSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, + tableProperties.getProperty(StandardColumnSerDe.CASSANDRA_SLICE_PREDICATE_SIZE)); + } else { + jobProperties.put(StandardColumnSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, "1000"); + } + } + + @Override + public Class getInputFormatClass() { + return HiveCassandraStandardColumnInputFormat.class; + } + + @Override + public HiveMetaHook getMetaHook() { + return this; + } + + @Override + public Class getOutputFormatClass() { + return HiveCassandraOutputFormat.class; + } + + @Override + public Class getSerDeClass() { + return StandardColumnSerDe.class; + } + + @Override + public Configuration getConf() { + return this.configuration; + } + + @Override + public void setConf(Configuration arg0) { + this.configuration = arg0; + } + + @Override + public void preCreateTable(Table tbl) throws MetaException { + boolean isExternal = MetaStoreUtils.isExternalTable(tbl); + if (!isExternal) { + throw new MetaException("Cassandra tables must be external."); + } + if (tbl.getSd().getLocation() != null) { + throw new MetaException("LOCATION may not be specified for Cassandra."); + } + String keyspaceName = this.getCassandraKeyspaceName(tbl); + + Map serdeParam = tbl.getSd().getSerdeInfo().getParameters(); + if (serdeParam.get(StandardColumnSerDe.CASSANDRA_COL_MAPPING) == null) { + throw new MetaException("The column mapping must be defined as " + + StandardColumnSerDe.CASSANDRA_COL_MAPPING); + } + if (serdeParam.get(StandardColumnSerDe.CASSANDRA_CF_NAME) == null) { + throw new MetaException("The column family name must be defined as " + + StandardColumnSerDe.CASSANDRA_CF_NAME); + } + if (serdeParam.get(StandardColumnSerDe.CASSANDRA_HOST) == null) { + throw new MetaException("The initial host used to determine splits must be supplied as " + + StandardColumnSerDe.CASSANDRA_HOST); + } + if (serdeParam.get(StandardColumnSerDe.CASSANDRA_PORT) == null) { + throw new MetaException("The RPC port must be defined (typically 9160) as " + + StandardColumnSerDe.CASSANDRA_PORT); + } + if (serdeParam.get(StandardColumnSerDe.CASSANDRA_PARTITIONER) == null) { + throw new MetaException("The partitioner must be defined (typically " + + "org.apache.cassandra.dht.RandomPartitioner) " + + StandardColumnSerDe.CASSANDRA_PARTITIONER); + } + + String cassandraHost = serdeParam.get(StandardColumnSerDe.CASSANDRA_HOST); + int cassandraPort = Integer.parseInt(serdeParam.get(StandardColumnSerDe.CASSANDRA_PORT)); + + try { + this.ensureConnection(cassandraHost, cassandraPort); + try { + KsDef ks = wrap.getClient().describe_keyspace(keyspaceName); + } catch
(NotFoundException ex) { + throw new MetaException(keyspaceName + + " not found. The storage handler will not create it. "); + } + } catch (TException e) { + throw new MetaException("An internal exception prevented this action from taking place." + + e.getMessage()); + } catch (InvalidRequestException e) { + throw new MetaException("An internal exception prevented this action from taking place." + + e.getMessage()); + } + } + + private String getCassandraKeyspaceName(Table tbl) { + String tableName = tbl.getParameters().get(StandardColumnSerDe.CASSANDRA_KEYSPACE_NAME); + if (tableName == null) { + tableName = tbl.getSd().getSerdeInfo().getParameters().get( + StandardColumnSerDe.CASSANDRA_KEYSPACE_NAME); + } + if (tableName == null) { + tableName = tbl.getTableName(); + } + return tableName; + } + + @Override + public void commitCreateTable(Table table) throws MetaException { + // No work needed + } + + @Override + public void commitDropTable(Table table, boolean deleteData) throws MetaException { + // No work needed + } + + @Override + public void preDropTable(Table table) throws MetaException { + // nothing to do + } + + @Override + public void rollbackCreateTable(Table table) throws MetaException { + // nothing to do + } + + @Override + public void rollbackDropTable(Table table) throws MetaException { + // nothing to do + } + + public void ensureConnection(String host, int port) throws TTransportException { + if (wrap == null) { + wrap = new FramedConnWrapper(host, port, 5000); + wrap.open(); + } + } +} Property changes on: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/serde/StandardColumnSerDe.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/serde/StandardColumnSerDe.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/serde/StandardColumnSerDe.java (revision 0) @@ -0,0 +1,455 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.cassandra.serde; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.cassandra.input.HiveCassandraStandardRowResult; +import org.apache.hadoop.hive.cassandra.input.LazyCassandraRow; +import org.apache.hadoop.hive.cassandra.output.CassandraColumn; +import org.apache.hadoop.hive.cassandra.output.CassandraPut; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; +import org.apache.hadoop.hive.serde2.lazy.LazyUtils; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.io.Writable; + +public class StandardColumnSerDe implements SerDe { + + public static final Log LOG = LogFactory.getLog(StandardColumnSerDe.class.getName()); + public static final String CASSANDRA_KEYSPACE_NAME = "cassandra.ks.name"; // keyspace + public static final String CASSANDRA_CF_NAME = "cassandra.cf.name"; // column family + public static final String CASSANDRA_RANGE_BATCH_SIZE = "cassandra.range.size"; + public static final String CASSANDRA_SLICE_PREDICATE_SIZE = "cassandra.slice.predicate.size"; + public static final String CASSANDRA_HOST = "cassandra.host"; // initialHost + public static final String CASSANDRA_PORT = "cassandra.port"; // rcpPort + public static final String CASSANDRA_PARTITIONER = "cassandra.partitioner"; // partitioner + public static final String CASSANDRA_COL_MAPPING = "cassandra.columns.mapping"; + public static final String CASSANDRA_KEY_COLUMN = ":key"; + public static final String CASSANDRA_CONSISTENCY_LEVEL = "cassandra.consistency.level"; + public static final String CASSANDRA_THRIFT_MODE = "cassandra.thrift.mode"; + + /* names of columns from SerdeParameters */ + private List cassandraColumnNames; + /* index of key column in results */ + private int iKey; + + private ObjectInspector cachedObjectInspector; + private SerDeParameters serdeParams; + private LazyCassandraRow cachedCassandraRow; + private List 
cassandraColumnNamesBytes;
+  private final ByteStream.Output serializeStream = new ByteStream.Output();
+  private boolean useJSONSerialize;
+
+  private byte[] separators;      // the separators array
+  private boolean escaped;        // whether we need to escape the data when writing out
+  private byte escapeChar;        // which char to use as the escape char, e.g. '\\'
+  private boolean[] needsEscape;  // which chars need to be escaped. This array should have size
+                                  // of 128. Negative byte values (or byte values >= 128) are
+                                  // never escaped.
+
+  @Override
+  public void initialize(Configuration conf, Properties tbl) throws SerDeException {
+    initCassandraSerDeParameters(conf, tbl, getClass().getName());
+    cachedObjectInspector = LazyFactory.createLazyStructInspector(
+      serdeParams.getColumnNames(),
+      serdeParams.getColumnTypes(),
+      serdeParams.getSeparators(),
+      serdeParams.getNullSequence(),
+      serdeParams.isLastColumnTakesRest(),
+      serdeParams.isEscaped(),
+      serdeParams.getEscapeChar());
+    cachedCassandraRow = new LazyCassandraRow(
+      (LazySimpleStructObjectInspector) cachedObjectInspector);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("CassandraSerDe initialized with: columnNames = "
+        + StringUtils.join(serdeParams.getColumnNames(), ",")
+        + " columnTypes = "
+        + StringUtils.join(serdeParams.getColumnTypes(), ",")
+        + " cassandraColumnMapping = "
+        + cassandraColumnNames);
+    }
+  }
+
+  private void initCassandraSerDeParameters(Configuration job, Properties tbl, String serdeName)
+    throws SerDeException {
+    String cassandraColumnNameProperty =
+      tbl.getProperty(StandardColumnSerDe.CASSANDRA_COL_MAPPING);
+    String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
+    cassandraColumnNames = parseColumnMapping(cassandraColumnNameProperty);
+    iKey = cassandraColumnNames.indexOf(StandardColumnSerDe.CASSANDRA_KEY_COLUMN);
+
+    cassandraColumnNamesBytes = initColumnNamesBytes(cassandraColumnNames);
+
+    // Build the type property string if not supplied
+    if (columnTypeProperty == null) {
+      StringBuilder sb = new StringBuilder();
+
+      for (int i = 0; i < cassandraColumnNames.size(); i++) {
+        if (sb.length() > 0) {
+          sb.append(":");
+        }
+        String colName = cassandraColumnNames.get(i);
+        if (isSpecialColumn(colName)) {
+          // a special column becomes a STRING
+          sb.append(Constants.STRING_TYPE_NAME);
+        } else if (colName.endsWith(":")) {
+          // the column family name becomes a MAP
+          sb.append(
+            Constants.MAP_TYPE_NAME + "<"
+            + Constants.STRING_TYPE_NAME
+            + "," + Constants.STRING_TYPE_NAME + ">");
+        } else {
+          // an individual column becomes a STRING
+          sb.append(Constants.STRING_TYPE_NAME);
+        }
+      }
+      tbl.setProperty(Constants.LIST_COLUMN_TYPES, sb.toString());
+    }
+    serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName);
+
+    if (cassandraColumnNames.size() != serdeParams.getColumnNames().size()) {
+      throw new SerDeException(serdeName + ": columns has " +
+        serdeParams.getColumnNames().size() +
+        " elements while cassandra.columns.mapping has " +
+        cassandraColumnNames.size() + " elements" +
+        " (counting the key if implicit)");
+    }
+
+    separators = serdeParams.getSeparators();
+    escaped = serdeParams.isEscaped();
+    escapeChar = serdeParams.getEscapeChar();
+    needsEscape = serdeParams.getNeedsEscape();
+
+    // we can only check that a column family mapping (ending with ":") is declared as a MAP
+    for (int i = 0; i < cassandraColumnNames.size(); i++) {
+      String cassandraColName = cassandraColumnNames.get(i);
+      if (cassandraColName.endsWith(":")) {
+        TypeInfo typeInfo = serdeParams.getColumnTypes().get(i);
+        if
((typeInfo.getCategory() != Category.MAP) || + (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getTypeName() + != Constants.STRING_TYPE_NAME)) { + + throw new SerDeException( + serdeName + ": Cassandra column family '" + + cassandraColName + + "' should be mapped to map but is mapped to " + + typeInfo.getTypeName()); + } + } + } + } + + /* + * + * @see org.apache.hadoop.hive.serde2.Deserializer#deserialize(org.apache.hadoop.io.Writable) + * Turns a Cassandra row into a Hive row. + */ + @Override + public Object deserialize(Writable w) throws SerDeException { + if (!(w instanceof HiveCassandraStandardRowResult)) { + throw new SerDeException(getClass().getName() + ": expects Cassandra Row Result"); + } + HiveCassandraStandardRowResult crr = (HiveCassandraStandardRowResult) w; + this.cachedCassandraRow.init(crr, this.cassandraColumnNames, cassandraColumnNamesBytes); + return cachedCassandraRow; + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + return this.cachedObjectInspector; + } + + @Override + public Class getSerializedClass() { + return CassandraPut.class; + } + + /* + * Turns obj (a Hive Row) into a CassandraPut which is key and a MapWritable + * of column/values + */ + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { + if (objInspector.getCategory() != Category.STRUCT) { + throw new SerDeException(getClass().toString() + + " can only serialize struct types, but we got: " + + objInspector.getTypeName()); + } + // Prepare the field ObjectInspectors + StructObjectInspector soi = (StructObjectInspector) objInspector; + List fields = soi.getAllStructFieldRefs(); + List list = soi.getStructFieldsDataAsList(obj); + List declaredFields = + (serdeParams.getRowTypeInfo() != null && + ((StructTypeInfo) serdeParams.getRowTypeInfo()) + .getAllStructFieldNames().size() > 0) ? + ((StructObjectInspector) getObjectInspector()).getAllStructFieldRefs() + : null; + CassandraPut put = null; + try { + String key = serializeField(iKey, null, fields, list, declaredFields); + if (key == null) { + throw new SerDeException("Cassandra row key cannot be NULL"); + } + put = new CassandraPut(key); + // Serialize each field except key (done already) + for (int i = 0; i < fields.size(); i++) { + if (i != iKey) { + serializeField(i, put, fields, list, declaredFields); + } + } + } catch (IOException e) { + throw new SerDeException(e); + } + return put; + } + + private String serializeField(int i, CassandraPut put, List fields, + List list, List declaredFields) throws IOException { + + // column name + String cassandraColumn = cassandraColumnNames.get(i); + + // Get the field objectInspector and the field object. + ObjectInspector foi = fields.get(i).getFieldObjectInspector(); + Object f = (list == null ? 
null : list.get(i)); + if (f == null) { + return null; + } + + // If the field corresponds to a column family in cassandra + if (cassandraColumn.endsWith(":")) { + MapObjectInspector moi = (MapObjectInspector) foi; + ObjectInspector koi = moi.getMapKeyObjectInspector(); + ObjectInspector voi = moi.getMapValueObjectInspector(); + + Map map = moi.getMap(f); + if (map == null) { + return null; + } else { + for (Map.Entry entry : map.entrySet()) { + // Get the Key + serializeStream.reset(); + serialize(entry.getKey(), koi, 3); + + // Get the column-qualifier + byte[] columnQualifier = new byte[serializeStream.getCount()]; + System.arraycopy(serializeStream.getData(), 0, columnQualifier, 0, + serializeStream.getCount()); + + // Get the Value + serializeStream.reset(); + + boolean isNotNull = serialize(entry.getValue(), voi, 3); + if (!isNotNull) { + continue; + } + byte[] value = new byte[serializeStream.getCount()]; + System.arraycopy(serializeStream.getData(), 0, value, 0, serializeStream.getCount()); + + CassandraColumn cc = new CassandraColumn(); + cc.setTimeStamp(System.currentTimeMillis()); + cc.setColumnFamily(cassandraColumn); + cc.setColumn(columnQualifier); + cc.setValue(value); + put.getColumns().add(cc); + + } + } + } else { + + // If the field that is passed in is NOT a primitive, and either the + // field is not declared (no schema was given at initialization), or + // the field is declared as a primitive in initialization, serialize + // the data to JSON string. Otherwise serialize the data in the + // delimited way. + + serializeStream.reset(); + boolean isNotNull; + if (!foi.getCategory().equals(Category.PRIMITIVE) + && (declaredFields == null || + declaredFields.get(i).getFieldObjectInspector().getCategory() + .equals(Category.PRIMITIVE) || useJSONSerialize)) { + isNotNull = serialize(SerDeUtils.getJSONString(f, foi), + PrimitiveObjectInspectorFactory.javaStringObjectInspector, 1); + } else { + isNotNull = serialize(f, foi, 1); + } + if (!isNotNull) { + return null; + } + byte[] key = new byte[serializeStream.getCount()]; + System.arraycopy(serializeStream.getData(), 0, key, 0, serializeStream.getCount()); + if (i == iKey) { + return new String(key); + } + CassandraColumn cc = new CassandraColumn(); + cc.setTimeStamp(System.currentTimeMillis()); + cc.setColumnFamily(this.cassandraColumnNames.get(i).split(":")[0]); + cc.setColumn(this.cassandraColumnNames.get(i).split(":")[1].getBytes()); + cc.setValue(key); + put.getColumns().add(cc); + } + return null; + } + + private boolean serialize(Object obj, ObjectInspector objInspector, int level) + throws IOException { + + switch (objInspector.getCategory()) { + case PRIMITIVE: { + LazyUtils.writePrimitiveUTF8( + serializeStream, obj, + (PrimitiveObjectInspector) objInspector, + escaped, escapeChar, needsEscape); + return true; + } + case LIST: { + char separator = (char) separators[level]; + ListObjectInspector loi = (ListObjectInspector) objInspector; + List list = loi.getList(obj); + ObjectInspector eoi = loi.getListElementObjectInspector(); + if (list == null) { + return false; + } else { + for (int i = 0; i < list.size(); i++) { + if (i > 0) { + serializeStream.write(separator); + } + serialize(list.get(i), eoi, level + 1); + } + } + return true; + } + case MAP: { + char separator = (char) separators[level]; + char keyValueSeparator = (char) separators[level + 1]; + MapObjectInspector moi = (MapObjectInspector) objInspector; + ObjectInspector koi = moi.getMapKeyObjectInspector(); + ObjectInspector voi = 
moi.getMapValueObjectInspector(); + + Map map = moi.getMap(obj); + if (map == null) { + return false; + } else { + boolean first = true; + for (Map.Entry entry : map.entrySet()) { + if (first) { + first = false; + } else { + serializeStream.write(separator); + } + serialize(entry.getKey(), koi, level + 2); + serializeStream.write(keyValueSeparator); + serialize(entry.getValue(), voi, level + 2); + } + } + return true; + } + case STRUCT: { + char separator = (char) separators[level]; + StructObjectInspector soi = (StructObjectInspector) objInspector; + List fields = soi.getAllStructFieldRefs(); + List list = soi.getStructFieldsDataAsList(obj); + if (list == null) { + return false; + } else { + for (int i = 0; i < list.size(); i++) { + if (i > 0) { + serializeStream.write(separator); + } + serialize(list.get(i), fields.get(i).getFieldObjectInspector(), level + 1); + } + } + return true; + } + } + throw new RuntimeException("Unknown category type: " + objInspector.getCategory()); + } + + public static List parseColumnMapping(String columnMapping) { + String[] columnArray = columnMapping.split(","); + List columnList = Arrays.asList(columnArray); + int iKey = columnList.indexOf(CASSANDRA_KEY_COLUMN); + if (iKey == -1) { + columnList = new ArrayList(columnList); + columnList.add(0, CASSANDRA_KEY_COLUMN); + } + return columnList; + } + + public static List initColumnNamesBytes(List columnNames) { + List columnBytes = new ArrayList(); + String column = null; + for (int i = 0; i < columnNames.size(); i++) { + column = columnNames.get(i); + try { + if (column.endsWith(":")) { + columnBytes.add((column.split(":")[0]).getBytes("UTF-8")); + } else { + columnBytes.add((column).getBytes("UTF-8")); + } + } catch (UnsupportedEncodingException ex) { + throw new RuntimeException(ex); + } + } + return columnBytes; + } + + public static boolean isSpecialColumn(String columnName) { + return columnName.equals(CASSANDRA_KEY_COLUMN); + } + + @Override + public SerDeStats getSerDeStats() { + // no support for statistics + return null; + } + +} Property changes on: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/serde/StandardColumnSerDe.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/FramedConnWrapper.java =================================================================== --- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/FramedConnWrapper.java (revision 0) +++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/FramedConnWrapper.java (revision 0) @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.cassandra; + +import org.apache.cassandra.thrift.Cassandra; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +public class FramedConnWrapper { + + private final TTransport transport; + private final TProtocol proto; + private final TSocket socket; + + public FramedConnWrapper(String host, int port, int timeout) { + socket = new TSocket(host, port, timeout); + transport = new TFramedTransport(socket); + proto = new TBinaryProtocol(transport); + } + + public void open() throws TTransportException { + transport.open(); + } + + public void close() { + transport.close(); + socket.close(); + } + + public Cassandra.Client getClient() { + Cassandra.Client client = new Cassandra.Client(proto); + return client; + } +} Property changes on: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/FramedConnWrapper.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: cassandra-handler/build.xml =================================================================== --- cassandra-handler/build.xml (revision 0) +++ cassandra-handler/build.xml (revision 0) @@ -0,0 +1,94 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Property changes on: cassandra-handler/build.xml ___________________________________________________________________ Added: svn:mime-type + text/xml Added: svn:keywords + Date Revision Author HeadURL Id Added: svn:eol-style + native Index: build-common.xml =================================================================== --- build-common.xml (revision 1182878) +++ build-common.xml (working copy) @@ -166,7 +166,7 @@ description="Retrieve Ivy-managed Hadoop source artifacts" unless="ivy.skip"> + pattern="${build.dir.hadoop}/[artifact](-[classifier])-[revision].[ext]"/> + deprecation="${javac.deprecation}" + includeantruntime="false"> @@ -280,7 +281,7 @@ - + + deprecation="${javac.deprecation}" + includeantruntime="false"> @@ -322,7 +324,8 @@ optimize="${javac.optimize}" target="${javac.version}" source="${javac.version}" - deprecation="${javac.deprecation}"> + deprecation="${javac.deprecation}" + includeantruntime="false"> @@ -396,7 +399,7 @@ + depends="test-conditions,gen-test,compile-test,test-jar,test-init" description="Run the tests"> Index: hwi/build.xml =================================================================== --- hwi/build.xml (revision 1182878) +++ hwi/build.xml (working copy) @@ -69,7 +69,8 @@ includes="**/*.java" destdir="${build.classes}" debug="${javac.debug}" - deprecation="${javac.deprecation}"> + deprecation="${javac.deprecation}" + includeantruntime="false"> Index: common/build.xml =================================================================== --- common/build.xml (revision 1182878) +++ common/build.xml (working copy) @@ -35,7 +35,8 @@ includes="**/*.java" destdir="${build.classes}" debug="${javac.debug}" - deprecation="${javac.deprecation}"> + deprecation="${javac.deprecation}" + includeantruntime="false"> Index: service/build.xml =================================================================== --- service/build.xml (revision 1182878) +++ 
service/build.xml (working copy) @@ -58,7 +58,7 @@ destdir="${build.classes}" debug="${javac.debug}" deprecation="${javac.deprecation}" - > + includeantruntime="false"> Index: serde/build.xml =================================================================== --- serde/build.xml (revision 1182878) +++ serde/build.xml (working copy) @@ -49,7 +49,8 @@ srcdir="${src.dir}/java/:${src.dir}/gen/thrift/gen-javabean/:${src.dir}/gen/protobuf/gen-java/" destdir="${build.classes}" debug="${javac.debug}" - deprecation="${javac.deprecation}"> + deprecation="${javac.deprecation}" + includeantruntime="false"> Index: cli/build.xml =================================================================== --- cli/build.xml (revision 1182878) +++ cli/build.xml (working copy) @@ -35,7 +35,8 @@ includes="**/*.java" destdir="${build.classes}" debug="${javac.debug}" - deprecation="${javac.deprecation}"> + deprecation="${javac.deprecation}" + includeantruntime="false"> Index: ql/ivy.xml =================================================================== --- ql/ivy.xml (revision 1182878) +++ ql/ivy.xml (working copy) @@ -34,6 +34,18 @@ + + + + + + + + + + + + Index: ql/build.xml =================================================================== --- ql/build.xml (revision 1182878) +++ ql/build.xml (working copy) @@ -179,7 +179,8 @@ includes="**/*.java" destdir="${build.classes}" debug="${javac.debug}" - deprecation="${javac.deprecation}"> + deprecation="${javac.deprecation}" + includeantruntime="false"> Index: pdk/build.xml =================================================================== --- pdk/build.xml (revision 1182878) +++ pdk/build.xml (working copy) @@ -35,7 +35,8 @@ includes="**/*.java" destdir="${build.classes}" debug="${javac.debug}" - deprecation="${javac.deprecation}"> + deprecation="${javac.deprecation}" + includeantruntime="false">
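
For reference (an illustrative sketch, not part of the patch): the snippet below summarizes which SerDe properties CassandraStorageHandler.preCreateTable() requires and which ones the handler defaults when absent. The property keys and default values come from StandardColumnSerDe and CassandraStorageHandler above; the class name, column family, host and port values are placeholder assumptions.

import java.util.Properties;

public class CassandraTablePropertiesExample {
  public static void main(String[] args) {
    Properties tableProps = new Properties();
    // Required by preCreateTable(); the literal keys match the constants in StandardColumnSerDe.
    tableProps.setProperty("cassandra.columns.mapping", ":key,cf:col1,cf:");
    tableProps.setProperty("cassandra.cf.name", "MyColumnFamily");
    tableProps.setProperty("cassandra.host", "127.0.0.1");
    tableProps.setProperty("cassandra.port", "9160");
    tableProps.setProperty("cassandra.partitioner", "org.apache.cassandra.dht.RandomPartitioner");
    // Optional: cassandra.ks.name; when unset, the handler falls back to the Hive table name.

    // Optional job properties; the storage handler fills in these defaults when they are unset.
    String thriftMode = tableProps.getProperty("cassandra.thrift.mode", "FRAMED");
    String consistency = tableProps.getProperty("cassandra.consistency.level", "ONE");
    String rangeBatch = tableProps.getProperty("cassandra.range.size", "1000");
    String slicePredicate = tableProps.getProperty("cassandra.slice.predicate.size", "1000");

    System.out.printf("thrift=%s consistency=%s range=%s slice=%s%n",
        thriftMode, consistency, rangeBatch, slicePredicate);
  }
}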
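
For reference (an illustrative sketch, not part of the patch): a minimal, self-contained restatement of the column-mapping rules in StandardColumnSerDe. parseColumnMapping() prepends ":key" when the mapping string does not name it explicitly, and initCassandraSerDeParameters() infers string for individual columns and map<string,string> for a bare "cf:" mapping. The class name and mapping string below are hypothetical.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ColumnMappingExample {
  // Mirrors StandardColumnSerDe.parseColumnMapping(): the ":key" column is
  // prepended when the mapping does not name it explicitly.
  static List<String> parseColumnMapping(String columnMapping) {
    List<String> columns = new ArrayList<String>(Arrays.asList(columnMapping.split(",")));
    if (!columns.contains(":key")) {
      columns.add(0, ":key");
    }
    return columns;
  }

  // Mirrors the type-string construction in initCassandraSerDeParameters():
  // ":key" and "cf:col" become string, a bare "cf:" becomes map<string,string>,
  // joined with ":" as in the columns.types table property.
  static String buildColumnTypes(List<String> columns) {
    StringBuilder sb = new StringBuilder();
    for (String col : columns) {
      if (sb.length() > 0) {
        sb.append(":");
      }
      sb.append(col.endsWith(":") && !col.equals(":key") ? "map<string,string>" : "string");
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    List<String> columns = parseColumnMapping("cf:name,cf:age,cf:");
    System.out.println(columns);                  // [:key, cf:name, cf:age, cf:]
    System.out.println(buildColumnTypes(columns)); // string:string:string:map<string,string>
  }
}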
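
For reference (a simplified sketch, not part of the patch): how serialize(Object, ObjectInspector, int level) flattens nested values. Lists and structs write separators[level] between elements and recurse one level deeper; maps write separators[level] between entries and separators[level + 1] between key and value, recursing two levels deeper. The separator characters below are printable placeholders chosen for this sketch; the SerDe takes its separators from SerDeParameters (typically control characters such as \001, \002, \003), and escaping is omitted here.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NestedValueEncoder {
  private final char[] separators; // placeholder separators for illustration

  NestedValueEncoder(char[] separators) {
    this.separators = separators;
  }

  // Simplified analogue of StandardColumnSerDe.serialize(obj, inspector, level).
  String encode(Object obj, int level) {
    StringBuilder out = new StringBuilder();
    if (obj instanceof List) {
      List<?> list = (List<?>) obj;
      for (int i = 0; i < list.size(); i++) {
        if (i > 0) {
          out.append(separators[level]);
        }
        out.append(encode(list.get(i), level + 1));
      }
    } else if (obj instanceof Map) {
      boolean first = true;
      for (Map.Entry<?, ?> e : ((Map<?, ?>) obj).entrySet()) {
        if (!first) {
          out.append(separators[level]);
        }
        first = false;
        out.append(encode(e.getKey(), level + 2));
        out.append(separators[level + 1]);
        out.append(encode(e.getValue(), level + 2));
      }
    } else {
      out.append(obj); // primitive: plain text, no escaping in this sketch
    }
    return out.toString();
  }

  public static void main(String[] args) {
    NestedValueEncoder enc = new NestedValueEncoder(new char[] {'|', ',', '=', ';'});
    Map<String, Object> row = new LinkedHashMap<String, Object>();
    row.put("tags", Arrays.asList("a", "b"));
    row.put("size", "3");
    System.out.println(enc.encode(row, 1)); // prints: tags=a;b,size=3
  }
}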
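
For reference (a usage sketch, not part of the patch): how FramedConnWrapper can be exercised on its own, mirroring the describe_keyspace() existence check that CassandraStorageHandler.preCreateTable() performs. It assumes the Cassandra Thrift classes pulled in by this patch's ivy dependency are on the classpath; the host, port, timeout and keyspace name are placeholders.

import org.apache.cassandra.thrift.NotFoundException;
import org.apache.hadoop.hive.cassandra.FramedConnWrapper;

public class KeyspaceCheckExample {
  public static void main(String[] args) throws Exception {
    // Placeholder connection settings; 9160 is the usual Thrift RPC port.
    FramedConnWrapper wrap = new FramedConnWrapper("127.0.0.1", 9160, 5000);
    wrap.open();
    try {
      // Throws NotFoundException when the keyspace does not exist.
      wrap.getClient().describe_keyspace("MyKeyspace");
      System.out.println("Keyspace exists");
    } catch (NotFoundException nfe) {
      System.out.println("Keyspace not found; the storage handler will not create it");
    } finally {
      wrap.close();
    }
  }
}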