diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java index 944fc22..f9d7c33 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java @@ -95,6 +95,7 @@ public class HColumnDescriptor implements WritableComparable public static final String BLOOMFILTER = "BLOOMFILTER"; public static final String FOREVER = "FOREVER"; public static final String REPLICATION_SCOPE = "REPLICATION_SCOPE"; + public static final String REPLICATION_MASTER = "REPLICATION_MASTER"; public static final String MIN_VERSIONS = "MIN_VERSIONS"; public static final String KEEP_DELETED_CELLS = "KEEP_DELETED_CELLS"; @@ -183,6 +184,11 @@ public class HColumnDescriptor implements WritableComparable * Default scope. */ public static final int DEFAULT_REPLICATION_SCOPE = HConstants.REPLICATION_SCOPE_LOCAL; + + /** + * Default replication Master. + */ + public static final String DEFAULT_REPLICATION_MASTER = HConstants.REPLICATION_MASTER_EMPTY; /** * Default setting for whether to evict cached blocks from the blockcache on @@ -197,6 +203,7 @@ public class HColumnDescriptor implements WritableComparable static { DEFAULT_VALUES.put(BLOOMFILTER, DEFAULT_BLOOMFILTER); DEFAULT_VALUES.put(REPLICATION_SCOPE, String.valueOf(DEFAULT_REPLICATION_SCOPE)); + DEFAULT_VALUES.put(REPLICATION_MASTER, String.valueOf(DEFAULT_REPLICATION_MASTER)); DEFAULT_VALUES.put(HConstants.VERSIONS, String.valueOf(DEFAULT_VERSIONS)); DEFAULT_VALUES.put(MIN_VERSIONS, String.valueOf(DEFAULT_MIN_VERSIONS)); DEFAULT_VALUES.put(COMPRESSION, DEFAULT_COMPRESSION); @@ -317,7 +324,8 @@ public class HColumnDescriptor implements WritableComparable final boolean blockCacheEnabled, final int timeToLive, final String bloomFilter) { this(familyName, maxVersions, compression, inMemory, blockCacheEnabled, - DEFAULT_BLOCKSIZE, timeToLive, bloomFilter, DEFAULT_REPLICATION_SCOPE); + DEFAULT_BLOCKSIZE, timeToLive, bloomFilter, DEFAULT_REPLICATION_SCOPE, + DEFAULT_REPLICATION_MASTER); } /** @@ -347,11 +355,12 @@ public class HColumnDescriptor implements WritableComparable public HColumnDescriptor(final byte [] familyName, final int maxVersions, final String compression, final boolean inMemory, final boolean blockCacheEnabled, final int blocksize, - final int timeToLive, final String bloomFilter, final int scope) { + final int timeToLive, final String bloomFilter, final int scope, + final String replicationMaster) { this(familyName, DEFAULT_MIN_VERSIONS, maxVersions, DEFAULT_KEEP_DELETED, compression, DEFAULT_ENCODE_ON_DISK, DEFAULT_DATA_BLOCK_ENCODING, inMemory, blockCacheEnabled, blocksize, timeToLive, bloomFilter, - scope); + scope,replicationMaster); } /** @@ -389,7 +398,8 @@ public class HColumnDescriptor implements WritableComparable final String compression, final boolean encodeOnDisk, final String dataBlockEncoding, final boolean inMemory, final boolean blockCacheEnabled, final int blocksize, - final int timeToLive, final String bloomFilter, final int scope) { + final int timeToLive, final String bloomFilter, final int scope, + final String replicationMaster) { isLegalFamilyName(familyName); this.name = familyName; @@ -424,6 +434,7 @@ public class HColumnDescriptor implements WritableComparable valueOf(bloomFilter.toUpperCase())); setBlocksize(blocksize); setScope(scope); + setReplicationMaster(replicationMaster); } /** @@ -814,6 +825,26 @@ public class HColumnDescriptor implements WritableComparable return DEFAULT_REPLICATION_SCOPE; } + + /** + * @return the replication Master + */ + public String getReplicationMaster() { + String value = getValue(REPLICATION_MASTER); + if (value != null) { + return value; + } + return DEFAULT_REPLICATION_MASTER; + } + + /** + * @param the source(aka master) of a replications + * @return this (for chained invocation) + */ + public HColumnDescriptor setReplicationMaster(String masterCluster) { + return setValue(REPLICATION_MASTER, masterCluster); + } + /** * @param scope the scope tag * @return this (for chained invocation) diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index ebdd335..3e48953 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -30,10 +30,15 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HColumnDescriptor; import java.io.Closeable; import java.io.IOException; import java.util.Map; +import java.util.List; +import java.util.ArrayList; +import java.util.LinkedList; /** *

@@ -166,4 +171,54 @@ public class ReplicationAdmin implements Closeable { this.connection.close(); } } + + /** + * Find all column families that are replicated from this cluster, match them with the peers + * (target cluster) + * @param none + * @return the full list of the replicated column families of this cluster as: table name, column + * family name, target cluster name, and target cluster state + */ + public List listReplicated() throws IOException { + + List replicatedColFams = new ArrayList(); + + HTableDescriptor[] tables; + + tables = this.connection.listTables(); + + Map peers = listPeers(); + + for (HTableDescriptor table : tables) { + HColumnDescriptor[] columns = table.getColumnFamilies(); + String tableName = table.getNameAsString(); + for (HColumnDescriptor column : columns) { + String[] replicatedEntry; + int numOfItems = 5; + + if (column.getScope()!=HConstants.REPLICATION_SCOPE_LOCAL) { + //All the slave clusters + //At this moment, the columfam is replicated to all peers + for (Map.Entry peer : peers.entrySet()) { + replicatedEntry = new String[numOfItems]; + replicatedEntry[0] = tableName; + replicatedEntry[1] = column.getNameAsString(); + replicatedEntry[2] = "SLAVE "; + replicatedEntry[3] = peer.getValue(); + replicatedColFams.add(replicatedEntry); + } + } + if (!(column.getReplicationMaster().equals(HConstants.REPLICATION_MASTER_EMPTY))) { + //the master cluster + replicatedEntry = new String[numOfItems]; + replicatedEntry[0] = tableName; + replicatedEntry[1] = column.getNameAsString(); + replicatedEntry[2] = "MASTER"; + replicatedEntry[3] = column.getReplicationMaster(); + replicatedColFams.add(replicatedEntry); + } + } + } + return replicatedColFams; + } } diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 9d75fe7..9093345 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -524,6 +524,12 @@ public final class HConstants { */ public static final int REPLICATION_SCOPE_GLOBAL = 1; + /** + * Replication Master tag for default value. + * This data will not be replicated. + */ + public static final String REPLICATION_MASTER_EMPTY = ""; + /** * Default cluster ID, cannot be used to identify a cluster so a key with * this value means it wasn't meant for replication. diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java index 7f7138f..a2a29bc 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java @@ -104,7 +104,8 @@ public class AccessControlLists { 10, // Ten is arbitrary number. Keep versions to help debugging. Compression.Algorithm.NONE.getName(), true, true, 8 * 1024, HConstants.FOREVER, BloomType.NONE.toString(), - HConstants.REPLICATION_SCOPE_LOCAL)); + HConstants.REPLICATION_SCOPE_LOCAL, + HConstants.REPLICATION_MASTER_EMPTY)); } /** diff --git hbase-server/src/main/ruby/hbase/admin.rb hbase-server/src/main/ruby/hbase/admin.rb index 30ae110..06293d5 100644 --- hbase-server/src/main/ruby/hbase/admin.rb +++ hbase-server/src/main/ruby/hbase/admin.rb @@ -610,6 +610,7 @@ module Hbase family.setBlockCacheEnabled(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKCACHE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKCACHE) family.setScope(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_SCOPE))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_SCOPE) + family.setReplicationMaster(arg[org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_MASTER]) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::REPLICATION_MASTER) family.setInMemory(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::IN_MEMORY) family.setTimeToLive(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::TTL))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::TTL) family.setDataBlockEncoding(org.apache.hadoop.hbase.io.encoding.DataBlockEncoding.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::DATA_BLOCK_ENCODING) diff --git hbase-server/src/main/ruby/hbase/replication_admin.rb hbase-server/src/main/ruby/hbase/replication_admin.rb index 27d141a..7bff314 100644 --- hbase-server/src/main/ruby/hbase/replication_admin.rb +++ hbase-server/src/main/ruby/hbase/replication_admin.rb @@ -65,5 +65,12 @@ module Hbase def disable_peer(id) @replication_admin.disablePeer(id) end + + #---------------------------------------------------------------------------------------------- + # Show replcated tables/column families, and their target clusers + def list_replicated_tables + @replication_admin.listReplicated() + end + end end diff --git hbase-server/src/main/ruby/shell.rb hbase-server/src/main/ruby/shell.rb index b1d5cc0..464b20e 100644 --- hbase-server/src/main/ruby/shell.rb +++ hbase-server/src/main/ruby/shell.rb @@ -304,6 +304,7 @@ Shell.load_command_group( list_peers enable_peer disable_peer + list_replicated_tables ] ) diff --git hbase-server/src/main/ruby/shell/commands/list_replicated_tables.rb hbase-server/src/main/ruby/shell/commands/list_replicated_tables.rb new file mode 100644 index 0000000..e8103f7 --- /dev/null +++ hbase-server/src/main/ruby/shell/commands/list_replicated_tables.rb @@ -0,0 +1,47 @@ +#copyright The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class ListReplicatedTables< Command + def help + return <<-EOF +List all the tables and column families replicated from this cluster + + hbase> list_replicated_tables + hbase> list_replicated_tables 'abc.*' +EOF + end + + def command(regex = ".*") + now = Time.now + + formatter.header([ "TABLE:COLUMNFAMILY", "TARGET_CLUSTER" ], [ 32 ]) + regex = /#{regex}/ unless regex.is_a?(Regexp) + list = replication_admin.list_replicated_tables + list = list.select {|s| regex.match(s[0])} + list.each do |e| + formatter.row([ e[0] + ":" + e[1], e[2]+ " = " + e[3] ], true, [ 32 ]) + end + formatter.footer(now) + end + end + end +end +