From 43e10bbd5d64db2cc0b82f278f765fab9999e6bb Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Sun, 15 Oct 2017 23:13:12 +0800 Subject: [PATCH] HBASE-19009 implement enable/disableTableReplication for AsyncAdmin --- .../org/apache/hadoop/hbase/client/AsyncAdmin.java | 18 ++ .../hadoop/hbase/client/AsyncHBaseAdmin.java | 17 +- .../org/apache/hadoop/hbase/client/HBaseAdmin.java | 193 ++------------- .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 274 +++++++++++++++++++++ .../client/replication/ReplicationSerDeHelper.java | 173 ++++++++++++- .../hbase/client/TestAsyncReplicationAdminApi.java | 2 - .../TestAsyncReplicationAdminApiWithClusters.java | 240 ++++++++++++++++++ 7 files changed, 739 insertions(+), 178 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index baae6cf..53fca84 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -144,6 +144,12 @@ public interface AsyncAdmin { */ CompletableFuture createTable(TableDescriptor desc, byte[][] splitKeys); + /* + * Modify an existing table, more IRB friendly version. + * @param desc modified description of the table + */ + CompletableFuture modifyTable(TableDescriptor desc); + /** * Deletes a table. * @param tableName name of table to delete @@ -556,6 +562,18 @@ public interface AsyncAdmin { CompletableFuture> listReplicatedTableCFs(); /** + * Enable a table's replication switch. + * @param tableName name of the table + */ + CompletableFuture enableTableReplication(TableName tableName); + + /** + * Disable a table's replication switch. 
+ * @param tableName name of the table + */ + CompletableFuture disableTableReplication(TableName tableName); + + /** * Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be * taken. If the table is disabled, an offline snapshot is taken. Snapshots are considered unique * based on the name of the snapshot. Attempts to take a snapshot with the same name (even diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 04005eb..6b56fae 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -126,6 +126,11 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override + public CompletableFuture modifyTable(TableDescriptor desc) { + return wrap(rawAdmin.modifyTable(desc)); + } + + @Override public CompletableFuture deleteTable(TableName tableName) { return wrap(rawAdmin.deleteTable(tableName)); } @@ -418,6 +423,16 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override + public CompletableFuture enableTableReplication(TableName tableName) { + return wrap(rawAdmin.enableTableReplication(tableName)); + } + + @Override + public CompletableFuture disableTableReplication(TableName tableName) { + return wrap(rawAdmin.disableTableReplication(tableName)); + } + + @Override public CompletableFuture snapshot(SnapshotDescription snapshot) { return wrap(rawAdmin.snapshot(snapshot)); } @@ -707,4 +722,4 @@ public class AsyncHBaseAdmin implements AsyncAdmin { public CompletableFuture> clearDeadServers(List servers) { return wrap(rawAdmin.clearDeadServers(servers)); } -} \ No newline at end of file +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 556e564..99bc97e 100644 
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -4098,84 +4098,13 @@ public class HBaseAdmin implements Admin { throw new IllegalArgumentException("Table name is null"); } if (!tableExists(tableName)) { - throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString() + throw new TableNotFoundException("Table '" + tableName.getNameAsString() + "' does not exists."); } setTableRep(tableName, false); } /** - * Copies the REPLICATION_SCOPE of table descriptor passed as an argument. Before copy, the method - * ensures that the name of table and column-families should match. - * @param peerHtd descriptor on peer cluster - * @param localHtd - The HTableDescriptor of table from source cluster. - * @return true If the name of table and column families match and REPLICATION_SCOPE copied - * successfully. false If there is any mismatch in the names. - */ - private boolean copyReplicationScope(final HTableDescriptor peerHtd, - final HTableDescriptor localHtd) { - // Copy the REPLICATION_SCOPE only when table names and the names of - // Column-Families are same. 
- int result = peerHtd.getTableName().compareTo(localHtd.getTableName()); - - if (result == 0) { - Iterator remoteHCDIter = peerHtd.getFamilies().iterator(); - Iterator localHCDIter = localHtd.getFamilies().iterator(); - - while (remoteHCDIter.hasNext() && localHCDIter.hasNext()) { - HColumnDescriptor remoteHCD = remoteHCDIter.next(); - HColumnDescriptor localHCD = localHCDIter.next(); - - String remoteHCDName = remoteHCD.getNameAsString(); - String localHCDName = localHCD.getNameAsString(); - - if (remoteHCDName.equals(localHCDName)) { - remoteHCD.setScope(localHCD.getScope()); - } else { - result = -1; - break; - } - } - if (remoteHCDIter.hasNext() || localHCDIter.hasNext()) { - return false; - } - } - - return result == 0; - } - - /** - * Compare the contents of the descriptor with another one passed as a parameter for replication - * purpose. The REPLICATION_SCOPE field is ignored during comparison. - * @param peerHtd descriptor on peer cluster - * @param localHtd descriptor on source cluster which needs to be replicated. - * @return true if the contents of the two descriptors match (ignoring just REPLICATION_SCOPE). - * @see java.lang.Object#equals(java.lang.Object) - */ - private boolean compareForReplication(HTableDescriptor peerHtd, HTableDescriptor localHtd) { - if (peerHtd == localHtd) { - return true; - } - if (peerHtd == null) { - return false; - } - boolean result = false; - - // Create a copy of peer HTD as we need to change its replication - // scope to match with the local HTD. - HTableDescriptor peerHtdCopy = new HTableDescriptor(peerHtd); - - result = copyReplicationScope(peerHtdCopy, localHtd); - - // If copy was successful, compare the two tables now. - if (result) { - result = (peerHtdCopy.compareTo(localHtd) == 0); - } - - return result; - } - - /** * Connect to peer and check the table descriptor on peer: *
    *
  1. Create the same table on peer when it does not exist.
  2. @@ -4195,21 +4124,22 @@ public class HBaseAdmin implements Admin { } for (ReplicationPeerDescription peerDesc : peers) { - if (needToReplicate(tableName, peerDesc)) { - Configuration peerConf = getPeerClusterConfiguration(peerDesc); + if (ReplicationSerDeHelper.needToReplicate(tableName, peerDesc)) { + Configuration peerConf = + ReplicationSerDeHelper.getPeerClusterConfiguration(this.conf, peerDesc); try (Connection conn = ConnectionFactory.createConnection(peerConf); Admin repHBaseAdmin = conn.getAdmin()) { - HTableDescriptor localHtd = getTableDescriptor(tableName); - HTableDescriptor peerHtd = null; + TableDescriptor tableDesc = getDescriptor(tableName); + TableDescriptor peerTableDesc = null; if (!repHBaseAdmin.tableExists(tableName)) { - repHBaseAdmin.createTable(localHtd, splits); + repHBaseAdmin.createTable(tableDesc, splits); } else { - peerHtd = repHBaseAdmin.getTableDescriptor(tableName); - if (peerHtd == null) { + peerTableDesc = repHBaseAdmin.getDescriptor(tableName); + if (peerTableDesc == null) { throw new IllegalArgumentException("Failed to get table descriptor for table " + tableName.getNameAsString() + " from peer cluster " + peerDesc.getPeerId()); } - if (!compareForReplication(peerHtd, localHtd)) { + if (!ReplicationSerDeHelper.compareIgnoreReplication(peerTableDesc, tableDesc)) { throw new IllegalArgumentException("Table " + tableName.getNameAsString() + " exists in peer cluster " + peerDesc.getPeerId() + ", but the table descriptors are not same when compared with source cluster." 
@@ -4222,108 +4152,25 @@ public class HBaseAdmin implements Admin { } /** - * Decide whether the table need replicate to the peer cluster according to the peer config - * @param table name of the table - * @param peer config for the peer - * @return true if the table need replicate to the peer cluster - */ - private boolean needToReplicate(TableName table, ReplicationPeerDescription peer) { - ReplicationPeerConfig peerConfig = peer.getPeerConfig(); - Set namespaces = peerConfig.getNamespaces(); - Map> tableCFsMap = peerConfig.getTableCFsMap(); - // If null means user has explicitly not configured any namespaces and table CFs - // so all the tables data are applicable for replication - if (namespaces == null && tableCFsMap == null) { - return true; - } - if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) { - return true; - } - if (tableCFsMap != null && tableCFsMap.containsKey(table)) { - return true; - } - LOG.debug("Table " + table.getNameAsString() - + " doesn't need replicate to peer cluster, peerId=" + peer.getPeerId() + ", clusterKey=" - + peerConfig.getClusterKey()); - return false; - } - - /** * Set the table's replication switch if the table's replication switch is already not set. * @param tableName name of the table * @param enableRep is replication switch enable or disable * @throws IOException if a remote or network exception occurs */ private void setTableRep(final TableName tableName, boolean enableRep) throws IOException { - HTableDescriptor htd = new HTableDescriptor(getTableDescriptor(tableName)); - ReplicationState currentReplicationState = getTableReplicationState(htd); - if (enableRep && currentReplicationState != ReplicationState.ENABLED - || !enableRep && currentReplicationState != ReplicationState.DISABLED) { - for (HColumnDescriptor hcd : htd.getFamilies()) { - hcd.setScope(enableRep ? 
HConstants.REPLICATION_SCOPE_GLOBAL - : HConstants.REPLICATION_SCOPE_LOCAL); - } - modifyTable(tableName, htd); + TableDescriptor tableDesc = getDescriptor(tableName); + ReplicationSerDeHelper.ReplicationState currentReplicationState = + ReplicationSerDeHelper.getTableReplicationState(tableDesc); + if (enableRep && currentReplicationState != ReplicationSerDeHelper.ReplicationState.ENABLED + || !enableRep + && currentReplicationState != ReplicationSerDeHelper.ReplicationState.DISABLED) { + TableDescriptor newTableDesc = + ReplicationSerDeHelper.buildTableDescriptorWithReplicationScope(tableDesc, + enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL); + modifyTable(newTableDesc); } } - /** - * This enum indicates the current state of the replication for a given table. - */ - private enum ReplicationState { - ENABLED, // all column families enabled - MIXED, // some column families enabled, some disabled - DISABLED // all column families disabled - } - - /** - * @param htd table descriptor details for the table to check - * @return ReplicationState the current state of the table. - */ - private ReplicationState getTableReplicationState(HTableDescriptor htd) { - boolean hasEnabled = false; - boolean hasDisabled = false; - - for (HColumnDescriptor hcd : htd.getFamilies()) { - if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL - && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) { - hasDisabled = true; - } else { - hasEnabled = true; - } - } - - if (hasEnabled && hasDisabled) return ReplicationState.MIXED; - if (hasEnabled) return ReplicationState.ENABLED; - return ReplicationState.DISABLED; - } - - /** - * Returns the configuration needed to talk to the remote slave cluster. 
- * @param peer the description of replication peer - * @return the configuration for the peer cluster, null if it was unable to get the configuration - * @throws IOException - */ - private Configuration getPeerClusterConfiguration(ReplicationPeerDescription peer) - throws IOException { - ReplicationPeerConfig peerConfig = peer.getPeerConfig(); - Configuration otherConf; - try { - otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey()); - } catch (IOException e) { - throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e); - } - - if (!peerConfig.getConfiguration().isEmpty()) { - CompoundConfiguration compound = new CompoundConfiguration(); - compound.add(otherConf); - compound.addStringMap(peerConfig.getConfiguration()); - return compound; - } - - return otherConf; - } - @Override public void clearCompactionQueues(final ServerName sn, final Set queues) throws IOException, InterruptedException { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 1d80797..b4b87cd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -42,8 +43,11 @@ import java.util.stream.Stream; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AsyncMetaTableAccessor; import org.apache.hadoop.hbase.ClusterStatus; +import 
org.apache.hadoop.hbase.CompoundConfiguration; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.ClusterStatus.Option; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; @@ -187,6 +191,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColu import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; @@ -493,6 +499,14 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { } @Override + public CompletableFuture modifyTable(TableDescriptor desc) { + return this. procedureCall( + RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(), + ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done), + (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName())); + } + + @Override public CompletableFuture deleteTable(TableName tableName) { return this. 
procedureCall(RequestConverter .buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), @@ -2322,6 +2336,18 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { } } + private class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { + + ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { + super(admin, tableName); + } + + @Override + String getOperationType() { + return "ENABLE"; + } + } + private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer { DeleteTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { @@ -3030,4 +3056,252 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) .startLogErrorsCnt(startLogErrorsCnt); } + + @Override + public CompletableFuture enableTableReplication(TableName tableName) { + if (tableName == null) { + return failedFuture(new IllegalArgumentException("Table name is null")); + } + CompletableFuture future = new CompletableFuture<>(); + tableExists(tableName).whenComplete( + (exist, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (!exist) { + future.completeExceptionally(new TableNotFoundException("Table '" + + tableName.getNameAsString() + "' does not exists.")); + return; + } + getTableSplits(tableName).whenComplete((splits, err1) -> { + if (err1 != null) { + future.completeExceptionally(err1); + } else { + checkAndSyncTableToPeerClusters(tableName, splits).whenComplete((result, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + setTableReplication(tableName, true).whenComplete((result3, err3) -> { + if (err3 != null) { + future.completeExceptionally(err3); + } else { + future.complete(result3); + } + }); + } + }); + } + }); + }); + return future; + } + + @Override + public CompletableFuture disableTableReplication(TableName tableName) { + if (tableName == null) { + return failedFuture(new 
IllegalArgumentException("Table name is null")); + } + CompletableFuture future = new CompletableFuture<>(); + tableExists(tableName).whenComplete( + (exist, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (!exist) { + future.completeExceptionally(new TableNotFoundException("Table '" + + tableName.getNameAsString() + "' does not exists.")); + return; + } + setTableReplication(tableName, false).whenComplete((result, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(result); + } + }); + }); + return future; + } + + private CompletableFuture getTableSplits(TableName tableName) { + CompletableFuture future = new CompletableFuture<>(); + getTableRegions(tableName).whenComplete((regions, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + return; + } + if (regions.size() == 1) { + future.complete(null); + } else { + byte[][] splits = new byte[regions.size() - 1][]; + for (int i = 1; i < regions.size(); i++) { + splits[i - 1] = regions.get(i).getStartKey(); + } + future.complete(splits); + } + }); + return future; + } + + /** + * Connect to peer and check the table descriptor on peer: + *
      + *
    1. Create the same table on peer when it does not exist.
    2. Throw an exception if the table already has replication enabled on any of the column families.
    3. Throw an exception if the table exists on peer cluster but the descriptors are not the same.
    + * @param tableName name of the table to sync to the peer + * @param splits table split keys + * @throws IOException + */ + private CompletableFuture checkAndSyncTableToPeerClusters(TableName tableName, + byte[][] splits) { + CompletableFuture future = new CompletableFuture<>(); + listReplicationPeers().whenComplete( + (peers, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (peers == null || peers.size() <= 0) { + future.completeExceptionally(new IllegalArgumentException( + "Found no peer cluster for replication.")); + return; + } + List> futures = new ArrayList<>(); + peers.stream().filter(peer -> ReplicationSerDeHelper.needToReplicate(tableName, peer)) + .forEach(peer -> { + futures.add(trySyncTableToPeerCluster(tableName, splits, peer)); + }); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) + .whenComplete((result, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(result); + } + }); + }); + return future; + } + + private CompletableFuture trySyncTableToPeerCluster(TableName tableName, byte[][] splits, + ReplicationPeerDescription peer) { + Configuration peerConf = null; + try { + peerConf = + ReplicationSerDeHelper.getPeerClusterConfiguration(connection.getConfiguration(), peer); + } catch (IOException e) { + return failedFuture(e); + } + CompletableFuture future = new CompletableFuture<>(); + ConnectionFactory.createAsyncConnection(peerConf).whenComplete( + (conn, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + getTableDescriptor(tableName).whenComplete( + (tableDesc, err1) -> { + if (err1 != null) { + future.completeExceptionally(err1); + return; + } + AsyncAdmin peerAdmin = conn.getAdmin(); + peerAdmin.tableExists(tableName).whenComplete( + (exist, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + return; + } + if (!exist) { + peerAdmin.createTable(tableDesc, 
Optional.ofNullable(splits)).whenComplete( + (result, err3) -> { + if (err3 != null) { + future.completeExceptionally(err3); + } else { + future.complete(result); + } + }); + } else { + compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin).whenComplete( + (result, err4) -> { + if (err4 != null) { + future.completeExceptionally(err4); + } else { + future.complete(result); + } + }); + } + }); + }); + }); + return future; + } + + private CompletableFuture compareTableWithPeerCluster(TableName tableName, + TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) { + CompletableFuture future = new CompletableFuture<>(); + peerAdmin.getTableDescriptor(tableName).whenComplete( + (peerTableDesc, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (peerTableDesc == null) { + future.completeExceptionally(new IllegalArgumentException( + "Failed to get table descriptor for table " + tableName.getNameAsString() + + " from peer cluster " + peer.getPeerId())); + return; + } + if (!ReplicationSerDeHelper.compareIgnoreReplication(peerTableDesc, tableDesc)) { + future.completeExceptionally(new IllegalArgumentException("Table " + + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() + + ", but the table descriptors are not same when compared with source cluster." + + " Thus can not enable the table's replication switch.")); + return; + } + future.complete(null); + }); + return future; + } + + /** + * Set the table's replication switch if the table's replication switch is already not set. 
+ * @param tableName name of the table + * @param enableRep is replication switch enable or disable + */ + private CompletableFuture setTableReplication(TableName tableName, boolean enableRep) { + CompletableFuture future = new CompletableFuture<>(); + getTableDescriptor(tableName).whenComplete( + (tableDesc, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + ReplicationSerDeHelper.ReplicationState currentReplicationState = + ReplicationSerDeHelper.getTableReplicationState(tableDesc); + if (enableRep && currentReplicationState != ReplicationSerDeHelper.ReplicationState.ENABLED + || !enableRep + && currentReplicationState != ReplicationSerDeHelper.ReplicationState.DISABLED) { + TableDescriptor newTableDesc = + ReplicationSerDeHelper.buildTableDescriptorWithReplicationScope(tableDesc, + enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL + : HConstants.REPLICATION_SCOPE_LOCAL); + modifyTable(newTableDesc).whenComplete((result, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(result); + } + }); + } else { + future.complete(null); + } + }); + return future; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java index 986a09f..6c23e7a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java @@ -19,11 +19,19 @@ package org.apache.hadoop.hbase.client.replication; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CompoundConfiguration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.apache.hadoop.hbase.exceptions.DeserializationException; @@ -35,16 +43,18 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Strings; - import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; import java.io.IOException; import java.util.Collection; +import java.util.Comparator; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.HashMap; import java.util.ArrayList; +import java.util.Arrays; import java.util.Set; /** @@ -434,4 +444,163 @@ public final class ReplicationSerDeHelper { } } } -} + + /** + * Compare the contents of the descriptor with another one passed as a parameter for replication + * purpose. The REPLICATION_SCOPE field is ignored during comparison. + * @param peerDesc descriptor on peer cluster + * @param localDesc descriptor on source cluster which needs to be replicated. + * @return true if the contents of the two descriptors match (ignoring just REPLICATION_SCOPE). 
+ */ + public static boolean + compareIgnoreReplication(TableDescriptor peerDesc, TableDescriptor localDesc) { + if (peerDesc == localDesc) { + return true; + } + if (peerDesc == null || localDesc == null) { + return false; + } + + int result = peerDesc.getTableName().compareTo(localDesc.getTableName()); + if (result != 0) { + return false; + } + Collection peerFamilies = Arrays.asList(peerDesc.getColumnFamilies()); + Collection localFamilies = Arrays.asList(localDesc.getColumnFamilies()); + result = Integer.compare(peerFamilies.size(), localFamilies.size()); + if (result != 0) { + return false; + } + + for (Iterator it = peerFamilies.iterator(), it2 = + localFamilies.iterator(); it.hasNext();) { + result = CF_COMPARATOR_IGNORE_REPLICATION.compare(it.next(), it2.next()); + if (result != 0) { + return false; + } + } + // punt on comparison for ordering, just calculate difference + return Integer.compare(peerDesc.getValues().hashCode(), localDesc.getValues().hashCode()) == 0; + } + + private static final Bytes REPLICATION_SCOPE_BYTES = new Bytes( + Bytes.toBytes(ColumnFamilyDescriptorBuilder.REPLICATION_SCOPE)); + + private static final Comparator CF_COMPARATOR_IGNORE_REPLICATION = ( + ColumnFamilyDescriptor lcf, ColumnFamilyDescriptor rcf) -> { + int result = Bytes.compareTo(lcf.getName(), rcf.getName()); + if (result != 0) { + return result; + } + // ColumnFamilyDescriptor.getValues is a immutable map, so copy it and remove + // REPLICATION_SCOPE_BYTES + Map lValues = new HashMap<>(); + lValues.putAll(lcf.getValues()); + lValues.remove(REPLICATION_SCOPE_BYTES); + Map rValues = new HashMap<>(); + rValues.putAll(rcf.getValues()); + rValues.remove(REPLICATION_SCOPE_BYTES); + // punt on comparison for ordering, just calculate difference. 
+ result = lValues.hashCode() - rValues.hashCode(); + if (result != 0) { + return result; + } + return lcf.getConfiguration().hashCode() - rcf.getConfiguration().hashCode(); + }; + + /** + * Decide whether the table need replicate to the peer cluster according to the peer config + * @param table name of the table + * @param peer config for the peer + * @return true if the table need replicate to the peer cluster + */ + public static boolean needToReplicate(TableName table, ReplicationPeerDescription peer) { + ReplicationPeerConfig peerConfig = peer.getPeerConfig(); + Set namespaces = peerConfig.getNamespaces(); + Map> tableCFsMap = peerConfig.getTableCFsMap(); + // If null means user has explicitly not configured any namespaces and table CFs + // so all the tables data are applicable for replication + if (namespaces == null && tableCFsMap == null) { + return true; + } + if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) { + return true; + } + if (tableCFsMap != null && tableCFsMap.containsKey(table)) { + return true; + } + LOG.debug("Table " + table.getNameAsString() + + " doesn't need replicate to peer cluster, peerId=" + peer.getPeerId() + ", clusterKey=" + + peerConfig.getClusterKey()); + return false; + } + + /** + * This enum indicates the current state of the replication for a given table. + */ + public enum ReplicationState { + ENABLED, // all column families enabled + MIXED, // some column families enabled, some disabled + DISABLED // all column families disabled + } + + /** + * @param desc table descriptor details for the table to check + * @return ReplicationState the current state of the table. 
+ */ + public static ReplicationState getTableReplicationState(TableDescriptor desc) { + boolean hasEnabled = false; + boolean hasDisabled = false; + + for (ColumnFamilyDescriptor cf : desc.getColumnFamilies()) { + if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL + && cf.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) { + hasDisabled = true; + } else { + hasEnabled = true; + } + } + + if (hasEnabled && hasDisabled) return ReplicationState.MIXED; + if (hasEnabled) return ReplicationState.ENABLED; + return ReplicationState.DISABLED; + } + + /** + * Returns the configuration needed to talk to the remote slave cluster. + * @param conf the base configuration + * @param peer the description of replication peer + * @return the configuration for the peer cluster, null if it was unable to get the configuration + * @throws IOException + */ + public static Configuration getPeerClusterConfiguration(Configuration conf, + ReplicationPeerDescription peer) throws IOException { + ReplicationPeerConfig peerConfig = peer.getPeerConfig(); + Configuration otherConf; + try { + otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey()); + } catch (IOException e) { + throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e); + } + + if (!peerConfig.getConfiguration().isEmpty()) { + CompoundConfiguration compound = new CompoundConfiguration(); + compound.add(otherConf); + compound.addStringMap(peerConfig.getConfiguration()); + return compound; + } + + return otherConf; + } + + public static TableDescriptor buildTableDescriptorWithReplicationScope(TableDescriptor tableDesc, + int replicationScope) { + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableDesc); + tableDesc.getColumnFamilyNames().forEach(builder::removeColumnFamily); + for (ColumnFamilyDescriptor cf : tableDesc.getColumnFamilies()) { + builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cf) + .setScope(replicationScope).build()); 
+ } + return builder.build(); + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java index 3e577bc..e489078 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java @@ -41,10 +41,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.junit.BeforeClass; -import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java new file mode 100644 index 0000000..842ec1c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ForkJoinPool;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Class to test asynchronous replication admin operations when more than 1 cluster
 */
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase {

  // Peer id used to register the second cluster as a replication peer of the first.
  private final static String ID_SECOND = "2";

  private static HBaseTestingUtility TEST_UTIL2;
  private static Configuration conf2;
  private static AsyncAdmin admin2;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
    TEST_UTIL.startMiniCluster();
    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();

    // Second mini cluster shares the first one's ZK quorum but lives under a distinct znode
    // parent so the two clusters do not collide.
    conf2 = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    TEST_UTIL2 = new HBaseTestingUtility(conf2);
    TEST_UTIL2.startMiniCluster();
    admin2 =
        ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get().getAdmin();

    // Register the second cluster as replication peer ID_SECOND of the first.
    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(TEST_UTIL2.getClusterKey());
    ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join();
  }

  @After
  public void tearDown() throws Exception {
    // Remove every table this test created (the test table plus any "<name>N" variants)
    // from both clusters so tests stay independent.
    Pattern pattern = Pattern.compile(tableName.getNameAsString() + ".*");
    cleanupTables(admin, pattern);
    cleanupTables(admin2, pattern);
  }

  /**
   * Disable and delete every table on the given cluster whose name matches {@code pattern}.
   * Disabling is best-effort: a table that is already disabled throws, and we proceed to
   * delete it anyway.
   */
  private void cleanupTables(AsyncAdmin admin, Pattern pattern) {
    admin.listTableNames(Optional.of(pattern), false).whenCompleteAsync((tables, err) -> {
      if (tables != null) {
        tables.forEach(table -> {
          try {
            admin.disableTable(table).join();
          } catch (Exception e) {
            // Log the table actually being processed, not the outer test-table field.
            LOG.debug("Table: " + table + " already disabled, so just deleting it.");
          }
          admin.deleteTable(table).join();
        });
      }
    }, ForkJoinPool.commonPool()).join();
  }

  /**
   * Create {@code tableName} on the given cluster with a single default-configured family.
   */
  private void createTableWithDefaultConf(AsyncAdmin admin, TableName tableName) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
    admin.createTable(builder.build()).join();
  }

  @Test
  public void testEnableAndDisableTableReplication() throws Exception {
    // default replication scope is local
    createTableWithDefaultConf(tableName);
    admin.enableTableReplication(tableName).join();
    TableDescriptor tableDesc = admin.getTableDescriptor(tableName).get();
    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
    }

    admin.disableTableReplication(tableName).join();
    tableDesc = admin.getTableDescriptor(tableName).get();
    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
    }
  }

  @Test
  public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception {
    // Only create table in source cluster
    createTableWithDefaultConf(tableName);
    assertFalse(admin2.tableExists(tableName).get());
    admin.enableTableReplication(tableName).join();
    assertTrue(admin2.tableExists(tableName).get());
  }

  @Test
  public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception {
    createTableWithDefaultConf(admin, tableName);
    createTableWithDefaultConf(admin2, tableName);
    // Diverge the peer cluster's schema by adding an extra family there.
    TableDescriptorBuilder builder =
        TableDescriptorBuilder.newBuilder(admin.getTableDescriptor(tableName).get());
    builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("newFamily"))
        .build());
    admin2.disableTable(tableName).join();
    admin2.modifyTable(builder.build()).join();
    admin2.enableTable(tableName).join();

    try {
      admin.enableTableReplication(tableName).join();
      fail("Exception should be thrown if table descriptors in the clusters are not same.");
    } catch (Exception ignored) {
      // expected: descriptors differ between source and peer
    }

    // Once the source is modified to match the peer, enabling replication must succeed.
    admin.disableTable(tableName).join();
    admin.modifyTable(builder.build()).join();
    admin.enableTable(tableName).join();
    admin.enableTableReplication(tableName).join();
    TableDescriptor tableDesc = admin.getTableDescriptor(tableName).get();
    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
    }
  }

  @Test
  public void testDisableReplicationForNonExistingTable() throws Exception {
    try {
      admin.disableTableReplication(tableName).join();
      fail("Expected TableNotFoundException for a non-existing table");
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof TableNotFoundException);
    }
  }

  @Test
  public void testEnableReplicationForNonExistingTable() throws Exception {
    try {
      admin.enableTableReplication(tableName).join();
      fail("Expected TableNotFoundException for a non-existing table");
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof TableNotFoundException);
    }
  }

  @Test
  public void testDisableReplicationWhenTableNameAsNull() throws Exception {
    try {
      admin.disableTableReplication(null).join();
      fail("Expected IllegalArgumentException for a null table name");
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof IllegalArgumentException);
    }
  }

  @Test
  public void testEnableReplicationWhenTableNameAsNull() throws Exception {
    try {
      admin.enableTableReplication(null).join();
      fail("Expected IllegalArgumentException for a null table name");
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof IllegalArgumentException);
    }
  }

  /*
   * Test enable table replication should create table only in user explicit specified table-cfs.
   * HBASE-14717
   */
  @Test
  public void testEnableReplicationForExplicitSetTableCfs() throws Exception {
    TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "2");
    // Only create table in source cluster
    createTableWithDefaultConf(tableName);
    createTableWithDefaultConf(tableName2);
    assertFalse("Table should not exists in the peer cluster",
      admin2.tableExists(tableName).get());
    assertFalse("Table should not exists in the peer cluster",
      admin2.tableExists(tableName2).get());

    Map<TableName, Collection<String>> tableCfs = new HashMap<>();
    tableCfs.put(tableName, null);
    ReplicationPeerConfig rpc = admin.getReplicationPeerConfig(ID_SECOND).get();
    rpc.setTableCFsMap(tableCfs);
    try {
      // Only add tableName to replication peer config
      admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
      admin.enableTableReplication(tableName2).join();
      assertFalse("Table should not be created if user has set table cfs explicitly for the "
          + "peer and this is not part of that collection", admin2.tableExists(tableName2).get());

      // Add tableName2 to replication peer config, too
      tableCfs.put(tableName2, null);
      rpc.setTableCFsMap(tableCfs);
      admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
      admin.enableTableReplication(tableName2).join();
      assertTrue(
        "Table should be created if user has explicitly added table into table cfs collection",
        admin2.tableExists(tableName2).get());
    } finally {
      // Restore the peer to replicate everything so later tests are unaffected.
      rpc.setTableCFsMap(null);
      admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
    }
  }
}