diff --git a/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java b/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java index 5b4b53d..b6c6cf2 100644 --- a/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java +++ b/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java @@ -96,7 +96,7 @@ public class AccessControlLists { 10, // Ten is arbitrary number. Keep versions to help debugging. Compression.Algorithm.NONE.getName(), true, true, 8 * 1024, HConstants.FOREVER, StoreFile.BloomType.NONE.toString(), - HConstants.REPLICATION_SCOPE_LOCAL)); + HConstants.REPLICATION_SCOPE_LOCAL,HConstants.REPLICATION_MASTER_EMPTY)); } /** diff --git a/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java index dca442d..b8a9834 100644 --- a/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java +++ b/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java @@ -84,6 +84,7 @@ public class HColumnDescriptor implements WritableComparable public static final String BLOOMFILTER = "BLOOMFILTER"; public static final String FOREVER = "FOREVER"; public static final String REPLICATION_SCOPE = "REPLICATION_SCOPE"; + public static final String REPLICATION_MASTER = "REPLICATION_MASTER"; public static final String MIN_VERSIONS = "MIN_VERSIONS"; public static final String KEEP_DELETED_CELLS = "KEEP_DELETED_CELLS"; @@ -174,6 +175,12 @@ public class HColumnDescriptor implements WritableComparable public static final int DEFAULT_REPLICATION_SCOPE = HConstants.REPLICATION_SCOPE_LOCAL; /** + * Default replication Master. + */ + public static final String DEFAULT_REPLICATION_MASTER = HConstants.REPLICATION_MASTER_EMPTY; + + + /** * Default setting for whether to evict cached blocks from the blockcache on * close. 
*/ @@ -185,6 +192,7 @@ public class HColumnDescriptor implements WritableComparable static { DEFAULT_VALUES.put(BLOOMFILTER, DEFAULT_BLOOMFILTER); DEFAULT_VALUES.put(REPLICATION_SCOPE, String.valueOf(DEFAULT_REPLICATION_SCOPE)); + DEFAULT_VALUES.put(REPLICATION_MASTER, String.valueOf(DEFAULT_REPLICATION_MASTER)); DEFAULT_VALUES.put(HConstants.VERSIONS, String.valueOf(DEFAULT_VERSIONS)); DEFAULT_VALUES.put(MIN_VERSIONS, String.valueOf(DEFAULT_MIN_VERSIONS)); DEFAULT_VALUES.put(COMPRESSION, DEFAULT_COMPRESSION); @@ -295,7 +303,7 @@ public class HColumnDescriptor implements WritableComparable final boolean blockCacheEnabled, final int timeToLive, final String bloomFilter) { this(familyName, maxVersions, compression, inMemory, blockCacheEnabled, - DEFAULT_BLOCKSIZE, timeToLive, bloomFilter, DEFAULT_REPLICATION_SCOPE); + DEFAULT_BLOCKSIZE, timeToLive, bloomFilter, DEFAULT_REPLICATION_SCOPE,DEFAULT_REPLICATION_MASTER); } /** @@ -325,11 +333,12 @@ public class HColumnDescriptor implements WritableComparable public HColumnDescriptor(final byte [] familyName, final int maxVersions, final String compression, final boolean inMemory, final boolean blockCacheEnabled, final int blocksize, - final int timeToLive, final String bloomFilter, final int scope) { + final int timeToLive, final String bloomFilter, final int scope, + final String replicationMaster) { this(familyName, DEFAULT_MIN_VERSIONS, maxVersions, DEFAULT_KEEP_DELETED, compression, DEFAULT_ENCODE_ON_DISK, DEFAULT_DATA_BLOCK_ENCODING, inMemory, blockCacheEnabled, blocksize, timeToLive, bloomFilter, - scope); + scope,replicationMaster); } /** @@ -367,7 +376,7 @@ public class HColumnDescriptor implements WritableComparable final String compression, final boolean encodeOnDisk, final String dataBlockEncoding, final boolean inMemory, final boolean blockCacheEnabled, final int blocksize, - final int timeToLive, final String bloomFilter, final int scope) { + final int timeToLive, final String bloomFilter, final int scope, 
final String replicationMaster) { isLegalFamilyName(familyName); this.name = familyName; @@ -402,6 +411,7 @@ public class HColumnDescriptor implements WritableComparable valueOf(bloomFilter.toUpperCase())); setBlocksize(blocksize); setScope(scope); + setReplicationMaster(replicationMaster); } /** @@ -776,6 +786,17 @@ public class HColumnDescriptor implements WritableComparable } return DEFAULT_REPLICATION_SCOPE; } + + /** + * @return the replication Master + */ + public String getReplicationMaster() { + String value = getValue(REPLICATION_MASTER); + if (value != null) { + return value; + } + return DEFAULT_REPLICATION_MASTER; + } /** * @param scope the scope tag @@ -786,6 +807,14 @@ public class HColumnDescriptor implements WritableComparable } /** + * @param masterCluster the source (aka master) cluster of the replication + * @return this (for chained invocation) + */ + public HColumnDescriptor setReplicationMaster(String masterCluster) { + return setValue(REPLICATION_MASTER, masterCluster); + } + + /** + * @return true if we should cache data blocks on write */ public boolean shouldCacheDataOnWrite() { diff --git a/src/main/java/org/apache/hadoop/hbase/HConstants.java b/src/main/java/org/apache/hadoop/hbase/HConstants.java index ac91454..1d3f439 100644 --- a/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -441,6 +441,12 @@ public final class HConstants { public static final int REPLICATION_SCOPE_LOCAL = 0; /** + * Replication Master tag for default value. + * This data will not be replicated. + */ + public static final String REPLICATION_MASTER_EMPTY = ""; + + /** * Scope tag for globally scoped data. * This data will be replicated to all peers.
*/ diff --git a/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index b4efc3f..a44eae8 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -22,6 +22,9 @@ package org.apache.hadoop.hbase.client.replication; import java.io.Closeable; import java.io.IOException; import java.util.Map; +import java.util.List; +import java.util.ArrayList; +import java.util.LinkedList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; @@ -30,6 +33,8 @@ import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HColumnDescriptor; /** *

@@ -199,4 +204,56 @@ public class ReplicationAdmin implements Closeable { this.connection.close(); } } + + /** + * Find all column families that are replicated from this cluster, match them with the peers + * (target cluster) + * + * @return the full list of the replicated column families of this cluster as: table name, column + * family name, target cluster name, and target cluster state + */ + public List<String[]> listReplicated() throws IOException { + + List<String[]> replicatedColFams = new ArrayList<String[]>(); + + HTableDescriptor[] tables; + + tables = this.connection.listTables(); + + Map<String, String> peers = listPeers(); + + for (HTableDescriptor table : tables) { + HColumnDescriptor[] columns = table.getColumnFamilies(); + String tableName = table.getNameAsString(); + for (HColumnDescriptor column : columns) { + String[] replicatedEntry; + int numOfItems = 5; + + if (column.getScope()!=HConstants.REPLICATION_SCOPE_LOCAL) { + //All the slave clusters + //At this moment, the column family is replicated to all peers + for (Map.Entry<String, String> peer : peers.entrySet()) { + replicatedEntry = new String[numOfItems]; + replicatedEntry[0] = tableName; + replicatedEntry[1] = column.getNameAsString(); + replicatedEntry[2] = "SLAVE "; + replicatedEntry[3] = peer.getValue(); + replicatedEntry[4] = getPeerState(peer.getKey()); + replicatedColFams.add(replicatedEntry); + } + } + if (!(column.getReplicationMaster().equals(HConstants.REPLICATION_MASTER_EMPTY))) { + //the master cluster + replicatedEntry = new String[numOfItems]; + replicatedEntry[0] = tableName; + replicatedEntry[1] = column.getNameAsString(); + replicatedEntry[2] = "MASTER"; + replicatedEntry[3] = column.getReplicationMaster(); + replicatedEntry[4] = ""; + replicatedColFams.add(replicatedEntry); + } + } + } + return replicatedColFams; + } } diff --git a/src/main/ruby/hbase/admin.rb b/src/main/ruby/hbase/admin.rb index a40a37d..ea29742 100644 --- a/src/main/ruby/hbase/admin.rb +++ b/src/main/ruby/hbase/admin.rb @@ -573,6 +573,7 @@ module Hbase
family.setBlockCacheEnabled(JBoolean.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::BLOCKCACHE])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKCACHE) family.setScope(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_SCOPE])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_SCOPE) + family.setReplicationMaster(arg[org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_MASTER]) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_MASTER) family.setInMemory(JBoolean.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY) family.setTimeToLive(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::TTL])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::TTL) family.setDataBlockEncoding(org.apache.hadoop.hbase.io.encoding.DataBlockEncoding.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING) diff --git a/src/main/ruby/hbase/replication_admin.rb b/src/main/ruby/hbase/replication_admin.rb index f694f5f..a5f48a8 100644 --- a/src/main/ruby/hbase/replication_admin.rb +++ b/src/main/ruby/hbase/replication_admin.rb @@ -78,5 +78,12 @@ module Hbase def stop_replication @replication_admin.setReplicating(false) end + + #---------------------------------------------------------------------------------------------- + # Show replicated tables/column families, and their target clusters + def list_replicated_tables + @replication_admin.listReplicated() + end + end end diff --git a/src/main/ruby/shell.rb b/src/main/ruby/shell.rb index d1ec550..1330b7e 100644 --- a/src/main/ruby/shell.rb +++ b/src/main/ruby/shell.rb @@ -286,6 +286,7 @@ Shell.load_command_group( disable_peer start_replication stop_replication + list_replicated_tables ] ) diff --git
a/src/main/ruby/shell/commands/list_replicated_tables.rb b/src/main/ruby/shell/commands/list_replicated_tables.rb new file mode 100644 index 0000000..7128589 --- /dev/null +++ b/src/main/ruby/shell/commands/list_replicated_tables.rb @@ -0,0 +1,51 @@ +# +# Copyright The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class ListReplicatedTables< Command + def help + return <<-EOF +List all the tables and column families replicated from this cluster + + hbase> list_replicated_tables + hbase> list_replicated_tables 'abc.*' +EOF + end + + def command(regex = ".*") + now = Time.now + + formatter.header([ "TABLE:COLUMNFAMILY", "TARGET_CLUSTER + STATE" ], [ 32 ]) + regex = /#{regex}/ unless regex.is_a?(Regexp) + list = replication_admin.list_replicated_tables + list = list.select {|s| regex.match(s[0])} + list.each do |e| + if e[4]=='' + formatter.row([ e[0] + ":" + e[1], e[2]+ " = " + e[3] ], true, [ 32 ]) + else + formatter.row([ e[0] + ":" + e[1], e[2]+ " = " + e[3] + " + " + e[4]], true, [ 32 ]) + end + end + formatter.footer(now) + end + end + end +end