Index: build.xml
===================================================================
--- build.xml (revision 1078995)
+++ build.xml (working copy)
@@ -96,7 +96,7 @@
-
+
@@ -118,7 +118,7 @@
-
+
@@ -129,7 +129,7 @@
-
+
@@ -362,6 +362,7 @@
+
Index: ivy/ivysettings.xml
===================================================================
--- ivy/ivysettings.xml (revision 1078995)
+++ ivy/ivysettings.xml (working copy)
@@ -61,6 +61,7 @@
+
Index: ivy/libraries.properties
===================================================================
--- ivy/libraries.properties (revision 1078995)
+++ ivy/libraries.properties (working copy)
@@ -26,6 +26,7 @@
datanucleus-core.version=2.0.3
datanucleus-enhancer.version=2.0.3
datanucleus-rdbms.version=2.0.3
+cassandra.version=0.7.2
checkstyle.version=5.0
commons-cli.version=1.2
commons-codec.version=1.3
Index: cassandra-handler/conf/cassandra.yaml
===================================================================
--- cassandra-handler/conf/cassandra.yaml (revision 0)
+++ cassandra-handler/conf/cassandra.yaml (revision 0)
@@ -0,0 +1,456 @@
+# Cassandra storage config YAML
+
+# NOTE:
+# See http://wiki.apache.org/cassandra/StorageConfiguration for
+# full explanations of configuration directives
+# /NOTE
+
+# The name of the cluster. This is mainly used to prevent machines in
+# one logical cluster from joining another.
+cluster_name: 'Test Cluster'
+
+# You should always specify InitialToken when setting up a production
+# cluster for the first time, and often when adding capacity later.
+# The principle is that each node should be given an equal slice of
+# the token ring; see http://wiki.apache.org/cassandra/Operations
+# for more details.
+#
+# If blank, Cassandra will request a token bisecting the range of
+# the heaviest-loaded existing node. If there is no load information
+# available, such as is the case with a new cluster, it will pick
+# a random token, which will lead to hot spots.
+initial_token:
+
+# Set to true to make new [non-seed] nodes automatically migrate data
+# to themselves from the pre-existing nodes in the cluster. Defaults
+# to false because you can only bootstrap N machines at a time from
+# an existing cluster of N, so if you are bringing up a cluster of
+# 10 machines with 3 seeds you would have to do it in stages. Leaving
+# this off for the initial start simplifies that.
+auto_bootstrap: false
+
+# See http://wiki.apache.org/cassandra/HintedHandoff
+hinted_handoff_enabled: true
+
+# authentication backend, implementing IAuthenticator; used to identify users
+authenticator: org.apache.cassandra.auth.AllowAllAuthenticator
+
+# authorization backend, implementing IAuthority; used to limit access/provide permissions
+authority: org.apache.cassandra.auth.AllowAllAuthority
+
+# any IPartitioner may be used, including your own as long as it is on
+# the classpath. Out of the box, Cassandra provides
+# org.apache.cassandra.dht.RandomPartitioner
+# org.apache.cassandra.dht.ByteOrderedPartitioner,
+# org.apache.cassandra.dht.OrderPreservingPartitioner, and
+# org.apache.cassandra.dht.CollatingOrderPreservingPartitioner.
+# (CollatingOPP collates according to EN,US rules, not naive byte
+# ordering. Use this as an example if you need locale-aware collation.)
+partitioner: org.apache.cassandra.dht.RandomPartitioner
+
+# directories where Cassandra should store data on disk.
+data_file_directories:
+ - /tmp/data
+
+# commit log
+commitlog_directory: /tmp/commit
+
+# saved caches
+saved_caches_directory: /tmp/caches
+
+# Size to allow commitlog to grow to before creating a new segment
+commitlog_rotation_threshold_in_mb: 128
+
+# commitlog_sync supports the following modes:
+#
+# batch:
+# In batch mode, Cassandra won't ack writes until the commit log
+# has been fsynced to disk. But fsyncing each write at once is
+# performance-prohibitive, so instead Cassandra will wait up to
+# commitlog_sync_batch_window_in_ms milliseconds for other writes, before
+# syncing that "batch" at once. This causes a performance penalty
+# of about 15% when the commitlog is on a separate device, and much more
+# when it shares the same device as the data files.
+#
+# periodic:
+# Writes may be acked immediately (without waiting for the commitlog
+# append) and the CommitLog is simply synced every
+# commitlog_sync_period_in_ms milliseconds.
+#
+# periodic_without_flush:
+# Like periodic, but the commitlog write buffer is only flushed
+# before the sync, so any interruption to the process can be
+# expected to lose some writes. This is the old 0.6 periodic
+# behavior and will be removed in future versions if testing
+# continues to show no performance benefit over normal periodic.
+commitlog_sync: periodic
+commitlog_sync_period_in_ms: 10000
+# commitlog_sync: batch
+# commitlog_sync_batch_window_in_ms: 10
+
+# Addresses of hosts that are deemed contact points.
+# Cassandra nodes use this list of hosts to find each other and learn
+# the topology of the ring. You must change this if you are running
+# multiple nodes!
+seeds:
+ - 127.0.0.1
+
+# Access mode. mmapped i/o is substantially faster, but only practical on
+# a 64bit machine (which notably does not include EC2 "small" instances)
+# or relatively small datasets. "auto", the safe choice, will enable
+# mmapping on a 64bit JVM. Other values are "mmap", "mmap_index_only"
+# (which may allow you to get part of the benefits of mmap on a 32bit
+# machine by mmapping only index files) and "standard".
+# (The buffer size settings that follow only apply to standard,
+# non-mmapped i/o.)
+disk_access_mode: auto
+
+# Unlike most systems, in Cassandra writes are faster than reads, so
+# you can afford more of those in parallel. A good rule of thumb is 2
+# concurrent reads per processor core. Increase ConcurrentWrites to
+# the number of clients writing at once if you enable CommitLogSync +
+# CommitLogSyncDelay.
+concurrent_reads: 8
+concurrent_writes: 32
+
+# This sets the number of memtable flush writer threads. These will
+# be blocked by disk io, and each one will hold a memtable in memory
+# while blocked. If you have a large heap and many data directories,
+# you can increase this value for better flush performance.
+# By default this will be set to the number of data directories defined.
+#memtable_flush_writers: 1
+
+# Buffer size to use when performing contiguous column slices.
+# Increase this to the size of the column slices you typically perform
+sliced_buffer_size_in_kb: 64
+
+# TCP port, for commands and data
+storage_port: 7000
+
+# Address to bind to and tell other Cassandra nodes to connect to. You
+# _must_ change this if you want multiple nodes to be able to
+# communicate!
+#
+# Leaving it blank leaves it up to InetAddress.getLocalHost(). This
+# will always do the Right Thing *if* the node is properly configured
+# (hostname, name resolution, etc), and the Right Thing is to use the
+# address associated with the hostname (it might not be).
+#
+# Setting this to 0.0.0.0 is always wrong.
+listen_address: 127.0.0.1
+
+# The address to bind the Thrift RPC service to -- clients connect
+# here. Unlike ListenAddress above, you *can* specify 0.0.0.0 here if
+# you want Thrift to listen on all interfaces.
+#
+# Leaving this blank has the same effect it does for ListenAddress,
+# (i.e. it will be based on the configured hostname of the node).
+rpc_address: 127.0.0.1
+# port for Thrift to listen for clients on
+rpc_port: 9170
+
+# enable or disable keepalive on rpc connections
+rpc_keepalive: true
+
+# uncomment to set socket buffer sizes on rpc connections
+# rpc_send_buff_size_in_bytes:
+# rpc_recv_buff_size_in_bytes:
+
+# Frame size for thrift (maximum field length).
+# 0 disables TFramedTransport in favor of TSocket. This option
+# is deprecated; we strongly recommend using Framed mode.
+thrift_framed_transport_size_in_mb: 15
+
+# The max length of a thrift message, including all fields and
+# internal thrift overhead.
+thrift_max_message_length_in_mb: 16
+
+# Whether or not to take a snapshot before each compaction. Be
+# careful using this option, since Cassandra won't clean up the
+# snapshots for you. Mostly useful if you're paranoid when there
+# is a data format change.
+snapshot_before_compaction: false
+
+# change this to increase the compaction thread's priority. In java, 1 is the
+# lowest priority and that is our default.
+# compaction_thread_priority: 1
+
+# The threshold size in megabytes the binary memtable must grow to,
+# before it's submitted for flushing to disk.
+binary_memtable_throughput_in_mb: 256
+
+# Add column indexes to a row after its contents reach this size.
+# Increase if your column values are large, or if you have a very large
+# number of columns. The competing causes are: Cassandra has to
+# deserialize this much of the row to read a single column, so you want
+# it to be small - at least if you do many partial-row reads - but all
+# the index data is read for each access, so you don't want to generate
+# that wastefully either.
+column_index_size_in_kb: 64
+
+# Size limit for rows being compacted in memory. Larger rows will spill
+# over to disk and use a slower two-pass compaction process. A message
+# will be logged specifying the row key.
+in_memory_compaction_limit_in_mb: 64
+
+# Time to wait for a reply from other nodes before failing the command
+rpc_timeout_in_ms: 10000
+
+# phi value that must be reached for a host to be marked down.
+# most users should never need to adjust this.
+# phi_convict_threshold: 8
+
+# endpoint_snitch -- Set this to a class that implements
+# IEndpointSnitch, which will let Cassandra know enough
+# about your network topology to route requests efficiently.
+# Out of the box, Cassandra provides
+# - org.apache.cassandra.locator.SimpleSnitch:
+# Treats Strategy order as proximity. This improves cache locality
+# when disabling read repair, which can further improve throughput.
+# - org.apache.cassandra.locator.RackInferringSnitch:
+# Proximity is determined by rack and data center, which are
+# assumed to correspond to the 3rd and 2nd octet of each node's
+# IP address, respectively
+# - org.apache.cassandra.locator.PropertyFileSnitch:
+#   Proximity is determined by rack and data center, which are
+# explicitly configured in cassandra-topology.properties.
+endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
+
+# dynamic_snitch -- This boolean controls whether the above snitch is
+# wrapped with a dynamic snitch, which will monitor read latencies
+# and avoid reading from hosts that have slowed (due to compaction,
+# for instance)
+dynamic_snitch: true
+# controls how often to perform the more expensive part of host score
+# calculation
+dynamic_snitch_update_interval_in_ms: 100
+# controls how often to reset all host scores, allowing a bad host to
+# possibly recover
+dynamic_snitch_reset_interval_in_ms: 600000
+# if set greater than zero and read_repair_chance is < 1.0, this will allow
+# 'pinning' of replicas to hosts in order to increase cache capacity.
+# The badness threshold will control how much worse the pinned host has to be
+# before the dynamic snitch will prefer other replicas over it. This is
+# expressed as a double which represents a percentage. Thus, a value of
+# 0.2 means Cassandra would continue to prefer the static snitch values
+# until the pinned host was 20% worse than the fastest.
+dynamic_snitch_badness_threshold: 0.0
+
+# request_scheduler -- Set this to a class that implements
+# RequestScheduler, which will schedule incoming client requests
+# according to the specific policy. This is useful for multi-tenancy
+# with a single Cassandra cluster.
+# NOTE: This is specifically for requests from the client and does
+# not affect inter node communication.
+# org.apache.cassandra.scheduler.NoScheduler - No scheduling takes place
+# org.apache.cassandra.scheduler.RoundRobinScheduler - Round robin of
+# client requests to a node with a separate queue for each
+# request_scheduler_id. The scheduler is further customized by
+# request_scheduler_options as described below.
+request_scheduler: org.apache.cassandra.scheduler.NoScheduler
+
+# Scheduler Options vary based on the type of scheduler
+# NoScheduler - Has no options
+# RoundRobin
+# - throttle_limit -- The throttle_limit is the number of in-flight
+# requests per client. Requests beyond
+# that limit are queued up until
+# running requests can complete.
+# The value of 80 here is twice the number of
+# concurrent_reads + concurrent_writes.
+# - default_weight -- default_weight is optional and allows for
+# overriding the default which is 1.
+# - weights -- Weights are optional and will default to 1 or the
+# overridden default_weight. The weight translates into how
+# many requests are handled during each turn of the
+# RoundRobin, based on the scheduler id.
+#
+# request_scheduler_options:
+# throttle_limit: 80
+# default_weight: 5
+# weights:
+# Keyspace1: 1
+# Keyspace2: 5
+
+# request_scheduler_id -- An identifier based on which to perform
+# the request scheduling. Currently the only valid option is keyspace.
+# request_scheduler_id: keyspace
+
+# The Index Interval determines how large the sampling of row keys
+# is for a given SSTable. The larger the sampling, the more effective
+# the index is at the cost of space.
+index_interval: 128
+
+# Keyspaces have ColumnFamilies. (Usually 1 KS per application.)
+# ColumnFamilies have Rows. (Dozens of CFs per KS.)
+# Rows contain Columns. (Many per CF.)
+# Columns contain name:value:timestamp. (Many per Row.)
+#
+# A KS is most similar to a schema, and a CF is most similar to a relational table.
+#
+# Keyspaces, ColumnFamilies, and Columns may carry additional
+# metadata that change their behavior. These are as follows:
+#
+# Keyspace required parameters:
+# - name: name of the keyspace; "system" is
+# reserved for Cassandra Internals.
+# - replica_placement_strategy: the class that determines how replicas
+# are distributed among nodes. Contains both the class as well as
+# configuration information. Must extend AbstractReplicationStrategy.
+# Out of the box, Cassandra provides
+# * org.apache.cassandra.locator.SimpleStrategy
+# * org.apache.cassandra.locator.NetworkTopologyStrategy
+# * org.apache.cassandra.locator.OldNetworkTopologyStrategy
+#
+# SimpleStrategy merely places the first
+# replica at the node whose token is closest to the key (as determined
+# by the Partitioner), and additional replicas on subsequent nodes
+# along the ring in increasing Token order.
+#
+# With NetworkTopologyStrategy,
+# for each datacenter, you can specify how many replicas you want
+# on a per-keyspace basis. Replicas are placed on different racks
+# within each DC, if possible. This strategy also requires rack aware
+# snitch, such as RackInferringSnitch or PropertyFileSnitch.
+# An example:
+# - name: Keyspace1
+# replica_placement_strategy: org.apache.cassandra.locator.NetworkTopologyStrategy
+# strategy_options:
+# DC1 : 3
+# DC2 : 2
+# DC3 : 1
+#
+# OldNetworkTopologyStrategy [formerly RackAwareStrategy]
+# places one replica in each of two datacenters, and the third on a
+# different rack in the first. Additional datacenters are not
+# guaranteed to get a replica. Additional replicas after three are placed
+# in ring order after the third without regard to rack or datacenter.
+# - replication_factor: Number of replicas of each row
+# Keyspace optional parameters:
+# - strategy_options: Additional information for the replication strategy.
+# - column_families:
+# ColumnFamily required parameters:
+# - name: name of the ColumnFamily. Must not contain the character "-".
+# - compare_with: tells Cassandra how to sort the columns for slicing
+# operations. The default is BytesType, which is a straightforward
+# lexical comparison of the bytes in each column. Other options are
+# AsciiType, UTF8Type, LexicalUUIDType, TimeUUIDType, LongType,
+# and IntegerType (a generic variable-length integer type).
+# You can also specify the fully-qualified class name to a class of
+# your choice extending org.apache.cassandra.db.marshal.AbstractType.
+#
+# ColumnFamily optional parameters:
+# - keys_cached: specifies the number of keys per sstable whose
+# locations we keep in memory in "mostly LRU" order. (JUST the key
+# locations, NOT any column values.) Specify a fraction (value less
+# than 1) or an absolute number of keys to cache. Defaults to 200000
+# keys.
+# - rows_cached: specifies the number of rows whose entire contents we
+# cache in memory. Do not use this on ColumnFamilies with large rows,
+# or ColumnFamilies with high write:read ratios. Specify a fraction
+# (value less than 1) or an absolute number of rows to cache.
+# Defaults to 0. (i.e. row caching is off by default)
+# - comment: used to attach additional human-readable information about
+# the column family to its definition.
+# - read_repair_chance: specifies the probability with which read
+# repairs should be invoked on non-quorum reads. must be between 0
+# and 1. defaults to 1.0 (always read repair).
+# - gc_grace_seconds: specifies the time to wait before garbage
+# collecting tombstones (deletion markers). defaults to 864000 (10
+# days). See http://wiki.apache.org/cassandra/DistributedDeletes
+# - default_validation_class: specifies a validator class to use for
+# validating all the column values in the CF.
+# NOTE:
+# min_ must be less than max_compaction_threshold!
+# - min_compaction_threshold: the minimum number of SSTables needed
+# to start a minor compaction. increasing this will cause minor
+# compactions to start less frequently and be more intensive. setting
+# this to 0 disables minor compactions. defaults to 4.
+# - max_compaction_threshold: the maximum number of SSTables allowed
+# before a minor compaction is forced. decreasing this will cause
+# minor compactions to start more frequently and be less intensive.
+# setting this to 0 disables minor compactions. defaults to 32.
+# /NOTE
+# - row_cache_save_period_in_seconds: number of seconds between saving
+# row caches. The row caches can be saved periodically and if one
+# exists on startup it will be loaded.
+# - key_cache_save_period_in_seconds: number of seconds between saving
+# key caches. The key caches can be saved periodically and if one
+# exists on startup it will be loaded.
+# - memtable_flush_after_mins: The maximum time to leave a dirty table
+# unflushed. This should be large enough that it won't cause a flush
+# storm of all memtables during periods of inactivity.
+# - memtable_throughput_in_mb: The maximum size of the memtable before
+# it is flushed. If undefined, 1/8 * heapsize will be used.
+# - memtable_operations_in_millions: Number of operations in millions
+# before the memtable is flushed. If undefined, throughput / 64 * 0.3
+# will be used.
+# - column_metadata:
+# Column required parameters:
+# - name: binds a validator (and optionally an indexer) to columns
+# with this name in any row of the enclosing column family.
+# - validator: like cf.compare_with, an AbstractType that checks
+# that the value of the column is well-defined.
+# Column optional parameters:
+# NOTE:
+# index_name cannot be set if index_type is not also set!
+# - index_name: User-friendly name for the index.
+# - index_type: The type of index to be created. Currently only
+# KEYS is supported.
+# /NOTE
+#
+# NOTE:
+# this keyspace definition is for demonstration purposes only.
+# Cassandra will not load these definitions during startup. See
+# http://wiki.apache.org/cassandra/FAQ#no_keyspaces for an explanation.
+# /NOTE
+keyspaces:
+ - name: Keyspace1
+ replica_placement_strategy: org.apache.cassandra.locator.SimpleStrategy
+ replication_factor: 1
+ column_families:
+ - name: Standard1
+ compare_with: BytesType
+ keys_cached: 10000
+ rows_cached: 1000
+ row_cache_save_period_in_seconds: 0
+ key_cache_save_period_in_seconds: 3600
+ memtable_flush_after_mins: 59
+ memtable_throughput_in_mb: 255
+ memtable_operations_in_millions: 0.29
+
+ - name: Standard2
+ compare_with: UTF8Type
+ read_repair_chance: 0.1
+ keys_cached: 100
+ gc_grace_seconds: 0
+ min_compaction_threshold: 5
+ max_compaction_threshold: 31
+
+ - name: StandardByUUID1
+ compare_with: TimeUUIDType
+
+ - name: Super1
+ column_type: Super
+ compare_with: BytesType
+ compare_subcolumns_with: BytesType
+
+ - name: Super2
+ column_type: Super
+ compare_subcolumns_with: UTF8Type
+ rows_cached: 10000
+ keys_cached: 50
+ comment: 'A column family with supercolumns, whose column and subcolumn names are UTF8 strings'
+
+ - name: Super3
+ column_type: Super
+ compare_with: LongType
+ comment: 'A column family with supercolumns, whose column names are Longs (8 bytes)'
+
+ - name: Indexed1
+ default_validation_class: LongType
+ column_metadata:
+ - name: birthdate
+ validator_class: LongType
+ index_name: birthdate_idx
+ index_type: KEYS
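
The Keyspace1/Standard1 schema above is what the handler tests run against. For
illustration, a minimal Thrift client sketch (not part of this patch) that talks
to this node on the rpc_port configured above (9170); it assumes the Cassandra
0.7-era Thrift classes in org.apache.cassandra.thrift and a matching libthrift
on the classpath:

import java.nio.ByteBuffer;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;

public class Standard1Smoke {
    public static void main(String[] args) throws Exception {
        // Framed transport, since thrift_framed_transport_size_in_mb > 0 above.
        TFramedTransport transport = new TFramedTransport(new TSocket("127.0.0.1", 9170));
        Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
        transport.open();
        client.set_keyspace("Keyspace1");

        // Insert one column into Standard1 (BytesType comparator, so raw bytes suffice).
        ByteBuffer key = ByteBuffer.wrap("79".getBytes("UTF-8"));
        Column col = new Column(ByteBuffer.wrap("value".getBytes("UTF-8")),
                                ByteBuffer.wrap("val_0".getBytes("UTF-8")),
                                System.currentTimeMillis());
        client.insert(key, new ColumnParent("Standard1"), col, ConsistencyLevel.ONE);

        // Read it back by column path.
        ColumnPath path = new ColumnPath("Standard1");
        path.setColumn("value".getBytes("UTF-8"));
        System.out.println(client.get(key, path, ConsistencyLevel.ONE));
        transport.close();
    }
}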
Index: cassandra-handler/conf/cassandra-topology.properties
===================================================================
--- cassandra-handler/conf/cassandra-topology.properties (revision 0)
+++ cassandra-handler/conf/cassandra-topology.properties (revision 0)
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Cassandra Node IP=Data Center:Rack
+192.168.1.100=DC1:RAC1
+192.168.2.200=DC2:RAC2
+
+10.0.0.10=DC1:RAC1
+10.0.0.11=DC1:RAC1
+10.0.0.12=DC1:RAC2
+
+10.20.114.10=DC2:RAC1
+10.20.114.11=DC2:RAC1
+
+10.21.119.13=DC3:RAC1
+10.21.119.10=DC3:RAC1
+
+10.0.0.13=DC1:RAC2
+10.21.119.14=DC3:RAC2
+10.20.114.15=DC2:RAC2
+
+# default for unknown nodes
+default=DC1:r1
+
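This file feeds PropertyFileSnitch, which reads the DC:rack mapping explicitly;
RackInferringSnitch (described in cassandra.yaml above) instead derives the same
information from IP octets -- the data center from the 2nd octet and the rack
from the 3rd. A tiny illustrative sketch of that octet rule (hypothetical
helper, not part of this patch):

public class OctetRule {
    // For an address a.b.c.d, RackInferringSnitch treats octet b as the
    // data center and octet c as the rack.
    static String[] inferDcAndRack(String ip) {
        String[] o = ip.split("\\.");
        return new String[] { o[1], o[2] };
    }

    public static void main(String[] args) {
        String[] dcRack = inferDcAndRack("10.20.114.11");
        System.out.println("DC=" + dcRack[0] + " rack=" + dcRack[1]); // DC=20 rack=114
    }
}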
Index: cassandra-handler/conf/log4j-tools.properties.old
===================================================================
--- cassandra-handler/conf/log4j-tools.properties.old (revision 0)
+++ cassandra-handler/conf/log4j-tools.properties.old (revision 0)
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# for production, you should probably set the root to INFO
+# and the pattern to %c instead of %l. (%l is slower.)
+
+# output messages into a rolling log file as well as stdout
+log4j.rootLogger=WARN,stderr
+
+# stderr
+log4j.appender.stderr=org.apache.log4j.ConsoleAppender
+log4j.appender.stderr.target=System.err
+log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
+log4j.appender.stderr.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n
Index: cassandra-handler/conf/README.txt
===================================================================
--- cassandra-handler/conf/README.txt (revision 0)
+++ cassandra-handler/conf/README.txt (revision 0)
@@ -0,0 +1,13 @@
+Required configuration files
+============================
+
+cassandra.yaml: main Cassandra configuration file
+log4j-server.properties: log4j configuration file for Cassandra server
+
+
+Optional configuration files
+============================
+
+access.properties: used for authorization
+passwd.properties: used for authentication
+cassandra-topology.properties: used by PropertyFileSnitch
Index: cassandra-handler/conf/log4j-server.properties.old
===================================================================
--- cassandra-handler/conf/log4j-server.properties.old (revision 0)
+++ cassandra-handler/conf/log4j-server.properties.old (revision 0)
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# for production, you should probably set pattern to %c instead of %l.
+# (%l is slower.)
+
+# output messages into a rolling log file as well as stdout
+log4j.rootLogger=INFO,stdout,R
+
+# stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n
+
+# rolling log file
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.R.maxFileSize=20MB
+log4j.appender.R.maxBackupIndex=50
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
+# Edit the next line to point to your logs directory
+log4j.appender.R.File=/var/log/cassandra/system.log
+
+# Application logging options
+#log4j.logger.com.facebook=DEBUG
+#log4j.logger.com.facebook.infrastructure.gms=DEBUG
+#log4j.logger.com.facebook.infrastructure.db=DEBUG
Index: cassandra-handler/conf/passwd.properties
===================================================================
--- cassandra-handler/conf/passwd.properties (revision 0)
+++ cassandra-handler/conf/passwd.properties (revision 0)
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This is a sample password file for SimpleAuthenticator. The format of
+# this file is username=password. If -Dpasswd.mode=MD5 then the password
+# is represented as an md5 digest, otherwise it is cleartext (keep this
+# in mind when setting file mode and ownership).
+jsmith=havebadpass
+Elvis\ Presley=graceland4evar
+dilbert=nomoovertime
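
When -Dpasswd.mode=MD5 is set, each entry carries a digest rather than
cleartext. A minimal JDK-only sketch for producing one (it assumes
SimpleAuthenticator expects the lowercase hex encoding of the MD5 digest;
verify against the SimpleAuthenticator source before relying on it):

import java.security.MessageDigest;

public class PasswdDigest {
    public static void main(String[] args) throws Exception {
        byte[] d = MessageDigest.getInstance("MD5").digest("havebadpass".getBytes("UTF-8"));
        StringBuilder hex = new StringBuilder();
        for (byte b : d) {
            hex.append(String.format("%02x", b));  // two lowercase hex chars per byte
        }
        System.out.println(hex);  // value to place after "jsmith=" when passwd.mode=MD5
    }
}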
Index: cassandra-handler/conf/access.properties
===================================================================
--- cassandra-handler/conf/access.properties (revision 0)
+++ cassandra-handler/conf/access.properties (revision 0)
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This is a sample access file for SimpleAuthority. The format of this file
+# is KEYSPACE[.COLUMNFAMILY].PERMISSION=USERS, where:
+#
+# * KEYSPACE is the keyspace name.
+# * COLUMNFAMILY is the column family name.
+# * PERMISSION is one of <ro> or <rw> for read-only or read-write respectively.
+# * USERS is a comma delimited list of users from passwd.properties.
+#
+# See below for example entries.
+
+# NOTE: This file contains potentially sensitive information, please keep
+# this in mind when setting its mode and ownership.
+
+# The magical '<modify-keyspaces>' property lists users who can modify the
+# list of keyspaces: all users will be able to view the list of keyspaces.
+<modify-keyspaces>=jsmith
+
+# Access to Keyspace1 (add/remove column families, etc).
+Keyspace1.<rw>=jsmith,Elvis Presley
+Keyspace1.<ro>=dilbert
+
+# Access to Standard1 (keyspace Keyspace1)
+Keyspace1.Standard1.<rw>=jsmith,Elvis Presley,dilbert
Index: cassandra-handler/conf/cassandra-env.sh
===================================================================
--- cassandra-handler/conf/cassandra-env.sh (revision 0)
+++ cassandra-handler/conf/cassandra-env.sh (revision 0)
@@ -0,0 +1,123 @@
+MAX_HEAP_SIZE="1G"
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+calculate_heap_size()
+{
+ case "`uname`" in
+ Linux)
+ system_memory_in_mb=`free -m | awk '/Mem:/ {print $2}'`
+ MAX_HEAP_SIZE=$((system_memory_in_mb / 2))M
+ return 0
+ ;;
+ FreeBSD)
+ system_memory_in_bytes=`sysctl hw.physmem | awk '{print $2}'`
+ MAX_HEAP_SIZE=$((system_memory_in_bytes / 1024 / 1024 / 2))M
+ return 0
+ ;;
+ *)
+ MAX_HEAP_SIZE=1024M
+ return 1
+ ;;
+ esac
+}
+
+# The amount of memory to allocate to the JVM at startup, you almost
+# certainly want to adjust this for your environment. If left commented
+# out, the heap size will be automatically determined by calculate_heap_size
+# MAX_HEAP_SIZE="4G"
+
+if [ "x$MAX_HEAP_SIZE" = "x" ]; then
+ calculate_heap_size
+fi
+
+# Specifies the default port over which Cassandra will be available for
+# JMX connections.
+JMX_PORT="8001"
+
+# To use mx4j, an HTML interface for JMX, add mx4j-tools.jar to the lib/ directory.
+# By default mx4j listens on 0.0.0.0:8081. Uncomment the following lines to control
+# its listen address and port.
+#MX4J_ADDRESS="-Dmx4jaddress=0.0.0.0"
+#MX4J_PORT="-Dmx4jport=8081"
+
+
+# Here we create the arguments that will get passed to the jvm when
+# starting cassandra.
+
+# enable assertions. disabling this in production will give a modest
+# performance benefit (around 5%).
+JVM_OPTS="$JVM_OPTS -ea"
+
+# enable thread priorities, primarily so we can give periodic tasks
+# a lower priority to avoid interfering with client workload
+JVM_OPTS="$JVM_OPTS -XX:+UseThreadPriorities"
+# allows lowering thread priority without being root. see
+# http://tech.stolsvik.com/2010/01/linux-java-thread-priorities-workaround.html
+JVM_OPTS="$JVM_OPTS -XX:ThreadPriorityPolicy=42"
+
+# min and max heap sizes should be set to the same value to avoid
+# stop-the-world GC pauses during resize, and so that we can lock the
+# heap in memory on startup to prevent any of it from being swapped
+# out.
+JVM_OPTS="$JVM_OPTS -Xms$MAX_HEAP_SIZE"
+JVM_OPTS="$JVM_OPTS -Xmx$MAX_HEAP_SIZE"
+JVM_OPTS="$JVM_OPTS -XX:+HeapDumpOnOutOfMemoryError"
+
+if [ "`uname`" = "Linux" ] ; then
+ # reduce the per-thread stack size to minimize the impact of Thrift
+ # thread-per-client. (Best practice is for client connections to
+ # be pooled anyway.) Only do so on Linux where it is known to be
+ # supported.
+ JVM_OPTS="$JVM_OPTS -Xss128k"
+fi
+
+# GC tuning options
+JVM_OPTS="$JVM_OPTS -XX:+UseParNewGC"
+JVM_OPTS="$JVM_OPTS -XX:+UseConcMarkSweepGC"
+JVM_OPTS="$JVM_OPTS -XX:+CMSParallelRemarkEnabled"
+JVM_OPTS="$JVM_OPTS -XX:SurvivorRatio=8"
+JVM_OPTS="$JVM_OPTS -XX:MaxTenuringThreshold=1"
+JVM_OPTS="$JVM_OPTS -XX:CMSInitiatingOccupancyFraction=75"
+JVM_OPTS="$JVM_OPTS -XX:+UseCMSInitiatingOccupancyOnly"
+
+# GC logging options -- uncomment to enable
+# JVM_OPTS="$JVM_OPTS -XX:+PrintGCDetails"
+# JVM_OPTS="$JVM_OPTS -XX:+PrintGCTimeStamps"
+# JVM_OPTS="$JVM_OPTS -XX:+PrintClassHistogram"
+# JVM_OPTS="$JVM_OPTS -XX:+PrintTenuringDistribution"
+# JVM_OPTS="$JVM_OPTS -XX:+PrintGCApplicationStoppedTime"
+# JVM_OPTS="$JVM_OPTS -Xloggc:/var/log/cassandra/gc.log"
+
+# Prefer binding to IPv4 network intefaces (when net.ipv6.bindv6only=1). See
+# http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6342561 (short version:
+# comment out this entry to enable IPv6 support).
+JVM_OPTS="$JVM_OPTS -Djava.net.preferIPv4Stack=true"
+
+# jmx: metrics and administration interface
+#
+# add this if you're having trouble connecting:
+# JVM_OPTS="$JVM_OPTS -Djava.rmi.server.hostname="
+#
+# see
+# http://blogs.sun.com/jmxetc/entry/troubleshooting_connection_problems_in_jconsole
+# for more on configuring JMX through firewalls, etc. (Short version:
+# get it working with no firewall first.)
+JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT"
+JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl=false"
+JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=false"
+JVM_OPTS="$JVM_OPTS $MX4J_ADDRESS"
+JVM_OPTS="$JVM_OPTS $MX4J_PORT"
Index: cassandra-handler/ivy.xml
===================================================================
--- cassandra-handler/ivy.xml (revision 0)
+++ cassandra-handler/ivy.xml (revision 0)
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
Index: cassandra-handler/lib/readme.txt
===================================================================
--- cassandra-handler/lib/readme.txt (revision 0)
+++ cassandra-handler/lib/readme.txt (revision 0)
@@ -0,0 +1 @@
+Required jars are gathered by ../ql/ivy.xml
Index: cassandra-handler/src/test/results/cassandra_queries.q.out
===================================================================
--- cassandra-handler/src/test/results/cassandra_queries.q.out (revision 0)
+++ cassandra-handler/src/test/results/cassandra_queries.q.out (revision 0)
@@ -0,0 +1,450 @@
+PREHOOK: query: CREATE EXTERNAL TABLE IF NOT EXISTS
+cassandra_keyspace1_standard1(key int, value string)
+STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
+WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" , "cassandra.cf.name" = "Standard1" , "cassandra.host" = "127.0.0.1" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1" , "cassandra.partitioner" = "org.apache.cassandra.dht.RandomPartitioner" )
+TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1")
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE EXTERNAL TABLE IF NOT EXISTS
+cassandra_keyspace1_standard1(key int, value string)
+STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
+WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" , "cassandra.cf.name" = "Standard1" , "cassandra.host" = "127.0.0.1" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1" , "cassandra.partitioner" = "org.apache.cassandra.dht.RandomPartitioner" )
+TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@cassandra_keyspace1_standard1
+PREHOOK: query: describe cassandra_keyspace1_standard1
+PREHOOK: type: DESCTABLE
+POSTHOOK: query: describe cassandra_keyspace1_standard1
+POSTHOOK: type: DESCTABLE
+key int from deserializer
+value string from deserializer
+PREHOOK: query: EXPLAIN
+FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 ORDER BY key
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 ORDER BY key
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB (TOK_TABNAME cassandra_keyspace1_standard1))) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (= (% (TOK_TABLE_OR_COL key) 7) 0)) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL key)))))
+
+STAGE DEPENDENCIES:
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-0
+ Map Reduce
+ Alias -> Map Operator Tree:
+ src
+ TableScan
+ alias: src
+ Filter Operator
+ predicate:
+ expr: ((key % 7) = 0)
+ type: boolean
+ Filter Operator
+ predicate:
+ expr: ((key % 7) = 0)
+ type: boolean
+ Select Operator
+ expressions:
+ expr: key
+ type: string
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ Reduce Output Operator
+ key expressions:
+ expr: _col0
+ type: string
+ sort order: +
+ tag: -1
+ value expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ Reduce Operator Tree:
+ Extract
+ Select Operator
+ expressions:
+ expr: UDFToInteger(_col0)
+ type: int
+ expr: _col1
+ type: string
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 1
+ table:
+ input format: org.apache.hadoop.hive.cassandra.input.HiveCassandraStandardColumnInputFormat
+ output format: org.apache.hadoop.hive.cassandra.output.HiveCassandraOutputFormat
+ serde: org.apache.hadoop.hive.cassandra.serde.StandardColumnSerDe
+ name: default.cassandra_keyspace1_standard1
+
+
+PREHOOK: query: FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 ORDER BY key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@cassandra_keyspace1_standard1
+POSTHOOK: query: FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 ORDER BY key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@cassandra_keyspace1_standard1
+PREHOOK: query: EXPLAIN
+select * from cassandra_keyspace1_standard1 ORDER BY key
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+select * from cassandra_keyspace1_standard1 ORDER BY key
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME cassandra_keyspace1_standard1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL key)))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ cassandra_keyspace1_standard1
+ TableScan
+ alias: cassandra_keyspace1_standard1
+ Select Operator
+ expressions:
+ expr: key
+ type: int
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ Reduce Output Operator
+ key expressions:
+ expr: _col0
+ type: int
+ sort order: +
+ tag: -1
+ value expressions:
+ expr: _col0
+ type: int
+ expr: _col1
+ type: string
+ Reduce Operator Tree:
+ Extract
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: select * from cassandra_keyspace1_standard1 ORDER BY key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@cassandra_keyspace1_standard1
+PREHOOK: Output: file:/tmp/edward/hive_2011-03-07_14-13-48_603_3517947258898162165/-mr-10000
+POSTHOOK: query: select * from cassandra_keyspace1_standard1 ORDER BY key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@cassandra_keyspace1_standard1
+POSTHOOK: Output: file:/tmp/edward/hive_2011-03-07_14-13-48_603_3517947258898162165/-mr-10000
+79 val_0
+105 val_105
+119 val_119
+126 val_126
+133 val_133
+168 val_168
+175 val_175
+189 val_189
+196 val_196
+203 val_203
+217 val_217
+224 val_224
+238 val_238
+252 val_252
+266 val_266
+273 val_273
+280 val_280
+287 val_287
+289 val_28
+308 val_308
+315 val_315
+322 val_322
+336 val_336
+357 val_35
+364 val_364
+378 val_378
+392 val_392
+399 val_399
+406 val_406
+413 val_413
+427 val_427
+429 val_42
+448 val_448
+455 val_455
+462 val_462
+469 val_469
+483 val_483
+490 val_490
+497 val_497
+708 val_70
+779 val_77
+846 val_84
+983 val_98
+PREHOOK: query: EXPLAIN
+select value from cassandra_keyspace1_standard1 ORDER BY VALUE
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+select value from cassandra_keyspace1_standard1 ORDER BY VALUE
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME cassandra_keyspace1_standard1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL value))) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL VALUE)))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ cassandra_keyspace1_standard1
+ TableScan
+ alias: cassandra_keyspace1_standard1
+ Select Operator
+ expressions:
+ expr: value
+ type: string
+ outputColumnNames: _col0
+ Reduce Output Operator
+ key expressions:
+ expr: _col0
+ type: string
+ sort order: +
+ tag: -1
+ value expressions:
+ expr: _col0
+ type: string
+ Reduce Operator Tree:
+ Extract
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: select value from cassandra_keyspace1_standard1 ORDER BY VALUE
+PREHOOK: type: QUERY
+PREHOOK: Input: default@cassandra_keyspace1_standard1
+PREHOOK: Output: file:/tmp/edward/hive_2011-03-07_14-13-52_764_5259521194449628162/-mr-10000
+POSTHOOK: query: select value from cassandra_keyspace1_standard1 ORDER BY VALUE
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@cassandra_keyspace1_standard1
+POSTHOOK: Output: file:/tmp/edward/hive_2011-03-07_14-13-52_764_5259521194449628162/-mr-10000
+val_0
+val_105
+val_119
+val_126
+val_133
+val_168
+val_175
+val_189
+val_196
+val_203
+val_217
+val_224
+val_238
+val_252
+val_266
+val_273
+val_28
+val_280
+val_287
+val_308
+val_315
+val_322
+val_336
+val_35
+val_364
+val_378
+val_392
+val_399
+val_406
+val_413
+val_42
+val_427
+val_448
+val_455
+val_462
+val_469
+val_483
+val_490
+val_497
+val_70
+val_77
+val_84
+val_98
+PREHOOK: query: EXPLAIN
+select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key ORDER BY a.key
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key ORDER BY a.key
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME cassandra_keyspace1_standard1) a) (TOK_TABREF (TOK_TABNAME cassandra_keyspace1_standard1) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) key)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) value)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL b) value))) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (. (TOK_TABLE_OR_COL a) key)))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-2 depends on stages: Stage-1
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ a
+ TableScan
+ alias: a
+ Reduce Output Operator
+ key expressions:
+ expr: key
+ type: int
+ sort order: +
+ Map-reduce partition columns:
+ expr: key
+ type: int
+ tag: 0
+ value expressions:
+ expr: key
+ type: int
+ expr: value
+ type: string
+ b
+ TableScan
+ alias: b
+ Reduce Output Operator
+ key expressions:
+ expr: key
+ type: int
+ sort order: +
+ Map-reduce partition columns:
+ expr: key
+ type: int
+ tag: 1
+ value expressions:
+ expr: value
+ type: string
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {VALUE._col0} {VALUE._col1}
+ 1 {VALUE._col1}
+ handleSkewJoin: false
+ outputColumnNames: _col0, _col1, _col5
+ Select Operator
+ expressions:
+ expr: _col0
+ type: int
+ expr: _col1
+ type: string
+ expr: _col5
+ type: string
+ outputColumnNames: _col0, _col1, _col2
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+
+ Stage: Stage-2
+ Map Reduce
+ Alias -> Map Operator Tree:
+ file:/tmp/edward/hive_2011-03-07_14-13-56_906_8208402642844752554/-mr-10002
+ Reduce Output Operator
+ key expressions:
+ expr: _col0
+ type: int
+ sort order: +
+ tag: -1
+ value expressions:
+ expr: _col0
+ type: int
+ expr: _col1
+ type: string
+ expr: _col2
+ type: string
+ Reduce Operator Tree:
+ Extract
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key ORDER BY a.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@cassandra_keyspace1_standard1
+PREHOOK: Output: file:/tmp/edward/hive_2011-03-07_14-13-57_016_5914164914507392118/-mr-10000
+POSTHOOK: query: select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key ORDER BY a.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@cassandra_keyspace1_standard1
+POSTHOOK: Output: file:/tmp/edward/hive_2011-03-07_14-13-57_016_5914164914507392118/-mr-10000
+79 val_0 val_0
+105 val_105 val_105
+119 val_119 val_119
+126 val_126 val_126
+133 val_133 val_133
+168 val_168 val_168
+175 val_175 val_175
+189 val_189 val_189
+196 val_196 val_196
+203 val_203 val_203
+217 val_217 val_217
+224 val_224 val_224
+238 val_238 val_238
+252 val_252 val_252
+266 val_266 val_266
+273 val_273 val_273
+280 val_280 val_280
+287 val_287 val_287
+289 val_28 val_28
+308 val_308 val_308
+315 val_315 val_315
+322 val_322 val_322
+336 val_336 val_336
+357 val_35 val_35
+364 val_364 val_364
+378 val_378 val_378
+392 val_392 val_392
+399 val_399 val_399
+406 val_406 val_406
+413 val_413 val_413
+427 val_427 val_427
+429 val_42 val_42
+448 val_448 val_448
+455 val_455 val_455
+462 val_462 val_462
+469 val_469 val_469
+483 val_483 val_483
+490 val_490 val_490
+497 val_497 val_497
+708 val_70 val_70
+779 val_77 val_77
+846 val_84 val_84
+983 val_98 val_98
Index: cassandra-handler/src/test/results/cassandra_queries
===================================================================
--- cassandra-handler/src/test/results/cassandra_queries (revision 0)
+++ cassandra-handler/src/test/results/cassandra_queries (revision 0)
@@ -0,0 +1,365 @@
+PREHOOK: query: CREATE EXTERNAL TABLE IF NOT EXISTS
+cassandra_keyspace1_standard1(key int, value string)
+STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
+WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" , "cassandra.cf.name" = "Standard1" , "cassandra.host" = "127.0.0.1" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1" , "cassandra.partitioner" = "org.apache.cassandra.dht.RandomPartitioner" )
+TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1")
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE EXTERNAL TABLE IF NOT EXISTS
+cassandra_keyspace1_standard1(key int, value string)
+STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
+WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" , "cassandra.cf.name" = "Standard1" , "cassandra.host" = "127.0.0.1" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1" , "cassandra.partitioner" = "org.apache.cassandra.dht.RandomPartitioner" )
+TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@cassandra_keyspace1_standard1
+PREHOOK: query: describe cassandra_keyspace1_standard1
+PREHOOK: type: DESCTABLE
+POSTHOOK: query: describe cassandra_keyspace1_standard1
+POSTHOOK: type: DESCTABLE
+key int from deserializer
+value string from deserializer
+PREHOOK: query: EXPLAIN
+FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB (TOK_TABNAME cassandra_keyspace1_standard1))) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_WHERE (= (% (TOK_TABLE_OR_COL key) 7) 0))))
+
+STAGE DEPENDENCIES:
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-0
+ Map Reduce
+ Alias -> Map Operator Tree:
+ src
+ TableScan
+ alias: src
+ Filter Operator
+ predicate:
+ expr: ((key % 7) = 0)
+ type: boolean
+ Filter Operator
+ predicate:
+ expr: ((key % 7) = 0)
+ type: boolean
+ Select Operator
+ expressions:
+ expr: key
+ type: string
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ Select Operator
+ expressions:
+ expr: UDFToInteger(_col0)
+ type: int
+ expr: _col1
+ type: string
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 1
+ table:
+ input format: org.apache.hadoop.hive.cassandra.input.HiveCassandraStandardColumnInputFormat
+ output format: org.apache.hadoop.hive.cassandra.output.HiveCassandraOutputFormat
+ serde: org.apache.hadoop.hive.cassandra.serde.StandardColumnSerDe
+ name: default.cassandra_keyspace1_standard1
+
+
+PREHOOK: query: FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@cassandra_keyspace1_standard1
+POSTHOOK: query: FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@cassandra_keyspace1_standard1
+PREHOOK: query: EXPLAIN
+select * from cassandra_keyspace1_standard1
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+select * from cassandra_keyspace1_standard1
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME cassandra_keyspace1_standard1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF))))
+
+STAGE DEPENDENCIES:
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: select * from cassandra_keyspace1_standard1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@cassandra_keyspace1_standard1
+PREHOOK: Output: file:/tmp/edward/hive_2011-02-25_16-36-32_494_7286511593311887975/-mr-10000
+POSTHOOK: query: select * from cassandra_keyspace1_standard1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@cassandra_keyspace1_standard1
+POSTHOOK: Output: file:/tmp/edward/hive_2011-02-25_16-36-32_494_7286511593311887975/-mr-10000
+252 val_252
+126 val_126
+119 val_119
+196 val_196
+266 val_266
+392 val_392
+413 val_413
+983 val_98
+224 val_224
+427 val_427
+357 val_35
+203 val_203
+483 val_483
+469 val_469
+779 val_77
+79 val_0
+289 val_28
+399 val_399
+490 val_490
+364 val_364
+273 val_273
+462 val_462
+315 val_315
+238 val_238
+308 val_308
+322 val_322
+189 val_189
+429 val_42
+133 val_133
+217 val_217
+448 val_448
+105 val_105
+336 val_336
+846 val_84
+280 val_280
+287 val_287
+406 val_406
+497 val_497
+378 val_378
+708 val_70
+455 val_455
+175 val_175
+168 val_168
+PREHOOK: query: EXPLAIN
+select value from cassandra_keyspace1_standard1
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+select value from cassandra_keyspace1_standard1
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME cassandra_keyspace1_standard1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL value)))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ cassandra_keyspace1_standard1
+ TableScan
+ alias: cassandra_keyspace1_standard1
+ Select Operator
+ expressions:
+ expr: value
+ type: string
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: select value from cassandra_keyspace1_standard1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@cassandra_keyspace1_standard1
+PREHOOK: Output: file:/tmp/edward/hive_2011-02-25_16-36-32_885_3161823916748354795/-mr-10000
+POSTHOOK: query: select value from cassandra_keyspace1_standard1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@cassandra_keyspace1_standard1
+POSTHOOK: Output: file:/tmp/edward/hive_2011-02-25_16-36-32_885_3161823916748354795/-mr-10000
+val_252
+val_126
+val_119
+val_196
+val_266
+val_392
+val_413
+val_98
+val_224
+val_427
+val_35
+val_203
+val_483
+val_469
+val_77
+val_0
+val_28
+val_399
+val_490
+val_364
+val_273
+val_462
+val_315
+val_238
+val_308
+val_322
+val_189
+val_42
+val_133
+val_217
+val_448
+val_105
+val_336
+val_84
+val_280
+val_287
+val_406
+val_497
+val_378
+val_70
+val_455
+val_175
+val_168
+PREHOOK: query: EXPLAIN
+select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME cassandra_keyspace1_standard1) a) (TOK_TABREF (TOK_TABNAME cassandra_keyspace1_standard1) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) key)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) value)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL b) value)))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ a
+ TableScan
+ alias: a
+ Reduce Output Operator
+ key expressions:
+ expr: key
+ type: int
+ sort order: +
+ Map-reduce partition columns:
+ expr: key
+ type: int
+ tag: 0
+ value expressions:
+ expr: key
+ type: int
+ expr: value
+ type: string
+ b
+ TableScan
+ alias: b
+ Reduce Output Operator
+ key expressions:
+ expr: key
+ type: int
+ sort order: +
+ Map-reduce partition columns:
+ expr: key
+ type: int
+ tag: 1
+ value expressions:
+ expr: value
+ type: string
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {VALUE._col0} {VALUE._col1}
+ 1 {VALUE._col1}
+ handleSkewJoin: false
+ outputColumnNames: _col0, _col1, _col5
+ Select Operator
+ expressions:
+ expr: _col0
+ type: int
+ expr: _col1
+ type: string
+ expr: _col5
+ type: string
+ outputColumnNames: _col0, _col1, _col2
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@cassandra_keyspace1_standard1
+PREHOOK: Output: file:/tmp/edward/hive_2011-02-25_16-36-37_515_951258717592195422/-mr-10000
+POSTHOOK: query: select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@cassandra_keyspace1_standard1
+POSTHOOK: Output: file:/tmp/edward/hive_2011-02-25_16-36-37_515_951258717592195422/-mr-10000
+79 val_0 val_0
+105 val_105 val_105
+119 val_119 val_119
+126 val_126 val_126
+133 val_133 val_133
+168 val_168 val_168
+175 val_175 val_175
+189 val_189 val_189
+196 val_196 val_196
+203 val_203 val_203
+217 val_217 val_217
+224 val_224 val_224
+238 val_238 val_238
+252 val_252 val_252
+266 val_266 val_266
+273 val_273 val_273
+280 val_280 val_280
+287 val_287 val_287
+289 val_28 val_28
+308 val_308 val_308
+315 val_315 val_315
+322 val_322 val_322
+336 val_336 val_336
+357 val_35 val_35
+364 val_364 val_364
+378 val_378 val_378
+392 val_392 val_392
+399 val_399 val_399
+406 val_406 val_406
+413 val_413 val_413
+427 val_427 val_427
+429 val_42 val_42
+448 val_448 val_448
+455 val_455 val_455
+462 val_462 val_462
+469 val_469 val_469
+483 val_483 val_483
+490 val_490 val_490
+497 val_497 val_497
+708 val_70 val_70
+779 val_77 val_77
+846 val_84 val_84
+983 val_98 val_98
Index: cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java
===================================================================
--- cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java (revision 0)
+++ cassandra-handler/src/test/org/apache/cassandra/contrib/utils/service/CassandraServiceDataCleaner.java (revision 0)
@@ -0,0 +1,76 @@
+package org.apache.cassandra.contrib.utils.service;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.FileUtils;
+
+public class CassandraServiceDataCleaner {
+
+ /**
+ * Creates all data directories if they don't exist and cleans them.
+ * @throws IOException
+ */
+ public void prepare() throws IOException {
+ makeDirsIfNotExist();
+ cleanupDataDirectories();
+ }
+
+ /**
+ * Deletes all data from the Cassandra data directories, including the commit log.
+ * @throws IOException in case of permissions error etc.
+ */
+ public void cleanupDataDirectories() throws IOException {
+ for (String s: getDataDirs()) {
+ cleanDir(s);
+ }
+ }
+ /**
+ * Creates the data directories if they don't already exist.
+ * @throws IOException if directories cannot be created (permissions etc).
+ */
+ public void makeDirsIfNotExist() throws IOException {
+ for (String s: getDataDirs()) {
+ mkdir(s);
+ }
+ }
+
+ /**
+ * Collects all data dirs and returns a set of String paths on the file system.
+ *
+ * @return the set of data directory paths plus the commit log location
+ */
+ private Set<String> getDataDirs() {
+ Set<String> dirs = new HashSet<String>();
+ for (String s : DatabaseDescriptor.getAllDataFileLocations()) {
+ dirs.add(s);
+ }
+ dirs.add(DatabaseDescriptor.getCommitLogLocation());
+ return dirs;
+ }
+ /**
+ * Creates a directory
+ *
+ * @param dir path of the directory to create
+ * @throws IOException
+ */
+ private void mkdir(String dir) throws IOException {
+ FileUtils.createDirectory(dir);
+ }
+
+ /**
+ * Removes all directory contents from the file system.
+ *
+ * @param dir path of the directory to empty
+ * @throws IOException
+ */
+ private void cleanDir(String dir) throws IOException {
+ File dirFile = new File(dir);
+ if (dirFile.exists() && dirFile.isDirectory()) {
+ FileUtils.delete(dirFile.listFiles());
+ }
+ }
+}
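Note for reviewers: a minimal sketch of how CassandraServiceDataCleaner is meant to be driven. The fixture class below is hypothetical; only the cleaner calls come from this patch.

import junit.framework.TestCase;

import org.apache.cassandra.contrib.utils.service.CassandraServiceDataCleaner;

// Hypothetical JUnit 3 fixture: prepare() creates any missing data
// directories and then empties them, so each run starts from a blank
// Cassandra data layout.
public class CleanerUsageSketch extends TestCase {
  @Override
  protected void setUp() throws Exception {
    new CassandraServiceDataCleaner().prepare();
  }

  public void testStartsClean() {
    // an EmbeddedCassandraService would be started here
  }
}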
Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java
===================================================================
--- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java (revision 0)
+++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraTestSetup.java (revision 0)
@@ -0,0 +1,86 @@
+package org.apache.hadoop.hive.cassandra;
+
+import java.io.IOException;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+
+import org.apache.cassandra.contrib.utils.service.CassandraServiceDataCleaner;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.cassandra.serde.StandardColumnSerDe;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+
+public class CassandraTestSetup extends TestSetup{
+
+ static final Log LOG = LogFactory.getLog(CassandraTestSetup.class);
+ private EmbeddedCassandraService cassandra;
+
+ public CassandraTestSetup(Test test){
+ super(test);
+ }
+
+ @SuppressWarnings("deprecation")
+ void preTest(HiveConf conf) throws IOException, TTransportException, TException {
+ if (cassandra==null){
+ CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner();
+ cleaner.prepare();
+ cassandra = new EmbeddedCassandraService();
+ cassandra.start();
+ }
+
+ FramedConnWrapper wrap = new FramedConnWrapper("127.0.0.1",9170,5000);
+ wrap.open();
+ KsDef ks = new KsDef();
+ ks.setName("Keyspace1");
+ ks.setReplication_factor(1);
+ ks.setStrategy_class("org.apache.cassandra.locator.SimpleStrategy");
+ CfDef cf = new CfDef();
+ cf.setName("Standard1");
+ cf.setKeyspace("Keyspace1");
+ ks.addToCf_defs(cf);
+ Cassandra.Client client = wrap.getClient();
+ try {
+ try {
+ // describe_keyspace throws NotFoundException if the keyspace is absent
+ client.describe_keyspace("Keyspace1");
+ } catch (NotFoundException ex){
+ client.system_add_keyspace(ks);
+ System.out.println("ks added");
+ try {
+ // brief pause so the schema change settles before tests run
+ Thread.sleep(2000);
+ } catch (InterruptedException ex2) {
+ // best-effort pause; safe to ignore
+ }
+ }
+ } catch (InvalidRequestException e){
+ throw new RuntimeException(e);
+ }
+ wrap.close();
+
+ String auxJars = conf.getAuxJars();
+ auxJars = ((auxJars == null) ? "" : (auxJars + ",")) + "file://"
+ + new JobConf(conf, Cassandra.Client.class).getJar();
+ auxJars += ",file://" + new JobConf(conf, StandardColumnSerDe.class).getJar();
+ auxJars += ",file://" + new JobConf(conf, org.apache.thrift.transport.TSocket.class).getJar();
+ auxJars += ",file://" + new JobConf(conf, com.google.common.collect.AbstractIterator.class).getJar();
+ auxJars += ",file://" + new JobConf(conf, org.apache.commons.lang.ArrayUtils.class).getJar();
+ conf.setAuxJars(auxJars);
+
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ // reset the data directories so the next run starts clean
+ CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner();
+ cleaner.prepare();
+ }
+
+}
+
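Note: CassandraTestSetup relies on FramedConnWrapper, which is added elsewhere in this patch. For review context, a sketch of what such a wrapper presumably looks like against the Cassandra 0.7 Thrift API (the 0.7 server only accepts framed transport); the body below is an assumption, not the patch's actual implementation.

import org.apache.cassandra.thrift.Cassandra;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

// Sketch only: wraps a socket in TFramedTransport, since Cassandra 0.7
// rejects unframed Thrift clients.
public class FramedConnWrapperSketch {
  private final TTransport transport;
  private final Cassandra.Client client;

  public FramedConnWrapperSketch(String host, int port, int timeout) {
    transport = new TFramedTransport(new TSocket(host, port, timeout));
    client = new Cassandra.Client(new TBinaryProtocol(transport));
  }

  public void open() throws TTransportException { transport.open(); }
  public void close() { transport.close(); }
  public Cassandra.Client getClient() { return client; }
}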
Index: cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java
===================================================================
--- cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java (revision 0)
+++ cassandra-handler/src/test/org/apache/hadoop/hive/cassandra/CassandraQTestUtil.java (revision 0)
@@ -0,0 +1,22 @@
+package org.apache.hadoop.hive.cassandra;
+
+import org.apache.hadoop.hive.ql.QTestUtil;
+
+/**
+ * CassandraQTestUtil initializes Cassandra-specific test fixtures.
+ */
+public class CassandraQTestUtil extends QTestUtil {
+ public CassandraQTestUtil(
+ String outDir, String logDir, boolean miniMr, CassandraTestSetup setup)
+ throws Exception {
+
+ super(outDir, logDir, miniMr, null);
+ setup.preTest(conf);
+ super.init();
+ }
+
+ @Override
+ public void init() throws Exception {
+ // defer
+ }
+}
Index: cassandra-handler/src/test/queries/cassandra_queries.q
===================================================================
--- cassandra-handler/src/test/queries/cassandra_queries.q (revision 0)
+++ cassandra-handler/src/test/queries/cassandra_queries.q (revision 0)
@@ -0,0 +1,29 @@
+SET hive.support.concurrency=false;
+add file conf/cassandra.yaml;
+
+CREATE EXTERNAL TABLE IF NOT EXISTS
+cassandra_keyspace1_standard1(key int, value string)
+STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
+WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,Standard1:value" , "cassandra.cf.name" = "Standard1" , "cassandra.host" = "127.0.0.1" , "cassandra.port" = "9170", "cassandra.cf.name" = "Standard1" , "cassandra.partitioner" = "org.apache.cassandra.dht.RandomPartitioner" )
+TBLPROPERTIES ("cassandra.ks.name" = "Keyspace1");
+
+describe cassandra_keyspace1_standard1;
+
+EXPLAIN
+FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 ORDER BY key;
+
+FROM src INSERT OVERWRITE TABLE cassandra_keyspace1_standard1 SELECT * WHERE (key%7)=0 ORDER BY key;
+
+EXPLAIN
+select * from cassandra_keyspace1_standard1 ORDER BY key;
+select * from cassandra_keyspace1_standard1 ORDER BY key;
+
+EXPLAIN
+select value from cassandra_keyspace1_standard1 ORDER BY VALUE;
+
+select value from cassandra_keyspace1_standard1 ORDER BY VALUE;
+
+EXPLAIN
+select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key ORDER BY a.key;
+
+select a.key,a.value,b.value from cassandra_keyspace1_standard1 a JOIN cassandra_keyspace1_standard1 b on a.key=b.key ORDER BY a.key;
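Note: in the cassandra.columns.mapping string above, ":key" binds the Hive key column to the Cassandra row key, and "Standard1:value" binds the value column to the value column of column family Standard1. The authoritative parser is StandardColumnSerDe.parseColumnMapping elsewhere in this patch; a simplified sketch of the presumed comma-split behavior:

import java.util.Arrays;
import java.util.List;

// Illustrative only: ':key' denotes the row key; 'CF:col' denotes a
// named column. The real parser lives in StandardColumnSerDe.
public class ColumnMappingSketch {
  static List<String> parse(String mapping) {
    return Arrays.asList(mapping.split(","));
  }

  public static void main(String[] args) {
    System.out.println(parse(":key,Standard1:value")); // [:key, Standard1:value]
  }
}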
Index: cassandra-handler/src/test/templates/TestCassandraCliDriver.vm
===================================================================
--- cassandra-handler/src/test/templates/TestCassandraCliDriver.vm (revision 0)
+++ cassandra-handler/src/test/templates/TestCassandraCliDriver.vm (revision 0)
@@ -0,0 +1,126 @@
+package org.apache.hadoop.hive.cli;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.hadoop.hive.cassandra.CassandraQTestUtil;
+import org.apache.hadoop.hive.cassandra.CassandraTestSetup;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.history.HiveHistoryViewer;
+import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.antlr.runtime.*;
+import org.antlr.runtime.tree.*;
+
+public class $className extends TestCase {
+
+ private CassandraQTestUtil qt;
+ private CassandraTestSetup setup;
+
+ public $className(String name, CassandraTestSetup setup) {
+ super(name);
+ qt = null;
+ this.setup = setup;
+ }
+
+ @Override
+ protected void setUp() {
+ try {
+ boolean miniMR = false;
+ if ("$clusterMode".equals("miniMR")) {
+ miniMR = true;
+ }
+
+ qt = new CassandraQTestUtil(
+ "$resultsDir.getCanonicalPath()",
+ "$logDir.getCanonicalPath()", miniMR, setup);
+
+#foreach ($qf in $qfiles)
+ qt.addFile("$qf.getCanonicalPath()");
+#end
+ } catch (Exception e) {
+ System.out.println("Exception: " + e.getMessage());
+ e.printStackTrace();
+ System.out.flush();
+ fail("Unexpected exception in setup");
+ }
+ }
+
+ @Override
+ protected void tearDown() {
+ try {
+ qt.shutdown();
+ }
+ catch (Exception e) {
+ System.out.println("Exception: " + e.getMessage());
+ e.printStackTrace();
+ System.out.flush();
+ fail("Unexpected exception in tearDown");
+ }
+ }
+
+ public static Test suite() {
+ TestSuite suite = new TestSuite();
+ CassandraTestSetup setup = new CassandraTestSetup(suite);
+#foreach ($qf in $qfiles)
+ #set ($fname = $qf.getName())
+ #set ($eidx = $fname.length() - 2)
+ #set ($tname = $fname.substring(0, $eidx))
+ suite.addTest(new $className("testCliDriver_$tname", setup));
+#end
+ return setup;
+ }
+
+ #foreach ($qf in $qfiles)
+ #set ($fname = $qf.getName())
+ #set ($eidx = $fname.length() - 2)
+ #set ($tname = $fname.substring(0, $eidx))
+ public void testCliDriver_$tname() throws Exception {
+ try {
+ System.out.println("Begin query: " + "$fname");
+ qt.cliInit("$fname");
+ int ecode = qt.executeClient("$fname");
+ if (ecode != 0) {
+ fail("Client Execution failed with error code = " + ecode);
+ }
+ if (SessionState.get() != null) {
+ HiveHistoryViewer hv = new HiveHistoryViewer(SessionState.get()
+ .getHiveHistory().getHistFileName());
+ Map<String, QueryInfo> jobInfoMap = hv.getJobInfoMap();
+ Map<String, TaskInfo> taskInfoMap = hv.getTaskInfoMap();
+
+ if (jobInfoMap.size() != 0) {
+ String cmd = (String)jobInfoMap.keySet().toArray()[0];
+ QueryInfo ji = jobInfoMap.get(cmd);
+
+ if (!ji.hm.get(Keys.QUERY_RET_CODE.name()).equals("0")) {
+ fail("Wrong return code in hive history");
+ }
+ }
+ }
+
+ ecode = qt.checkCliDriverResults("$fname");
+ if (ecode != 0) {
+ fail("Client execution results failed with error code = " + ecode);
+ }
+ } catch (Throwable e) {
+ System.out.println("Exception: " + e.getMessage());
+ e.printStackTrace();
+ System.out.flush();
+ fail("Unexpected exception");
+ }
+
+ System.out.println("Done query: " + "$fname");
+ assertTrue("Test passed", true);
+ }
+
+#end
+}
+
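Note: suite() above returns the CassandraTestSetup decorator instead of the TestSuite itself; this is the JUnit 3 idiom that runs setUp()/tearDown() once around the whole suite rather than per test. The same pattern in miniature, with hypothetical names:

import junit.extensions.TestSetup;
import junit.framework.Test;
import junit.framework.TestSuite;

// Returning the decorator means the embedded Cassandra service starts
// once before the first generated test and is torn down once after the
// last, instead of once per test method.
public class SuiteDecorationSketch {
  public static Test suite() {
    TestSuite suite = new TestSuite();
    // suite.addTest(...) for each generated query test
    return new TestSetup(suite) {
      @Override protected void setUp() { /* start embedded Cassandra */ }
      @Override protected void tearDown() { /* clean data directories */ }
    };
  }
}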
Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveIColumn.java
===================================================================
--- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveIColumn.java (revision 0)
+++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveIColumn.java (revision 0)
@@ -0,0 +1,174 @@
+package org.apache.hadoop.hive.cassandra.input;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.Collection;
+
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.hadoop.io.Writable;
+
+public class HiveIColumn implements IColumn, Writable {
+
+ private byte [] name;
+ private byte [] value;
+ private long timestamp;
+
+ public HiveIColumn(){
+
+ }
+
+ @Override
+ public ByteBuffer name() {
+ return ByteBuffer.wrap(name);
+ }
+
+ @Override
+ public long timestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public ByteBuffer value() {
+ return ByteBuffer.wrap(value);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ name = new byte[in.readInt()];
+ in.readFully(name);
+
+ value= new byte[in.readInt()];
+ in.readFully(value);
+
+ timestamp = in.readLong();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(name.length);
+ out.write(name);
+
+ out.writeInt(value.length);
+ out.write(value);
+
+ out.writeLong(timestamp);
+ }
+
+ //bean patterns
+
+ public byte[] getName() {
+ return name;
+ }
+
+ public void setName(byte[] name) {
+ this.name = name;
+ }
+
+ public byte[] getValue() {
+ return value;
+ }
+
+ public void setValue(byte[] value) {
+ this.value = value;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder();
+ sb.append("HiveIColumn[");
+ sb.append("name "+new String(this.name)+" ");
+ sb.append("value "+new String(this.value)+" ");
+ sb.append("timestamp "+ this.timestamp+" ");
+ return sb.toString();
+ }
+
+ //not needed for current integration
+
+ @Override
+ public int size() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void addColumn(IColumn arg0) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IColumn diff(IColumn arg0) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getLocalDeletionTime() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getMarkedForDeleteAt() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getString(AbstractType arg0) {
+ throw new UnsupportedOperationException();
+ }
+
+
+ @Override
+ public Collection<IColumn> getSubColumns() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isMarkedForDelete() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long mostRecentLiveChangeAt() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int serializedSize() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void updateDigest(MessageDigest arg0) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IColumn deepCopy() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IColumn getSubColumn(ByteBuffer arg0) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isLive() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IColumn reconcile(IColumn arg0) {
+ throw new UnsupportedOperationException();
+ }
+}
\ No newline at end of file
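Note: a quick round trip over the Writable wire format implemented above (int-length-prefixed name, int-length-prefixed value, then a long timestamp); the driver class is illustrative only.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hive.cassandra.input.HiveIColumn;

public class HiveIColumnRoundTripSketch {
  public static void main(String[] args) throws IOException {
    HiveIColumn in = new HiveIColumn();
    in.setName("value".getBytes());
    in.setValue("val_0".getBytes());
    in.setTimestamp(System.currentTimeMillis());

    // write() emits: name length + bytes, value length + bytes, timestamp
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    in.write(new DataOutputStream(buf));

    // readFields() consumes the identical layout
    HiveIColumn out = new HiveIColumn();
    out.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
    System.out.println(out); // same name/value/timestamp as 'in'
  }
}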
Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardRowResult.java
===================================================================
--- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardRowResult.java (revision 0)
+++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardRowResult.java (revision 0)
@@ -0,0 +1,66 @@
+package org.apache.hadoop.hive.cassandra.input;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+/**
+ *
+ * HiveCassandraStandardRowResult. Used as the value side of
+ * the InputFormat
+ *
+ */
+public class HiveCassandraStandardRowResult implements Writable{
+
+ private Text key;
+ private MapWritable value;
+
+ public HiveCassandraStandardRowResult() {
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ key =new Text();
+ key.readFields(in);
+ value = new MapWritable();
+ value.readFields(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ key.write(out);
+ value.write(out);
+ }
+
+ public Text getKey() {
+ return key;
+ }
+
+ public void setKey(Text key) {
+ this.key = key;
+ }
+
+ public MapWritable getValue() {
+ return value;
+ }
+
+ public void setValue(MapWritable value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder();
+ sb.append("RowResult key:" + key);
+ for (Map.Entry<Writable, Writable> entry : value.entrySet()){
+ sb.append( "entry key:"+entry.getKey()+" " );
+ sb.append( "entry value:"+entry.getValue()+" " );
+ }
+ return sb.toString();
+ }
+
+}
Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardColumnInputFormat.java
===================================================================
--- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardColumnInputFormat.java (revision 0)
+++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/HiveCassandraStandardColumnInputFormat.java (revision 0)
@@ -0,0 +1,216 @@
+package org.apache.hadoop.hive.cassandra.input;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
+import org.apache.cassandra.hadoop.ColumnFamilySplit;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cassandra.serde.StandardColumnSerDe;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+@SuppressWarnings("deprecation")
+public class HiveCassandraStandardColumnInputFormat extends
+ColumnFamilyInputFormat implements InputFormat<Text, HiveCassandraStandardRowResult> {
+
+ static final Log LOG = LogFactory.getLog(HiveCassandraStandardColumnInputFormat.class);
+ @Override
+ public RecordReader<Text, HiveCassandraStandardRowResult> getRecordReader
+ (InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException {
+ HiveCassandraStandardSplit cassandraSplit = (HiveCassandraStandardSplit) split;
+ List<String> columns = StandardColumnSerDe.parseColumnMapping(cassandraSplit.getColumnMapping());
+ List<Integer> readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf);
+ if (columns.size() < readColIDs.size()) {
+ throw new IOException("Cannot read more columns than the given table contains.");
+ }
+ org.apache.cassandra.hadoop.ColumnFamilySplit cfSplit = cassandraSplit.getSplit();
+ Job job = new Job(jobConf);
+
+ TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(),new TaskAttemptID() ){
+ @Override
+ public void progress(){
+ reporter.progress();
+ }
+ };
+
+ SlicePredicate predicate = new SlicePredicate();
+ SliceRange range = new SliceRange();
+ range.setStart(new byte[0]);
+ range.setFinish(new byte[0]);
+ range.setReversed(false);
+ range.setCount(cassandraSplit.getSlicePredicateSize());
+ predicate.setSlice_range(range);
+
+ final org.apache.hadoop.mapreduce.RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
+ recordReader = createRecordReader(cfSplit, tac);
+
+ try {
+ ConfigHelper.setInputColumnFamily(tac.getConfiguration(),
+ cassandraSplit.getKeyspace(), cassandraSplit.getColumnFamily());
+
+ ConfigHelper.setInputSlicePredicate(tac.getConfiguration(), predicate);
+ ConfigHelper.setRangeBatchSize(tac.getConfiguration(), cassandraSplit.getRangeBatchSize());
+ ConfigHelper.setRpcPort(tac.getConfiguration(), cassandraSplit.getPort()+"" );
+ ConfigHelper.setInitialAddress(tac.getConfiguration(), cassandraSplit.getHost());
+ ConfigHelper.setPartitioner(tac.getConfiguration(),cassandraSplit.getPartitioner());
+
+ recordReader.initialize(cfSplit, tac);
+ } catch (InterruptedException ie){
+ throw new IOException(ie);
+ } catch (Exception ie){
+ throw new IOException(ie);
+ }
+ return new RecordReader<Text, HiveCassandraStandardRowResult>(){
+
+ @Override
+ public void close() throws IOException {
+ recordReader.close();
+ }
+
+ @Override
+ public Text createKey() {
+ return new Text();
+ }
+
+ @Override
+ public HiveCassandraStandardRowResult createValue() {
+ return new HiveCassandraStandardRowResult();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return 0L;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ float progress = 0.0F;
+ try {
+ progress = recordReader.getProgress();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ return progress;
+ }
+
+ @Override
+ public boolean next(Text rowKey, HiveCassandraStandardRowResult value) throws IOException {
+ boolean next = false;
+ try {
+ next = recordReader.nextKeyValue();
+ if (next){
+ rowKey.set( ByteBufferUtil.getArray(recordReader.getCurrentKey()) );
+ MapWritable theMap = new MapWritable();
+ for (Map.Entry<ByteBuffer, IColumn> entry : recordReader.getCurrentValue().entrySet()) {
+ HiveIColumn hic = new HiveIColumn();
+ hic.setName( ByteBufferUtil.getArray(entry.getValue().name()) );
+ hic.setValue( ByteBufferUtil.getArray(entry.getValue().value()) );
+ hic.setTimestamp(entry.getValue().timestamp());
+ theMap.put(new BytesWritable( ByteBufferUtil.getArray(entry.getValue().name())), hic );
+ }
+ value.setKey(rowKey);
+ value.setValue(theMap);
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ return next;
+ }
+ };
+
+ }
+
+ /** The Cassandra RecordReader throws InterruptedException;
+ * we override here to throw IOException instead. */
+ @Override
+ public org.apache.hadoop.mapreduce.RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
+ createRecordReader(org.apache.hadoop.mapreduce.InputSplit inputSplit,
+ TaskAttemptContext taskAttemptContext)
+ throws IOException{
+ org.apache.hadoop.mapreduce.RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> result = null;
+ try {
+ result = super.createRecordReader(inputSplit, taskAttemptContext);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } catch (Exception ex){
+ throw new IOException(ex);
+ }
+ return result;
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
+ String ks = jobConf.get(StandardColumnSerDe.CASSANDRA_KEYSPACE_NAME);
+ String cf = jobConf.get(StandardColumnSerDe.CASSANDRA_CF_NAME);
+ int slicePredicateSize = jobConf.getInt(StandardColumnSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, 1000);
+ int sliceRangeSize = jobConf.getInt(StandardColumnSerDe.CASSANDRA_RANGE_BATCH_SIZE, 1000);
+ String cassandraColumnMapping = jobConf.get(StandardColumnSerDe.CASSANDRA_COL_MAPPING);
+ int rpcPort = jobConf.getInt(StandardColumnSerDe.CASSANDRA_PORT,9160);
+ String host = jobConf.get(StandardColumnSerDe.CASSANDRA_HOST);
+ String partitioner = jobConf.get(StandardColumnSerDe.CASSANDRA_PARTITIONER);
+
+ if (cassandraColumnMapping == null){
+ throw new IOException("cassandra.columns.mapping required for Cassandra Table.");
+ }
+
+ SliceRange range = new SliceRange();
+ range.setStart(new byte[0]);
+ range.setFinish(new byte[0]);
+ range.setReversed(false);
+ range.setCount(slicePredicateSize);
+ SlicePredicate predicate = new SlicePredicate();
+ predicate.setSlice_range(range);
+
+ ConfigHelper.setRpcPort(jobConf, ""+rpcPort);
+ ConfigHelper.setInitialAddress(jobConf, host);
+ ConfigHelper.setPartitioner(jobConf, partitioner);
+ ConfigHelper.setInputSlicePredicate(jobConf, predicate);
+ ConfigHelper.setInputColumnFamily(jobConf, ks, cf);
+ ConfigHelper.setRangeBatchSize(jobConf, sliceRangeSize);
+
+ Job job = new Job(jobConf);
+ JobContext jobContext = new JobContext(job.getConfiguration(), job.getJobID());
+
+ Path [] tablePaths = FileInputFormat.getInputPaths(jobContext);
+ List<org.apache.hadoop.mapreduce.InputSplit> splits = getSplits(jobContext);
+ InputSplit [] results = new InputSplit[splits.size()];
+
+ for (int i =0; i < splits.size(); ++i){
+ HiveCassandraStandardSplit csplit = new HiveCassandraStandardSplit(
+ (ColumnFamilySplit)splits.get(i), cassandraColumnMapping, tablePaths[0] );
+ csplit.setKeyspace(ks);
+ csplit.setColumnFamily(cf);
+ csplit.setRangeBatchSize(sliceRangeSize);
+ csplit.setHost(host);
+ csplit.setPort(rpcPort);
+ csplit.setSlicePredicateSize(slicePredicateSize);
+ csplit.setPartitioner(partitioner);
+ csplit.setColumnMapping(cassandraColumnMapping);
+ results[i] = csplit;
+ }
+ return results;
+ }
+
+}
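Note: getRecordReader/createRecordReader above bridge Hadoop's old (mapred) and new (mapreduce) APIs: the old-API next(key, value) drives the new-API nextKeyValue() and copies the current key/value into the caller's Writables, translating InterruptedException into IOException. The bare pattern, reduced to placeholder Text/Text types rather than the patch's real ones:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.RecordReader;

// Skeleton of the adapter pattern used above, reduced to Text/Text.
public class OldApiAdapterSketch {
  static RecordReader<Text, Text> adapt(
      final org.apache.hadoop.mapreduce.RecordReader<Text, Text> inner) {
    return new RecordReader<Text, Text>() {
      public boolean next(Text key, Text value) throws IOException {
        try {
          if (!inner.nextKeyValue()) {
            return false;
          }
          key.set(inner.getCurrentKey());
          value.set(inner.getCurrentValue());
          return true;
        } catch (InterruptedException e) {
          throw new IOException(e); // surfaced as IOException, as above
        }
      }
      public Text createKey() { return new Text(); }
      public Text createValue() { return new Text(); }
      public long getPos() { return 0L; }
      public void close() throws IOException { inner.close(); }
      public float getProgress() throws IOException {
        try {
          return inner.getProgress();
        } catch (InterruptedException e) {
          throw new IOException(e);
        }
      }
    };
  }
}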
Index: cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraRow.java
===================================================================
--- cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraRow.java (revision 0)
+++ cassandra-handler/src/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraRow.java (revision 0)
@@ -0,0 +1,121 @@
+package org.apache.hadoop.hive.cassandra.input;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hive.cassandra.serde.StandardColumnSerDe;
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyObject;
+import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+
+public class LazyCassandraRow extends LazyStruct {
+ private List<String> cassandraColumns;
+ private HiveCassandraStandardRowResult rowResult;
+ private ArrayList