From 2fe8921d3920aabc20ed41301ef6ecb0105ba856 Mon Sep 17 00:00:00 2001 From: Kevin Risden Date: Tue, 27 Jan 2015 11:20:42 -0600 Subject: [PATCH] HBASE-12867 Add ability to specify custom replication endpoint to add_peer --- .../src/main/ruby/hbase/replication_admin.rb | 63 ++++++++++++++++++++-- .../src/main/ruby/shell/commands/add_peer.rb | 36 +++++++++++-- 2 files changed, 90 insertions(+), 9 deletions(-) diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index 6dedb2e..5ca9f9b 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -19,6 +19,11 @@ include Java +java_import org.apache.hadoop.hbase.client.replication.ReplicationAdmin +java_import org.apache.hadoop.hbase.replication.ReplicationPeerConfig +java_import org.apache.hadoop.hbase.util.Bytes +java_import org.apache.hadoop.hbase.zookeeper.ZKUtil + # Wrapper for org.apache.hadoop.hbase.client.HBaseAdmin module Hbase @@ -26,14 +31,64 @@ module Hbase include HBaseConstants def initialize(configuration, formatter) - @replication_admin = org.apache.hadoop.hbase.client.replication.ReplicationAdmin.new(configuration) + @replication_admin = ReplicationAdmin.new(configuration) + @configuration = configuration @formatter = formatter end #---------------------------------------------------------------------------------------------- # Add a new peer cluster to replicate to - def add_peer(id, cluster_key, peer_tableCFs = nil) - @replication_admin.addPeer(id, cluster_key, peer_tableCFs) + def add_peer(id, args = {}, peer_tableCFs = nil) + # make add_peer backwards compatible to take in string for clusterKey and peer_tableCFs + if args.is_a?(String) + cluster_key = args + @replication_admin.addPeer(id, cluster_key, peer_tableCFs) + elsif args.is_a?(Hash) + endpoint_classname = args.fetch(:ENDPOINT_CLASSNAME, nil) + cluster_key = args.fetch(:CLUSTER_KEY, nil) + + # Handle cases where custom replication endpoint and cluster key are either both provided + # or neither are provided + if endpoint_classname.nil? and cluster_key.nil? + raise(ArgumentError, "Either ENDPOINT_CLASSNAME or CLUSTER_KEY must be specified.") + elsif !endpoint_classname.nil? and !cluster_key.nil? + raise(ArgumentError, "ENDPOINT_CLASSNAME and CLUSTER_KEY cannot both be specified.") + end + + # Cluster Key is required for ReplicationPeerConfig for a custom replication endpoint + if !endpoint_classname.nil? and cluster_key.nil? + cluster_key = ZKUtil.getZooKeeperClusterKey(@configuration) + end + + # Optional parameters + config = args.fetch(:CONFIG, nil) + data = args.fetch(:DATA, nil) + table_cfs = args.fetch(:TABLE_CFS, nil) + + # Create and populate a ReplicationPeerConfig + replication_peer_config = ReplicationPeerConfig.new + replication_peer_config.set_cluster_key(cluster_key) + + unless endpoint_classname.nil? + replication_peer_config.set_replication_endpoint_impl(endpoint_classname) + end + + unless config.nil? + replication_peer_config.get_configuration.put_all(config) + end + + unless data.nil? + # Convert Strings to Bytes for peer_data + peer_data = replication_peer_config.get_peer_data + data.each{|key, val| + peer_data.put(Bytes.to_bytes(key), Bytes.to_bytes(val)) + } + end + + @replication_admin.add_peer(id, replication_peer_config, table_cfs) + else + raise(ArgumentError, "args must be either a String or Hash") + end end #---------------------------------------------------------------------------------------------- @@ -48,7 +103,7 @@ module Hbase def list_replicated_tables(regex = ".*") pattern = java.util.regex.Pattern.compile(regex) list = @replication_admin.listReplicated() - list.select {|s| pattern.match(s.get(org.apache.hadoop.hbase.client.replication.ReplicationAdmin::TNAME))} + list.select {|s| pattern.match(s.get(ReplicationAdmin.TNAME))} end #---------------------------------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb index ecd8e75..c099113 100644 --- a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb +++ b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb @@ -22,21 +22,47 @@ module Shell class AddPeer< Command def help return <<-EOF -Add a peer cluster to replicate to, the id must be a short and -the cluster key is composed like this: +A peer can either be another HBase cluster or a custom replication endpoint. In either case an id +must be specified to identify the peer. + +For a HBase cluster peer, a cluster key must be provided and is composed like this: hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent -This gives a full path for HBase to connect to another cluster. +This gives a full path for HBase to connect to another HBase cluster. An optional parameter for +table column families identifies which column families will be replicated to the peer cluster. Examples: hbase> add_peer '1', "server1.cie.com:2181:/hbase" hbase> add_peer '2', "zk1,zk2,zk3:2182:/hbase-prod" hbase> add_peer '3', "zk4,zk5,zk6:11000:/hbase-test", "tab1; tab2:cf1; tab3:cf2,cf3" + hbase> add_peer '4', :CLUSTER_KEY => "server1.cie.com:2181:/hbase" + hbase> add_peer '5', :CLUSTER_KEY => "server1.cie.com:2181:/hbase", + :TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] } + +For a custom replication endpoint, the ENDPOINT_CLASSNAME can be provided. Two optional arguments +are :DATA and :CONFIG which can be specified to set different either the peer_data or configuration +for the custom replication endpoint. Table column families is optional and can be specified with +the key :TABLE_CFS. + + hbase> add_peer '6', :ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint' + hbase> add_peer '7', :ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint', + :DATA => { "key1" => 1 } + hbase> add_peer '8', :ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint', + :CONFIG => { "config1" => "value1", "config2" => "value2" } + hbase> add_peer '9', :ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint', + :DATA => { "key1" => 1 }, :CONFIG => { "config1" => "value1", "config2" => "value2" }, + hbase> add_peer '10', :ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint', + :TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] } + hbase> add_peer '11', :ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint', + :DATA => { "key1" => 1 }, :CONFIG => { "config1" => "value1", "config2" => "value2" }, + :TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] } + +Note: Either CLUSTER_KEY or ENDPOINT_CLASSNAME must be specified but not both. EOF end - def command(id, cluster_key, peer_tableCFs = nil) + def command(id, args = {}, peer_tableCFs = nil) format_simple_command do - replication_admin.add_peer(id, cluster_key, peer_tableCFs) + replication_admin.add_peer(id, args, peer_tableCFs) end end end -- 2.2.2