From c625df659644985774becebc19b55a5b7aab82bc Mon Sep 17 00:00:00 2001
From: Guanghao Zhang
Date: Fri, 14 Apr 2017 17:15:15 +0800
Subject: [PATCH] HBASE-17915 Implement async replication admin methods

---
 .../org/apache/hadoop/hbase/client/AsyncAdmin.java |  96 +++++
 .../hadoop/hbase/client/AsyncHBaseAdmin.java       | 204 +++++++++++
 .../org/apache/hadoop/hbase/client/HBaseAdmin.java |  58 +--
 .../client/replication/ReplicationSerDeHelper.java |  67 ++++
 .../hadoop/hbase/client/TestAsyncAdminBase.java    |   2 +-
 .../hbase/client/TestAsyncReplicationAdminApi.java | 402 +++++++++++++++++++++
 6 files changed, 772 insertions(+), 57 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index ab791c2..fc186a2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.regex.Pattern;
 
@@ -27,6 +32,10 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.replication.TableCFs;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.util.Pair;
 
 /**
@@ -465,4 +474,91 @@ public interface AsyncAdmin {
    * startcode. Here is an example: host187.example.com,60020,1289493121758
    */
   CompletableFuture<Void> move(final byte[] regionName, final byte[] destServerName);
+
+  /**
+   * Add a new replication peer for replicating data to slave cluster
+   * @param peerId a short name that identifies the peer
+   * @param peerConfig configuration for the replication slave cluster
+   */
+  CompletableFuture<Void> addReplicationPeer(final String peerId,
+      final ReplicationPeerConfig peerConfig);
+
+  /**
+   * Remove a peer and stop the replication
+   * @param peerId a short name that identifies the peer
+   */
+  CompletableFuture<Void> removeReplicationPeer(final String peerId);
+
+  /**
+   * Restart the replication stream to the specified peer
+   * @param peerId a short name that identifies the peer
+   */
+  CompletableFuture<Void> enableReplicationPeer(final String peerId);
+
+  /**
+   * Stop the replication stream to the specified peer
+   * @param peerId a short name that identifies the peer
+   */
+  CompletableFuture<Void> disableReplicationPeer(final String peerId);
+
+  /**
+   * Returns the configured ReplicationPeerConfig for the specified peer
+   * @param peerId a short name that identifies the peer
+   * @return ReplicationPeerConfig for the peer wrapped by a {@link CompletableFuture}.
+   */
+  CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(final String peerId);
+
+  /**
+   * Update the peerConfig for the specified peer
+   * @param peerId a short name that identifies the peer
+   * @param peerConfig new config for the peer
+   */
+  CompletableFuture<Void> updateReplicationPeerConfig(final String peerId,
+      final ReplicationPeerConfig peerConfig);
+
+  /**
+   * Append the replicable table-cf config of the specified peer
+   * @param id a short name that identifies the cluster
+   * @param tableCfs A map from tableName to column family names
+   */
+  CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
+      Map<TableName, ? extends Collection<String>> tableCfs);
+
+  /**
+   * Remove some table-cfs from config of the specified peer
+   * @param id a short name that identifies the cluster
+   * @param tableCfs A map from tableName to column family names
+   */
+  CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
+      Map<TableName, ? extends Collection<String>> tableCfs);
+
+  /**
+   * Return a list of replication peers.
+   * @return a list of replication peers description. The return value will be wrapped by a
+   *         {@link CompletableFuture}.
+   */
+  CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers();
+
+  /**
+   * Return a list of replication peers.
+   * @param regex The regular expression to match peer id
+   * @return a list of replication peers description. The return value will be wrapped by a
+   *         {@link CompletableFuture}.
+   */
+  CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(String regex);
+
+  /**
+   * Return a list of replication peers.
+   * @param pattern The compiled regular expression to match peer id
+   * @return a list of replication peers description. The return value will be wrapped by a
+   *         {@link CompletableFuture}.
+   */
+  CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern);
+
+  /**
+   * Find all table and column families that are replicated from this cluster
+   * @return the replicated table-cfs list of this cluster. The return value will be wrapped by a
+   *         {@link CompletableFuture}.
+   */
+  CompletableFuture<List<TableCFs>> listReplicatedTableCFs();
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index e42ee57..30307f2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -22,16 +22,24 @@ import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -54,8 +62,13 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
 import org.apache.hadoop.hbase.client.Scan.ReadType;
+import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.TableCFs;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
@@ -116,6 +129,20 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
 import org.apache.hadoop.hbase.util.Pair;
@@ -1149,6 +1176,175 @@
     return future;
   }
 
+  @Override
+  public CompletableFuture<Void> addReplicationPeer(String peerId,
+      ReplicationPeerConfig peerConfig) {
+    return this
+        .<Void> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<AddReplicationPeerRequest, AddReplicationPeerResponse, Void> call(controller, stub,
+                RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig),
+                (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call();
+  }
+
+  @Override
+  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
+    return this
+        .<Void> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse, Void> call(controller,
+                stub, RequestConverter.buildRemoveReplicationPeerRequest(peerId),
+                (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> null)).call();
+  }
+
+  @Override
+  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
+    return this
+        .<Void> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<EnableReplicationPeerRequest, EnableReplicationPeerResponse, Void> call(controller,
+                stub, RequestConverter.buildEnableReplicationPeerRequest(peerId),
+                (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> null)).call();
+  }
+
+  @Override
+  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
+    return this
+        .<Void> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<DisableReplicationPeerRequest, DisableReplicationPeerResponse, Void> call(
+                controller, stub, RequestConverter.buildDisableReplicationPeerRequest(peerId),
+                (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> null))
+        .call();
+  }
+
+  @Override
+  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
+    return this
+        .<ReplicationPeerConfig> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
+                controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
+                (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
+                (resp) -> ReplicationSerDeHelper.convert(resp.getPeerConfig()))).call();
+  }
+
+  @Override
+  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
+      ReplicationPeerConfig peerConfig) {
+    return this
+        .<Void> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse, Void> call(
+                controller, stub,
+                RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
+                (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
+                (resp) -> null)).call();
+  }
+
+  @Override
+  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
+      Map<TableName, ? extends Collection<String>> tableCfs) {
+    if (tableCfs == null) {
+      return failedFuture(new ReplicationException("tableCfs is null"));
+    }
+
+    CompletableFuture<Void> future = new CompletableFuture<Void>();
+    getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
+      if (!completeExceptionally(future, error)) {
+        ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
+        updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
+          if (!completeExceptionally(future, err)) {
+            future.complete(result);
+          }
+        });
+      }
+    });
+    return future;
+  }
+
+  @Override
+  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
+      Map<TableName, ? extends Collection<String>> tableCfs) {
+    if (tableCfs == null) {
+      return failedFuture(new ReplicationException("tableCfs is null"));
+    }
+
+    CompletableFuture<Void> future = new CompletableFuture<Void>();
+    getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
+      if (!completeExceptionally(future, error)) {
+        try {
+          ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
+        } catch (ReplicationException e) {
+          future.completeExceptionally(e);
+          return;
+        }
+        updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
+          if (!completeExceptionally(future, err)) {
+            future.complete(result);
+          }
+        });
+      }
+    });
+    return future;
+  }
+
+  @Override
+  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
+    return listReplicationPeers((Pattern) null);
+  }
+
+  @Override
+  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(String regex) {
+    return listReplicationPeers(Pattern.compile(regex));
+  }
+
+  @Override
+  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
+    return this
+        .<List<ReplicationPeerDescription>> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call(
+                controller,
+                stub,
+                RequestConverter.buildListReplicationPeersRequest(pattern),
+                (s, c, req, done) -> s.listReplicationPeers(c, req, done),
+                (resp) -> resp.getPeerDescList().stream()
+                    .map(ReplicationSerDeHelper::toReplicationPeerDescription)
+                    .collect(Collectors.toList()))).call();
+  }
+
+  @Override
+  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
+    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
+    listTables().whenComplete(
+      (tables, error) -> {
+        if (!completeExceptionally(future, error)) {
+          List<TableCFs> replicatedTableCFs = new ArrayList<>();
+          Arrays.asList(tables).forEach(
+            table -> {
+              Map<String, Integer> cfs = new HashMap<>();
+              Arrays.asList(table.getColumnFamilies()).stream()
+                  .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
+                  .forEach(column -> {
+                    cfs.put(column.getNameAsString(), column.getScope());
+                  });
+              if (!cfs.isEmpty()) {
+                replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
+              }
+            });
+          future.complete(replicatedTableCFs);
+        }
+      });
+    return future;
+  }
+
   private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
     if (numRegions < 3) {
       throw new IllegalArgumentException("Must create at least three regions");
@@ -1423,4 +1619,12 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
     future.completeExceptionally(error);
     return future;
   }
+
+  private boolean completeExceptionally(CompletableFuture<?> future, Throwable error) {
+    if (error != null) {
+      future.completeExceptionally(error);
+      return true;
+    }
+    return false;
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 155a272..71449ad 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -3869,31 +3869,7 @@ public class HBaseAdmin implements Admin {
       throw new ReplicationException("tableCfs is null");
     }
     ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
-    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
-    if (preTableCfs == null) {
-      peerConfig.setTableCFsMap(tableCfs);
-    } else {
-      for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
-        TableName table = entry.getKey();
-        Collection<String> appendCfs = entry.getValue();
-        if (preTableCfs.containsKey(table)) {
-          List<String> cfs = preTableCfs.get(table);
-          if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
-            preTableCfs.put(table, null);
-          } else {
-            Set<String> cfSet = new HashSet<>(cfs);
-            cfSet.addAll(appendCfs);
-            preTableCfs.put(table, Lists.newArrayList(cfSet));
-          }
-        } else {
-          if (appendCfs == null || appendCfs.isEmpty()) {
-            preTableCfs.put(table, null);
-          } else {
-            preTableCfs.put(table, Lists.newArrayList(appendCfs));
-          }
-        }
-      }
-    }
+    ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
     updateReplicationPeerConfig(id, peerConfig);
   }
 
@@ -3905,37 +3881,7 @@
       throw new ReplicationException("tableCfs is null");
     }
     ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
-    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
-    if (preTableCfs == null) {
-      throw new ReplicationException("Table-Cfs for peer" + id + " is null");
-    }
-    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
-
-      TableName table = entry.getKey();
-      Collection<String> removeCfs = entry.getValue();
-      if (preTableCfs.containsKey(table)) {
-        List<String> cfs = preTableCfs.get(table);
-        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
-          preTableCfs.remove(table);
-        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
-          Set<String> cfSet = new HashSet<>(cfs);
-          cfSet.removeAll(removeCfs);
-          if (cfSet.isEmpty()) {
-            preTableCfs.remove(table);
-          } else {
-            preTableCfs.put(table, Lists.newArrayList(cfSet));
-          }
-        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
-          throw new ReplicationException("Cannot remove cf of table: " + table
-              + " which doesn't specify cfs from table-cfs config in peer: " + id);
-        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
-          throw new ReplicationException("Cannot remove table: " + table
-              + " which has specified cfs from table-cfs config in peer: " + id);
-        }
-      } else {
-        throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
-      }
-    }
+    ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
     updateReplicationPeerConfig(id, peerConfig);
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
index 2d5539c..f561f4a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
@@ -30,11 +30,14 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Strings;
 
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashSet;
@@ -367,4 +370,68 @@ public final class ReplicationSerDeHelper {
     builder.setConfig(convert(desc.getPeerConfig()));
     return builder.build();
   }
+
+  public static void appendTableCFsToReplicationPeerConfig(
+      Map<TableName, ? extends Collection<String>> tableCfs, ReplicationPeerConfig peerConfig) {
+    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
+    if (preTableCfs == null) {
+      peerConfig.setTableCFsMap(tableCfs);
+    } else {
+      for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
+        TableName table = entry.getKey();
+        Collection<String> appendCfs = entry.getValue();
+        if (preTableCfs.containsKey(table)) {
+          List<String> cfs = preTableCfs.get(table);
+          if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
+            preTableCfs.put(table, null);
+          } else {
+            Set<String> cfSet = new HashSet<>(cfs);
+            cfSet.addAll(appendCfs);
+            preTableCfs.put(table, Lists.newArrayList(cfSet));
+          }
+        } else {
+          if (appendCfs == null || appendCfs.isEmpty()) {
+            preTableCfs.put(table, null);
+          } else {
+            preTableCfs.put(table, Lists.newArrayList(appendCfs));
+          }
+        }
+      }
+    }
+  }
+
+  public static void removeTableCFsFromReplicationPeerConfig(
+      Map<TableName, ? extends Collection<String>> tableCfs, ReplicationPeerConfig peerConfig,
+      String id) throws ReplicationException {
+    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
+    if (preTableCfs == null) {
+      throw new ReplicationException("Table-Cfs for peer: " + id + " is null");
+    }
+    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
+      TableName table = entry.getKey();
+      Collection<String> removeCfs = entry.getValue();
+      if (preTableCfs.containsKey(table)) {
+        List<String> cfs = preTableCfs.get(table);
+        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
+          preTableCfs.remove(table);
+        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
+          Set<String> cfSet = new HashSet<>(cfs);
+          cfSet.removeAll(removeCfs);
+          if (cfSet.isEmpty()) {
+            preTableCfs.remove(table);
+          } else {
+            preTableCfs.put(table, Lists.newArrayList(cfSet));
+          }
+        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
+          throw new ReplicationException("Cannot remove cf of table: " + table
+              + " which doesn't specify cfs from table-cfs config in peer: " + id);
+        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
+          throw new ReplicationException("Cannot remove table: " + table
+              + " which has specified cfs from table-cfs config in peer: " + id);
+        }
+      } else {
+        throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
+      }
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java
index f0dee0a..1881d4c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java
@@ -34,7 +34,7 @@ import org.junit.BeforeClass;
  */
 public abstract class TestAsyncAdminBase {
 
-  protected static final Log LOG = LogFactory.getLog(TestAdmin1.class);
+  protected static final Log LOG = LogFactory.getLog(TestAsyncAdminBase.class);
   protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   protected static byte[] FAMILY = Bytes.toBytes("testFamily");
   protected static final byte[] FAMILY_0 = Bytes.toBytes("cf0");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
new file mode 100644
index 0000000..2b01fad
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
@@ -0,0 +1,402 @@
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletionException;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Class to test asynchronous replication admin operations.
+ */
+@Category({MediumTests.class, ClientTests.class})
+public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
+
+  private final String ID_ONE = "1";
+  private final String KEY_ONE = "127.0.0.1:2181:/hbase";
+  private final String ID_SECOND = "2";
+  private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
+
+  @Rule
+  public TestName name = new TestName();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+    TEST_UTIL.startMiniCluster();
+    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+  }
+
+  @Test
+  public void testAddRemovePeer() throws Exception {
+    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
+    rpc1.setClusterKey(KEY_ONE);
+    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
+    rpc2.setClusterKey(KEY_SECOND);
+    // Add a valid peer
+    admin.addReplicationPeer(ID_ONE, rpc1).join();
+    // try adding the same (fails)
+    try {
+      admin.addReplicationPeer(ID_ONE, rpc1).join();
+      fail("Test case should fail as adding the same peer.");
+    } catch (CompletionException e) {
+      // OK!
+    }
+    assertEquals(1, admin.listReplicationPeers().get().size());
+    // Try to remove a nonexistent peer
+    try {
+      admin.removeReplicationPeer(ID_SECOND).join();
+      fail("Test case should fail as removing a nonexistent peer.");
+    } catch (CompletionException e) {
+      // OK!
+    }
+    assertEquals(1, admin.listReplicationPeers().get().size());
+    // Add a second since multi-slave is supported
+    admin.addReplicationPeer(ID_SECOND, rpc2).join();
+    assertEquals(2, admin.listReplicationPeers().get().size());
+    // Remove the first peer we added
+    admin.removeReplicationPeer(ID_ONE).join();
+    assertEquals(1, admin.listReplicationPeers().get().size());
+    admin.removeReplicationPeer(ID_SECOND).join();
+    assertEquals(0, admin.listReplicationPeers().get().size());
+  }
+
+  @Test
+  public void testPeerConfig() throws Exception {
+    ReplicationPeerConfig config = new ReplicationPeerConfig();
+    config.setClusterKey(KEY_ONE);
+    config.getConfiguration().put("key1", "value1");
+    config.getConfiguration().put("key2", "value2");
+    admin.addReplicationPeer(ID_ONE, config).join();
+
+    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
+    assertEquals(1, peers.size());
+    ReplicationPeerDescription peerOne = peers.get(0);
+    assertNotNull(peerOne);
+    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
+    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));
+
+    admin.removeReplicationPeer(ID_ONE).join();
+  }
+
+  @Test
+  public void testEnableDisablePeer() throws Exception {
+    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
+    rpc1.setClusterKey(KEY_ONE);
+    admin.addReplicationPeer(ID_ONE, rpc1).join();
+    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
+    assertEquals(1, peers.size());
+    assertTrue(peers.get(0).isEnabled());
+
+    admin.disableReplicationPeer(ID_ONE).join();
+    peers = admin.listReplicationPeers().get();
+    assertEquals(1, peers.size());
+    assertFalse(peers.get(0).isEnabled());
+    admin.removeReplicationPeer(ID_ONE).join();
+  }
+
+  @Test
+  public void testAppendPeerTableCFs() throws Exception {
+    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
+    rpc1.setClusterKey(KEY_ONE);
+    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
+    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
+    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
+    final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
+    final TableName tableName5 = TableName.valueOf(name.getMethodName() + "t5");
+    final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6");
+
+    // Add a valid peer
+    admin.addReplicationPeer(ID_ONE, rpc1).join();
+
+    Map<TableName, List<String>> tableCFs = new HashMap<>();
+
+    // append table t1 to replication
+    tableCFs.put(tableName1, null);
+    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
+    Map<TableName, List<String>> result = admin.getReplicationPeerConfig(ID_ONE).get()
+        .getTableCFsMap();
+    assertEquals(1, result.size());
+    assertEquals(true, result.containsKey(tableName1));
+    assertNull(result.get(tableName1));
+
+    // append table t2 to replication
+    tableCFs.clear();
+    tableCFs.put(tableName2, null);
+    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
+    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
+    assertEquals(2, result.size());
+    assertTrue("Should contain t1", result.containsKey(tableName1));
+    assertTrue("Should contain t2", result.containsKey(tableName2));
+    assertNull(result.get(tableName1));
+    assertNull(result.get(tableName2));
+
+    // append table column family: f1 of t3 to replication
+    tableCFs.clear();
+    tableCFs.put(tableName3, new ArrayList<>());
+    tableCFs.get(tableName3).add("f1");
+    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
+    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
+    assertEquals(3, result.size());
+    assertTrue("Should contain t1", result.containsKey(tableName1));
+    assertTrue("Should contain t2", result.containsKey(tableName2));
+    assertTrue("Should contain t3", result.containsKey(tableName3));
+    assertNull(result.get(tableName1));
+    assertNull(result.get(tableName2));
+    assertEquals(1, result.get(tableName3).size());
+    assertEquals("f1", result.get(tableName3).get(0));
+
+    // append table column family: f1,f2 of t4 to replication
+    tableCFs.clear();
+    tableCFs.put(tableName4, new ArrayList<>());
+    tableCFs.get(tableName4).add("f1");
+    tableCFs.get(tableName4).add("f2");
+    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
+    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
+    assertEquals(4, result.size());
+    assertTrue("Should contain t1", result.containsKey(tableName1));
+    assertTrue("Should contain t2", result.containsKey(tableName2));
+    assertTrue("Should contain t3", result.containsKey(tableName3));
+    assertTrue("Should contain t4", result.containsKey(tableName4));
+    assertNull(result.get(tableName1));
+    assertNull(result.get(tableName2));
+    assertEquals(1, result.get(tableName3).size());
+    assertEquals("f1", result.get(tableName3).get(0));
+    assertEquals(2, result.get(tableName4).size());
+    assertEquals("f1", result.get(tableName4).get(0));
+    assertEquals("f2", result.get(tableName4).get(1));
+
+    // append "table5" => [], then append "table5" => ["f1"]
+    tableCFs.clear();
+    tableCFs.put(tableName5, new ArrayList<>());
+    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
+    tableCFs.clear();
+    tableCFs.put(tableName5, new ArrayList<>());
+    tableCFs.get(tableName5).add("f1");
+    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
+    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
+    assertEquals(5, result.size());
+    assertTrue("Should contain t5", result.containsKey(tableName5));
+    // null means replicating all cfs of t5
+    assertNull(result.get(tableName5));
+
+    // append "table6" => ["f1"], then append "table6" => []
+    tableCFs.clear();
+    tableCFs.put(tableName6, new ArrayList<>());
+    tableCFs.get(tableName6).add("f1");
+    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
+    tableCFs.clear();
+    tableCFs.put(tableName6, new ArrayList<>());
+    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
+    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
+    assertEquals(6, result.size());
+    assertTrue("Should contain t6", result.containsKey(tableName6));
+    // null means replicating all cfs of t6
+    assertNull(result.get(tableName6));
+
+    admin.removeReplicationPeer(ID_ONE).join();
+  }
+
+  @Test
+  public void testRemovePeerTableCFs() throws Exception {
+    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
+    rpc1.setClusterKey(KEY_ONE);
+    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
+    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
+    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
+    final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
+    // Add a valid peer
+    admin.addReplicationPeer(ID_ONE, rpc1).join();
+    Map<TableName, List<String>> tableCFs = new HashMap<>();
+    try {
+      tableCFs.put(tableName3, null);
+      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
+      fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null");
+    } catch (CompletionException e) {
+      assertTrue(e.getCause() instanceof ReplicationException);
+    }
+    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
+
+    tableCFs.clear();
+    tableCFs.put(tableName1, null);
+    tableCFs.put(tableName2, new ArrayList<>());
+    tableCFs.get(tableName2).add("cf1");
+    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
+    try {
+      tableCFs.clear();
+      tableCFs.put(tableName3, null);
+      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
+      fail("Test case should fail as removing table-cfs from a peer whose table-cfs didn't contain t3");
+    } catch (CompletionException e) {
+      assertTrue(e.getCause() instanceof ReplicationException);
+    }
+    Map<TableName, List<String>> result = admin.getReplicationPeerConfig(ID_ONE).get()
+        .getTableCFsMap();
+    assertEquals(2, result.size());
+    assertTrue("Should contain t1", result.containsKey(tableName1));
+    assertTrue("Should contain t2", result.containsKey(tableName2));
+    assertNull(result.get(tableName1));
+    assertEquals(1, result.get(tableName2).size());
+    assertEquals("cf1", result.get(tableName2).get(0));
+
+    try {
+      tableCFs.clear();
+      tableCFs.put(tableName1, new ArrayList<>());
+      tableCFs.get(tableName1).add("cf1");
+      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
+      fail("Test case should fail, because table t1 didn't specify cfs in peer config");
+    } catch (CompletionException e) {
+      assertTrue(e.getCause() instanceof ReplicationException);
+    }
+    tableCFs.clear();
+    tableCFs.put(tableName1, null);
+    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
+    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
+    assertEquals(1, result.size());
+    assertEquals(1, result.get(tableName2).size());
+    assertEquals("cf1", result.get(tableName2).get(0));
+
+    try {
+      tableCFs.clear();
+      tableCFs.put(tableName2, null);
+      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
+      fail("Test case should fail, because table t2 has specified cfs in peer config");
+    } catch (CompletionException e) {
+      assertTrue(e.getCause() instanceof ReplicationException);
+    }
+    tableCFs.clear();
+    tableCFs.put(tableName2, new ArrayList<>());
+    tableCFs.get(tableName2).add("cf1");
+    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
+    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
+
+    tableCFs.clear();
+    tableCFs.put(tableName4, new ArrayList<>());
+    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
+    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
+    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
+
+    admin.removeReplicationPeer(ID_ONE).join();
+  }
+
+  @Test
+  public void testSetPeerNamespaces() throws Exception {
+    String ns1 = "ns1";
+    String ns2 = "ns2";
+
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(KEY_ONE);
+    admin.addReplicationPeer(ID_ONE, rpc).join();
+
+    // add ns1 and ns2 to peer config
+    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
+    Set<String> namespaces = new HashSet<>();
+    namespaces.add(ns1);
+    namespaces.add(ns2);
+    rpc.setNamespaces(namespaces);
+    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
+    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
+    assertEquals(2, namespaces.size());
+    assertTrue(namespaces.contains(ns1));
+    assertTrue(namespaces.contains(ns2));
+
+    // update peer config to only contain ns1
+    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
+    namespaces.clear();
+    namespaces.add(ns1);
+    rpc.setNamespaces(namespaces);
+    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
+    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
+    assertEquals(1, namespaces.size());
+    assertTrue(namespaces.contains(ns1));
+
+    admin.removeReplicationPeer(ID_ONE).join();
+  }
+
+  @Test
+  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
+    String ns1 = "ns1";
+    String ns2 = "ns2";
+    final TableName tableName1 = TableName.valueOf(ns1 + ":" + name.getMethodName());
+    final TableName tableName2 = TableName.valueOf(ns2 + ":" + name.getMethodName() + "2");
+
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(KEY_ONE);
+    admin.addReplicationPeer(ID_ONE, rpc).join();
+
+    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
+    Set<String> namespaces = new HashSet<>();
+    namespaces.add(ns1);
+    rpc.setNamespaces(namespaces);
+    admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
+    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
+    Map<TableName, List<String>> tableCfs = new HashMap<>();
+    tableCfs.put(tableName1, new ArrayList<>());
+    rpc.setTableCFsMap(tableCfs);
+    try {
+      admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
+      fail("Test case should fail, because table " + tableName1 + " conflicts with namespace "
+          + ns1);
+    } catch (CompletionException e) {
+      // OK
+    }
+
+    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
+    tableCfs.clear();
+    tableCfs.put(tableName2, new ArrayList<>());
+    rpc.setTableCFsMap(tableCfs);
+    admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
+    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
+    namespaces.clear();
+    namespaces.add(ns2);
+    rpc.setNamespaces(namespaces);
+    try {
+      admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
+      fail("Test case should fail, because namespace " + ns2 + " conflicts with table "
+          + tableName2);
+    } catch (CompletionException e) {
+      // OK
+    }
+
+    admin.removeReplicationPeer(ID_ONE).join();
+  }
+
+  @Test
+  public void testPeerBandwidth() throws Exception {
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(KEY_ONE);
+
+    admin.addReplicationPeer(ID_ONE, rpc).join();
+    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
+    assertEquals(0, rpc.getBandwidth());
+
+    rpc.setBandwidth(2097152);
+    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
+    assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth());
+
+    admin.removeReplicationPeer(ID_ONE).join();
+  }
+}
-- 
1.9.1
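
For context, a minimal usage sketch of the AsyncAdmin replication methods added above. This is not part of the patch; it assumes an AsyncAdmin obtained from an AsyncConnection as in TestAsyncAdminBase, and the peer id "1", the cluster key, and table "t1"/family "f1" are made-up example values:

    // Describe an example slave cluster to replicate to.
    ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
    peerConfig.setClusterKey("127.0.0.1:2181:/hbase2"); // example cluster key
    AsyncAdmin admin = ASYNC_CONN.getAdmin();
    // Add the peer, narrow replication to one table/column family, then list peers.
    admin.addReplicationPeer("1", peerConfig)
        .thenCompose(v -> {
          Map<TableName, List<String>> tableCfs = new HashMap<>();
          tableCfs.put(TableName.valueOf("t1"), Lists.newArrayList("f1"));
          return admin.appendReplicationPeerTableCFs("1", tableCfs);
        })
        .thenCompose(v -> admin.listReplicationPeers())
        .thenAccept(peers -> peers.forEach(p -> System.out.println(p.getPeerId())))
        .join();

Each method returns a CompletableFuture, so the steps chain without blocking a thread until the final join().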