From f200d963c2965fb365fe55b35e04ff9ef3bd98fa Mon Sep 17 00:00:00 2001 From: meiyi Date: Tue, 8 Jan 2019 15:32:33 +0800 Subject: [PATCH] HBASE-21694 Add append_peer_exclude_tableCFs and remove_peer_exclude_tableCFs shell commands --- .../java/org/apache/hadoop/hbase/client/Admin.java | 20 +++ .../org/apache/hadoop/hbase/client/AsyncAdmin.java | 16 +++ .../hadoop/hbase/client/AsyncHBaseAdmin.java | 12 ++ .../org/apache/hadoop/hbase/client/HBaseAdmin.java | 24 ++++ .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 48 +++++++ .../replication/ReplicationPeerConfigUtil.java | 98 ++++++++++--- .../hbase/client/TestAsyncReplicationAdminApi.java | 157 +++++++++++++++++++++ .../client/replication/TestReplicationAdmin.java | 155 ++++++++++++++++++++ .../src/main/ruby/hbase/replication_admin.rb | 24 ++++ hbase-shell/src/main/ruby/shell.rb | 2 + .../shell/commands/append_peer_exclude_tableCFs.rb | 42 ++++++ .../shell/commands/remove_peer_exclude_tableCFs.rb | 42 ++++++ .../src/test/ruby/hbase/replication_admin_test.rb | 70 +++++++++ 13 files changed, 691 insertions(+), 19 deletions(-) create mode 100644 hbase-shell/src/main/ruby/shell/commands/append_peer_exclude_tableCFs.rb create mode 100644 hbase-shell/src/main/ruby/shell/commands/remove_peer_exclude_tableCFs.rb diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index 1d892b2..cd1e4f2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -2662,6 +2662,26 @@ public interface Admin extends Abortable, Closeable { throws ReplicationException, IOException; /** + * Append the non-replicable table column family config to the specified peer. + * @param id a short that identifies the cluster + * @param excludeTableCfs A map from tableName to column family names + * @throws ReplicationException if exclude tableCfs has conflict with existing config + * @throws IOException if a remote or network exception occurs + */ + void appendReplicationPeerExcludeTableCFs(String id, Map> excludeTableCfs) + throws ReplicationException, IOException; + + /** + * Remove the non-replicable table column family config from the specified peer. + * @param id a short that identifies the cluster + * @param excludeTableCfs A map from tableName to column family names + * @throws ReplicationException if exclude tableCfs has conflict with existing config + * @throws IOException if a remote or network exception occurs + */ + void removeReplicationPeerExcludeTableCFs(String id, Map> excludeTableCfs) + throws ReplicationException, IOException; + + /** * Return a list of replication peers. * @return a list of replication peers description * @throws IOException if a remote or network exception occurs diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 40ed213..5e4c8cb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -647,6 +647,22 @@ public interface AsyncAdmin { Map> tableCfs); /** + * Append the non-replicable table column family config to the specified peer. + * @param id a short that identifies the cluster + * @param excludeTableCfs A map from tableName to column family names + */ + CompletableFuture appendReplicationPeerExcludeTableCFs(String id, + Map> excludeTableCfs); + + /** + * Remove the non-replicable table column family config from the specified peer. + * @param id a short that identifies the cluster + * @param excludeTableCfs A map from tableName to column family names + */ + CompletableFuture removeReplicationPeerExcludeTableCFs(String id, + Map> excludeTableCfs); + + /** * Return a list of replication peers. * @return a list of replication peers description. The return value will be wrapped by a * {@link CompletableFuture}. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index d8f4da5..666427a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -433,6 +433,18 @@ class AsyncHBaseAdmin implements AsyncAdmin { } @Override + public CompletableFuture appendReplicationPeerExcludeTableCFs(String id, + Map> excludeTableCfs) { + return wrap(rawAdmin.appendReplicationPeerExcludeTableCFs(id, excludeTableCfs)); + } + + @Override + public CompletableFuture removeReplicationPeerExcludeTableCFs(String id, + Map> excludeTableCfs) { + return wrap(rawAdmin.removeReplicationPeerExcludeTableCFs(id, excludeTableCfs)); + } + + @Override public CompletableFuture> listReplicationPeers() { return wrap(rawAdmin.listReplicationPeers()); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 034ce03..6b85614 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -4119,6 +4119,30 @@ public class HBaseAdmin implements Admin { } @Override + public void appendReplicationPeerExcludeTableCFs(String id, + Map> excludeTableCfs) throws ReplicationException, IOException { + if (excludeTableCfs == null) { + throw new ReplicationException("exclude tableCfs is null"); + } + ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id); + ReplicationPeerConfig newPeerConfig = ReplicationPeerConfigUtil + .appendExcludeTableCFsToReplicationPeerConfig(excludeTableCfs, peerConfig); + updateReplicationPeerConfig(id, newPeerConfig); + } + + @Override + public void removeReplicationPeerExcludeTableCFs(String id, + Map> excludeTableCfs) throws ReplicationException, IOException { + if (excludeTableCfs == null) { + throw new ReplicationException("exclude tableCfs is null"); + } + ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id); + ReplicationPeerConfig newPeerConfig = ReplicationPeerConfigUtil + .removeExcludeTableCFsFromReplicationPeerConfig(excludeTableCfs, peerConfig, id); + updateReplicationPeerConfig(id, newPeerConfig); + } + + @Override public List listReplicationPeers() throws IOException { return listReplicationPeers((Pattern)null); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 1440a64..53e1425 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -1674,6 +1674,54 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } @Override + public CompletableFuture appendReplicationPeerExcludeTableCFs(String id, + Map> excludeTableCfs) { + if (excludeTableCfs == null) { + return failedFuture(new ReplicationException("exclude tableCfs is null")); + } + CompletableFuture future = new CompletableFuture(); + addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { + if (!completeExceptionally(future, error)) { + ReplicationPeerConfig newPeerConfig = ReplicationPeerConfigUtil + .appendExcludeTableCFsToReplicationPeerConfig(excludeTableCfs, peerConfig); + addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { + if (!completeExceptionally(future, error)) { + future.complete(result); + } + }); + } + }); + return future; + } + + @Override + public CompletableFuture removeReplicationPeerExcludeTableCFs(String id, + Map> excludeTableCfs) { + if (excludeTableCfs == null) { + return failedFuture(new ReplicationException("exclude tableCfs is null")); + } + CompletableFuture future = new CompletableFuture(); + addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { + if (!completeExceptionally(future, error)) { + ReplicationPeerConfig newPeerConfig = null; + try { + newPeerConfig = ReplicationPeerConfigUtil + .removeExcludeTableCFsFromReplicationPeerConfig(excludeTableCfs, peerConfig, id); + } catch (ReplicationException e) { + future.completeExceptionally(e); + return; + } + addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { + if (!completeExceptionally(future, error)) { + future.complete(result); + } + }); + } + }); + return future; + } + + @Override public CompletableFuture> listReplicationPeers() { return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null)); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index 331795c..b419e22 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -446,29 +446,49 @@ public final class ReplicationPeerConfigUtil { builder.setTableCFsMap(tableCfs); } else { Map> newTableCfs = copyTableCFsMap(preTableCfs); - for (Map.Entry> entry : tableCfs.entrySet()) { - TableName table = entry.getKey(); - Collection appendCfs = entry.getValue(); - if (newTableCfs.containsKey(table)) { - List cfs = newTableCfs.get(table); - if (cfs == null || appendCfs == null || appendCfs.isEmpty()) { - newTableCfs.put(table, null); - } else { - Set cfSet = new HashSet(cfs); - cfSet.addAll(appendCfs); - newTableCfs.put(table, Lists.newArrayList(cfSet)); - } + newTableCfs = appendTableCFs(newTableCfs, tableCfs); + builder.setTableCFsMap(newTableCfs); + } + return builder.build(); + } + + public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig( + Map> excludeTableCfs, ReplicationPeerConfig peerConfig) { + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig); + Map> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap(); + if (preExcludeTableCfs == null) { + builder.setExcludeTableCFsMap(excludeTableCfs); + } else { + Map> newExcludeTableCfs = copyTableCFsMap(preExcludeTableCfs); + newExcludeTableCfs = appendTableCFs(newExcludeTableCfs, excludeTableCfs); + builder.setExcludeTableCFsMap(newExcludeTableCfs); + } + return builder.build(); + } + + private static Map> appendTableCFs( + Map> oldTableCfs, Map> newTableCfs) { + for (Map.Entry> entry : newTableCfs.entrySet()) { + TableName table = entry.getKey(); + Collection appendCfs = entry.getValue(); + if (oldTableCfs.containsKey(table)) { + List cfs = oldTableCfs.get(table); + if (cfs == null || appendCfs == null || appendCfs.isEmpty()) { + oldTableCfs.put(table, null); } else { - if (appendCfs == null || appendCfs.isEmpty()) { - newTableCfs.put(table, null); - } else { - newTableCfs.put(table, Lists.newArrayList(appendCfs)); - } + Set cfSet = new HashSet(cfs); + cfSet.addAll(appendCfs); + oldTableCfs.put(table, Lists.newArrayList(cfSet)); + } + } else { + if (appendCfs == null || appendCfs.isEmpty()) { + oldTableCfs.put(table, null); + } else { + oldTableCfs.put(table, Lists.newArrayList(appendCfs)); } } - builder.setTableCFsMap(newTableCfs); } - return builder.build(); + return oldTableCfs; } private static Map> @@ -519,6 +539,46 @@ public final class ReplicationPeerConfigUtil { return builder.build(); } + public static ReplicationPeerConfig removeExcludeTableCFsFromReplicationPeerConfig( + Map> excludeTableCfs, ReplicationPeerConfig peerConfig, String id) + throws ReplicationException { + Map> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap(); + if (preExcludeTableCfs == null) { + throw new ReplicationException("exclude-Table-Cfs for peer: " + id + " is null"); + } + Map> newExcludeTableCfs = copyTableCFsMap(preExcludeTableCfs); + for (Map.Entry> entry : excludeTableCfs.entrySet()) { + TableName table = entry.getKey(); + Collection removeCfs = entry.getValue(); + if (newExcludeTableCfs.containsKey(table)) { + List cfs = newExcludeTableCfs.get(table); + if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) { + newExcludeTableCfs.remove(table); + } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) { + Set cfSet = new HashSet(cfs); + cfSet.removeAll(removeCfs); + if (cfSet.isEmpty()) { + newExcludeTableCfs.remove(table); + } else { + newExcludeTableCfs.put(table, Lists.newArrayList(cfSet)); + } + } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) { + throw new ReplicationException("Cannot remove cf of table: " + table + + " which doesn't specify cfs from exclude-table-cfs config in peer: " + id); + } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) { + throw new ReplicationException("Cannot remove table: " + table + + " which has specified cfs from exclude-table-cfs config in peer: " + id); + } + } else { + throw new ReplicationException( + "No table: " + table + " in exclude-table-cfs config of peer: " + id); + } + } + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig); + builder.setExcludeTableCFsMap(newExcludeTableCfs); + return builder.build(); + } + /** * Returns the configuration needed to talk to the remote slave cluster. * @param conf the base configuration diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java index b5a50c0..a222bc6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java @@ -33,6 +33,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletionException; + +import com.google.common.collect.Lists; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; @@ -355,6 +357,161 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { } @Test + public void testAppendExcludePeerTableCFs() throws Exception { + final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1"); + final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2"); + final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3"); + final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4"); + Map> excludeTableCFs; + Map> appendTableCFs = new HashMap<>(); + + // Add a valid peer + ReplicationPeerConfig rpc1 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(); + admin.addReplicationPeer(ID_ONE, rpc1).join(); + + // Append table t1 to exclude-table-cfs + appendTableCFs.put(tableName1, null); + admin.appendReplicationPeerExcludeTableCFs(ID_ONE, appendTableCFs).join(); + excludeTableCFs = admin.getReplicationPeerConfig(ID_ONE).get().getExcludeTableCFsMap(); + assertEquals(1, excludeTableCFs.size()); + assertEquals(true, excludeTableCFs.containsKey(tableName1)); + assertNull(excludeTableCFs.get(tableName1)); + + // Append table column family: f1,f2 of t2 to exclude-table-cfs + appendTableCFs.clear(); + appendTableCFs.put(tableName2, Lists.newArrayList("f1", "f2")); + admin.appendReplicationPeerExcludeTableCFs(ID_ONE, appendTableCFs).join(); + excludeTableCFs = admin.getReplicationPeerConfig(ID_ONE).get().getExcludeTableCFsMap(); + assertEquals(2, excludeTableCFs.size()); + assertTrue("Should contain t1", excludeTableCFs.containsKey(tableName1)); + assertTrue("Should contain t2", excludeTableCFs.containsKey(tableName2)); + assertNull(excludeTableCFs.get(tableName1)); + assertEquals(2, excludeTableCFs.get(tableName2).size()); + assertEquals("f1", excludeTableCFs.get(tableName2).get(0)); + assertEquals("f2", excludeTableCFs.get(tableName2).get(1)); + + // Append "t3" => [], then append "t3" => ["f1"] + appendTableCFs.clear(); + appendTableCFs.put(tableName3, new ArrayList<>()); + admin.appendReplicationPeerExcludeTableCFs(ID_ONE, appendTableCFs).join(); + appendTableCFs.clear(); + appendTableCFs.put(tableName3, Lists.newArrayList("f1")); + admin.appendReplicationPeerExcludeTableCFs(ID_ONE, appendTableCFs).join(); + excludeTableCFs = admin.getReplicationPeerConfig(ID_ONE).get().getExcludeTableCFsMap(); + assertEquals(3, excludeTableCFs.size()); + assertTrue("Should contain t3", excludeTableCFs.containsKey(tableName3)); + assertNull(excludeTableCFs.get(tableName3)); + + // append "t4" => ["f1"], then append "t4" => [] + appendTableCFs.clear(); + appendTableCFs.put(tableName4, new ArrayList<>()); + appendTableCFs.get(tableName4).add("f1"); + admin.appendReplicationPeerExcludeTableCFs(ID_ONE, appendTableCFs).join(); + appendTableCFs.clear(); + appendTableCFs.put(tableName4, new ArrayList<>()); + admin.appendReplicationPeerExcludeTableCFs(ID_ONE, appendTableCFs).join(); + excludeTableCFs = admin.getReplicationPeerConfig(ID_ONE).get().getExcludeTableCFsMap(); + assertEquals(4, excludeTableCFs.size()); + assertTrue("Should contain t4", excludeTableCFs.containsKey(tableName4)); + assertNull(excludeTableCFs.get(tableName4)); + + admin.removeReplicationPeer(ID_ONE).join(); + } + + @Test + public void testRemoveExcludePeerTableCFs() throws Exception { + final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1"); + final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2"); + final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3"); + Map> excludeTableCFs = new HashMap<>(); + Map> removeTableCFs = new HashMap<>(); + + // Add a valid peer + ReplicationPeerConfig rpc1 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(); + admin.addReplicationPeer(ID_ONE, rpc1).join(); + + // Remove exclude-table when peer' exclude-table is empty + removeTableCFs.put(tableName3, null); + try { + admin.removeReplicationPeerExcludeTableCFs(ID_ONE, removeTableCFs).join(); + } catch (CompletionException e) { + assertTrue(e.getCause() instanceof ReplicationException); + } + assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getExcludeTableCFsMap()); + + // Append exclude-table-cfs + excludeTableCFs.clear(); + excludeTableCFs.put(tableName1, null); + excludeTableCFs.put(tableName2, Lists.newArrayList("cf1")); + admin.appendReplicationPeerExcludeTableCFs(ID_ONE, excludeTableCFs).join(); + + // Remove exclude-table which not exist in peer' exclude-table + removeTableCFs.clear(); + removeTableCFs.put(tableName3, null); + try { + admin.removeReplicationPeerExcludeTableCFs(ID_ONE, removeTableCFs).join(); + } catch (CompletionException e) { + assertTrue(e.getCause() instanceof ReplicationException); + } + excludeTableCFs = admin.getReplicationPeerConfig(ID_ONE).get().getExcludeTableCFsMap(); + assertEquals(2, excludeTableCFs.size()); + assertTrue("Should contain t1", excludeTableCFs.containsKey(tableName1)); + assertTrue("Should contain t2", excludeTableCFs.containsKey(tableName2)); + assertNull(excludeTableCFs.get(tableName1)); + assertEquals(1, excludeTableCFs.get(tableName2).size()); + assertEquals("cf1", excludeTableCFs.get(tableName2).get(0)); + + // Remove exclude-table-cfs which peer' exclude-cfs is empty + removeTableCFs.clear(); + removeTableCFs.put(tableName1, Lists.newArrayList("cf2")); + try { + admin.removeReplicationPeerExcludeTableCFs(ID_ONE, removeTableCFs).join(); + } catch (CompletionException e) { + assertTrue(e.getCause() instanceof ReplicationException); + } + excludeTableCFs = admin.getReplicationPeerConfig(ID_ONE).get().getExcludeTableCFsMap(); + assertEquals(2, excludeTableCFs.size()); + assertTrue("Should contain t1", excludeTableCFs.containsKey(tableName1)); + assertTrue("Should contain t2", excludeTableCFs.containsKey(tableName2)); + assertNull(excludeTableCFs.get(tableName1)); + assertEquals(1, excludeTableCFs.get(tableName2).size()); + assertEquals("cf1", excludeTableCFs.get(tableName2).get(0)); + + // remove exclude-table which peer' exclude-cfs is not empty + removeTableCFs.clear(); + removeTableCFs.put(tableName2, null); + try { + admin.removeReplicationPeerExcludeTableCFs(ID_ONE, removeTableCFs).join(); + } catch (CompletionException e) { + assertTrue(e.getCause() instanceof ReplicationException); + } + excludeTableCFs = admin.getReplicationPeerConfig(ID_ONE).get().getExcludeTableCFsMap(); + assertEquals(2, excludeTableCFs.size()); + assertTrue("Should contain t1", excludeTableCFs.containsKey(tableName1)); + assertTrue("Should contain t2", excludeTableCFs.containsKey(tableName2)); + assertNull(excludeTableCFs.get(tableName1)); + assertEquals(1, excludeTableCFs.get(tableName2).size()); + assertEquals("cf1", excludeTableCFs.get(tableName2).get(0)); + + // Remove exclude-cfs which cfs is more than peers' exclude-cfs + removeTableCFs.clear(); + removeTableCFs.put(tableName2, Lists.newArrayList("cf1", "cf2")); + admin.removeReplicationPeerExcludeTableCFs(ID_ONE, removeTableCFs).join(); + excludeTableCFs = admin.getReplicationPeerConfig(ID_ONE).get().getExcludeTableCFsMap(); + assertEquals(1, excludeTableCFs.size()); + assertTrue("Should contain t1", excludeTableCFs.containsKey(tableName1)); + assertNull(excludeTableCFs.get(tableName1)); + + // remove exclude-table + removeTableCFs.clear(); + removeTableCFs.put(tableName1, null); + admin.removeReplicationPeerExcludeTableCFs(ID_ONE, removeTableCFs).join(); + assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getExcludeTableCFsMap()); + + admin.removeReplicationPeer(ID_ONE); + } + + @Test public void testSetPeerNamespaces() throws Exception { String ns1 = "ns1"; String ns2 = "ns2"; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index 6462234..1296524 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -34,6 +34,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; + +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -594,6 +596,159 @@ public class TestReplicationAdmin { } @Test + public void testAppendExcludePeerTableCFs() throws Exception { + final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1"); + final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2"); + + Map> excludeTableCFs; + Map> appendTableCFs = new HashMap<>(); + + // Add a valid peer which replicates all tables + ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(); + hbaseAdmin.addReplicationPeer(ID_ONE, rpc); + + // append null + try { + hbaseAdmin.appendReplicationPeerExcludeTableCFs(ID_ONE, null); + } catch (ReplicationException e) { + // expected + } + assertNull(hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap()); + + // append exclude-table-cfs + appendTableCFs.clear(); + appendTableCFs.put(tableName1, null); + appendTableCFs.put(tableName2, Lists.newArrayList("cf1")); + hbaseAdmin.appendReplicationPeerExcludeTableCFs(ID_ONE, appendTableCFs); + excludeTableCFs = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); + assertEquals(2, excludeTableCFs.size()); + assertNull(excludeTableCFs.get(tableName1)); + assertEquals(1, excludeTableCFs.get(tableName2).size()); + assertEquals("cf1", excludeTableCFs.get(tableName2).get(0)); + + // append exclude-cfs which peer' exclude-cfs is empty + appendTableCFs.clear(); + appendTableCFs.put(tableName1, Lists.newArrayList("cf3")); + hbaseAdmin.appendReplicationPeerExcludeTableCFs(ID_ONE, appendTableCFs); + excludeTableCFs = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); + assertEquals(2, excludeTableCFs.size()); + assertNull(excludeTableCFs.get(tableName1)); + assertEquals(1, excludeTableCFs.get(tableName2).size()); + assertEquals("cf1", excludeTableCFs.get(tableName2).get(0)); + + // append exclude-cfs which peer' exclude-cfs is not empty + appendTableCFs.clear(); + appendTableCFs.put(tableName2, Lists.newArrayList("cf2")); + hbaseAdmin.appendReplicationPeerExcludeTableCFs(ID_ONE, appendTableCFs); + excludeTableCFs = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); + assertEquals(2, excludeTableCFs.size()); + assertNull(excludeTableCFs.get(tableName1)); + assertEquals(2, excludeTableCFs.get(tableName2).size()); + + // append exclude-table which peer' exclude-cfs is not empty + appendTableCFs.clear(); + appendTableCFs.put(tableName2, new ArrayList<>()); + hbaseAdmin.appendReplicationPeerExcludeTableCFs(ID_ONE, appendTableCFs); + excludeTableCFs = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); + assertEquals(2, excludeTableCFs.size()); + assertNull(excludeTableCFs.get(tableName1)); + assertNull(excludeTableCFs.get(tableName2)); + + hbaseAdmin.removeReplicationPeer(ID_ONE); + } + + @Test + public void testRemoveExcludePeerTableCFs() throws Exception { + final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1"); + final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2"); + final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3"); + + Map> excludeTableCFs = new HashMap<>(); + Map> removeTableCFs = new HashMap<>(); + + // Add a valid peer which replicates all tables + ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(); + hbaseAdmin.addReplicationPeer(ID_ONE, rpc); + + // remove exclude-table when peer' exclude-table is empty + try { + removeTableCFs.put(tableName1, null); + hbaseAdmin.removeReplicationPeerExcludeTableCFs(ID_ONE, removeTableCFs); + } catch (ReplicationException e) { + // expected + } + assertNull(hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap()); + + // append exclude-table-cfs + excludeTableCFs.clear(); + excludeTableCFs.put(tableName1, null); + excludeTableCFs.put(tableName2, Lists.newArrayList("cf1")); + hbaseAdmin.appendReplicationPeerExcludeTableCFs(ID_ONE, excludeTableCFs); + excludeTableCFs = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); + assertEquals(2, excludeTableCFs.size()); + assertNull(excludeTableCFs.get(tableName1)); + assertEquals(1, excludeTableCFs.get(tableName2).size()); + assertEquals("cf1", excludeTableCFs.get(tableName2).get(0)); + + // remove exclude-table which not exist in peer' exclude-table + removeTableCFs.clear(); + removeTableCFs.put(tableName3, null); + try { + hbaseAdmin.removeReplicationPeerExcludeTableCFs(ID_ONE, removeTableCFs); + } catch (ReplicationException e) { + // expected + } + excludeTableCFs = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); + assertEquals(2, excludeTableCFs.size()); + assertNull(excludeTableCFs.get(tableName1)); + assertEquals(1, excludeTableCFs.get(tableName2).size()); + assertEquals("cf1", excludeTableCFs.get(tableName2).get(0)); + + // remove exclude-table-cfs which peer' exclude-cfs is empty + removeTableCFs.clear(); + removeTableCFs.put(tableName1, Lists.newArrayList("cf2")); + try { + hbaseAdmin.removeReplicationPeerExcludeTableCFs(ID_ONE, removeTableCFs); + } catch (ReplicationException e) { + // expected + } + excludeTableCFs = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); + assertEquals(2, excludeTableCFs.size()); + assertEquals(1, excludeTableCFs.get(tableName2).size()); + assertEquals("cf1", excludeTableCFs.get(tableName2).get(0)); + + // remove exclude-table which peer' exclude-cfs is not empty + removeTableCFs.clear(); + removeTableCFs.put(tableName2, null); + try { + hbaseAdmin.removeReplicationPeerExcludeTableCFs(ID_ONE, removeTableCFs); + } catch (ReplicationException e) { + // expected + } + excludeTableCFs = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); + assertEquals(2, excludeTableCFs.size()); + assertEquals(1, excludeTableCFs.get(tableName2).size()); + assertEquals("cf1", excludeTableCFs.get(tableName2).get(0)); + + // remove exclude-cfs which cfs is more than peers' exclude-cfs + removeTableCFs.clear(); + removeTableCFs.put(tableName2, Lists.newArrayList("cf1", "cf2")); + hbaseAdmin.removeReplicationPeerExcludeTableCFs(ID_ONE, removeTableCFs); + excludeTableCFs = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); + assertEquals(1, excludeTableCFs.size()); + assertNull(excludeTableCFs.get(tableName1)); + + // remove exclude-table + removeTableCFs.clear(); + removeTableCFs.put(tableName1, null); + hbaseAdmin.removeReplicationPeerExcludeTableCFs(ID_ONE, removeTableCFs); + excludeTableCFs = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); + assertNull(excludeTableCFs); + + hbaseAdmin.removeReplicationPeer(ID_ONE); + } + + @Test public void testSetPeerNamespaces() throws Exception { String ns1 = "ns1"; String ns2 = "ns2"; diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index e061168..3c32e14 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -210,6 +210,30 @@ module Hbase @admin.removeReplicationPeerTableCFs(id, map) end + # Append exclude-tableCFs to the exclude-tableCFs config for the specified peer + def append_peer_exclude_tableCFs(id, excludeTableCFs) + unless excludeTableCFs.nil? + # convert tableCFs to TableName + map = java.util.HashMap.new + excludeTableCFs.each do |key, val| + map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val) + end + end + @admin.appendReplicationPeerExcludeTableCFs(id, map) + end + + # Remove some exclude-tableCFs from the exclude-tableCFs config for the specified peer + def remove_peer_exclude_tableCFs(id, excludeTableCFs) + unless excludeTableCFs.nil? + # convert tableCFs to TableName + map = java.util.HashMap.new + excludeTableCFs.each do |key, val| + map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val) + end + end + @admin.removeReplicationPeerExcludeTableCFs(id, map) + end + # Set new namespaces config for the specified peer def set_peer_namespaces(id, namespaces) unless namespaces.nil? diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index 62a8bae..6bd4804 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -389,6 +389,8 @@ Shell.load_command_group( show_peer_tableCFs set_peer_tableCFs set_peer_exclude_tableCFs + append_peer_exclude_tableCFs + remove_peer_exclude_tableCFs set_peer_bandwidth list_replicated_tables append_peer_tableCFs diff --git a/hbase-shell/src/main/ruby/shell/commands/append_peer_exclude_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/append_peer_exclude_tableCFs.rb new file mode 100644 index 0000000..c229c94 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/append_peer_exclude_tableCFs.rb @@ -0,0 +1,42 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class AppendPeerExcludeTableCFs < Command + def help + <<-EOF +Append table-cfs config to the specified peer' exclude table-cfs to make them non-replicable +Examples: + + # append tables / table-cfs to peers' exclude table-cfs + hbase> append_peer_exclude_tableCFs '2', { "table1" => [], "ns2:table2" => ["cfA", "cfB"]} + EOF + end + + def command(id, table_cfs) + replication_admin.append_peer_exclude_tableCFs(id, table_cfs) + end + + def command_name + 'append_peer_exclude_tableCFs' + end + end + end +end diff --git a/hbase-shell/src/main/ruby/shell/commands/remove_peer_exclude_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/remove_peer_exclude_tableCFs.rb new file mode 100644 index 0000000..4f45207 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/remove_peer_exclude_tableCFs.rb @@ -0,0 +1,42 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class RemovePeerExcludeTableCFs < Command + def help + <<-EOF +Remove table-cfs config from the specified peer' exclude table-cfs to make them replicable +Examples: + + # remove tables / table-cfs from peer' exclude table-cfs + hbase> remove_peer_exclude_tableCFs '2', { "table1" => [], "ns2:table2" => ["cfA", "cfB"]} + EOF + end + + def command(id, table_cfs) + replication_admin.remove_peer_exclude_tableCFs(id, table_cfs) + end + + def command_name + 'remove_peer_exclude_tableCFs' + end + end + end +end diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index 85b4537..7f5400f 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -361,6 +361,76 @@ module Hbase replication_admin.remove_peer(@peer_id) end + define_test "append_peer_exclude_tableCFs: works with exclude table-cfs map" do + cluster_key = "zk4,zk5,zk6:11000:/hbase-test" + args = {CLUSTER_KEY => cluster_key} + command(:add_peer, @peer_id, args) + assert_equal(1, command(:list_peers).length) + peer = command(:list_peers).get(0) + assert_equal(@peer_id, peer.getPeerId) + assert_equal(cluster_key, peer.getPeerConfig.getClusterKey) + + # set exclude-table-cfs + exclude_table_cfs = {"table1" => [], "ns2:table2" => ["cf1", "cf2"]} + command(:set_peer_exclude_tableCFs, @peer_id, exclude_table_cfs) + assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap()) + + # append empty exclude-table-cfs + append_table_cfs = {} + command(:append_peer_exclude_tableCFs, @peer_id, append_table_cfs) + assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap()) + + # append exclude-table-cfs which don't exist in peer' exclude-table-cfs + append_table_cfs = {"table3" => ["cf3"]} + exclude_table_cfs = {"table1" => [], "ns2:table2" => ["cf1", "cf2"], "table3" => ["cf3"]} + command(:append_peer_exclude_tableCFs, @peer_id, append_table_cfs) + assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap()) + + # append exclude-table-cfs which exist in peer' exclude-table-cfs + append_table_cfs = {"table1" => ["cf1"], "ns2:table2" => ["cf1", "cf3"], "table3" => []} + exclude_table_cfs = {"table1" => [], "ns2:table2" => ["cf1", "cf2", "cf3"], "table3" => []} + command(:append_peer_exclude_tableCFs, @peer_id, append_table_cfs) + assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap()) + + # cleanup for future tests + command(:remove_peer, @peer_id) + end + + define_test 'remove_peer_exclude_tableCFs: works with exclude table-cfs map' do + cluster_key = 'zk4,zk5,zk6:11000:/hbase-test' + args = {CLUSTER_KEY => cluster_key} + command(:add_peer, @peer_id, args) + assert_equal(1, command(:list_peers).length) + peer = command(:list_peers).get(0) + assert_equal(@peer_id, peer.getPeerId) + assert_equal(cluster_key, peer.getPeerConfig.getClusterKey) + + # set exclude-table-cfs + exclude_table_cfs = {'table1' => [], 'table2' => ['cf1'], 'ns3:table3' => ['cf1', 'cf2']} + command(:set_peer_exclude_tableCFs, @peer_id, exclude_table_cfs) + assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap()) + + # remove empty exclude-table-cfs + remove_table_cfs = {} + command(:remove_peer_exclude_tableCFs, @peer_id, remove_table_cfs) + assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap()) + + # remove exclude-table-cfs which exist in pees' exclude table cfs + remove_table_cfs = {'table1' => [], 'table2' => ['cf1']} + exclude_table_cfs = {'ns3:table3' => ['cf1', 'cf2']} + command(:remove_peer_exclude_tableCFs, @peer_id, remove_table_cfs) + assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap()) + + # remove exclude-table-cfs which exist in pees' exclude-table-cfs + remove_table_cfs = {'ns3:table3' => ['cf2', 'cf3']} + exclude_table_cfs = {'ns3:table3' => ['cf1']} + command(:remove_peer_exclude_tableCFs, @peer_id, remove_table_cfs) + assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap()) + + # cleanup for future tests + replication_admin.remove_peer(@peer_id) + end + define_test "set_peer_namespaces: works with namespaces array" do cluster_key = "zk4,zk5,zk6:11000:/hbase-test" namespaces = ["ns1", "ns2"] -- 2.7.4