From f081543aa7a9022ee63f98a388c477b2c0a944c4 Mon Sep 17 00:00:00 2001 From: tedyu Date: Fri, 17 Feb 2017 17:56:53 -0800 Subject: [PATCH] HBASE-17460 enable_table_replication can not perform cyclic replication of a table (NITIN VERMA) Change-Id: I362e6573affdcdab133c23ac11e96916113b1b78 --- .../org/apache/hadoop/hbase/HColumnDescriptor.java | 2 +- .../org/apache/hadoop/hbase/HTableDescriptor.java | 25 +++++ .../hbase/client/replication/ReplicationAdmin.java | 119 ++++++++++++++++++--- .../hbase/test/IntegrationTestReplication.java | 2 +- .../TestReplicationAdminWithClusters.java | 20 ++-- ...ReplicationAdminWithTwoDifferentZKClusters.java | 2 +- .../src/main/ruby/hbase/replication_admin.rb | 5 +- .../shell/commands/enable_table_replication.rb | 8 +- 8 files changed, 153 insertions(+), 30 deletions(-) diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java index 3b2e99c..94899e3 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java @@ -1386,7 +1386,7 @@ public class HColumnDescriptor implements WritableComparable public int compareTo(HColumnDescriptor o) { int result = Bytes.compareTo(this.name, o.getName()); if (result == 0) { - // punt on comparison for ordering, just calculate difference + // punt on comparison for ordering, just calculate difference. result = this.values.hashCode() - o.values.hashCode(); if (result < 0) result = -1; diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index 4f3ac9c..164a7f7 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -1064,6 +1064,31 @@ public class HTableDescriptor implements WritableComparable { } /** + * Detects whether replication has been already enabled on any of the column families of this + * table descriptor. + * @return true if any of the column families has replication enabled. + */ + public boolean isReplicationEnabled() { + // Go through each Column-Family descriptor and check if the + // Replication has been enabled already. + // Return 'true' if replication has been enabled on any CF, + // otherwise return 'false'. + // + boolean result = false; + Iterator it = this.families.values().iterator(); + + while (it.hasNext()) { + HColumnDescriptor tempHcd = it.next(); + if (tempHcd.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) { + result = true; + break; + } + } + + return result; + } + + /** * @see java.lang.Object#hashCode() */ @Override diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index d462f38..b828b6a 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -54,6 +55,9 @@ import org.apache.hadoop.hbase.replication.ReplicationSerDeHelper; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; + /** *

* This class provides the administrative interface to HBase cluster @@ -503,7 +507,8 @@ public class ReplicationAdmin implements Closeable { * @param tableName name of the table * @throws IOException if a remote or network exception occurs */ - public void enableTableRep(final TableName tableName) throws IOException { + public void enableTableRep(final TableName tableName, boolean throwExOnPartialEnable) + throws IOException { if (tableName == null) { throw new IllegalArgumentException("Table name cannot be null"); } @@ -514,7 +519,7 @@ public class ReplicationAdmin implements Closeable { } } byte[][] splits = getTableSplitRowKeys(tableName); - checkAndSyncTableDescToPeers(tableName, splits); + checkAndSyncTableDescToPeers(tableName, splits, throwExOnPartialEnable); setTableRep(tableName, true); } @@ -560,14 +565,16 @@ public class ReplicationAdmin implements Closeable { * Connect to peer and check the table descriptor on peer: *

    *
  1. Create the same table on peer when not exist.
  2. - *
  3. Throw exception if the table exists on peer cluster but descriptors are not same.
  4. + *
  5. Throw an exception if the table already has replication enabled on any of the column + * families.
  6. + *
  7. Throw an exception if the table exists on peer cluster but descriptors are not same.
  8. *
* @param tableName name of the table to sync to the peer * @param splits table split keys * @throws IOException */ - private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits) - throws IOException { + private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits, + boolean throwExOnPartialEnable) throws IOException { List repPeers = listReplicationPeers(); if (repPeers == null || repPeers.size() <= 0) { throw new IllegalArgumentException("Found no peer cluster for replication."); @@ -584,24 +591,36 @@ public class ReplicationAdmin implements Closeable { } Configuration peerConf = repPeer.getConfiguration(); - HTableDescriptor htd = null; + HTableDescriptor localHtd = null; try (Connection conn = ConnectionFactory.createConnection(peerConf); Admin admin = this.connection.getAdmin(); Admin repHBaseAdmin = conn.getAdmin()) { - htd = admin.getTableDescriptor(tableName); + localHtd = admin.getTableDescriptor(tableName); HTableDescriptor peerHtd = null; if (!repHBaseAdmin.tableExists(tableName)) { - repHBaseAdmin.createTable(htd, splits); + repHBaseAdmin.createTable(localHtd, splits); } else { peerHtd = repHBaseAdmin.getTableDescriptor(tableName); if (peerHtd == null) { throw new IllegalArgumentException("Failed to get table descriptor for table " + tableName.getNameAsString() + " from peer cluster " + repPeer.getId()); - } else if (!peerHtd.equals(htd)) { - throw new IllegalArgumentException("Table " + tableName.getNameAsString() - + " exists in peer cluster " + repPeer.getId() - + ", but the table descriptors are not same when compared with source cluster." - + " Thus can not enable the table's replication switch."); + } else { + // To support cyclic replication (HBASE-17460), we need to match the + // REPLICATION_SCOPE of table on both the clusters. We should do this + // only when the replication is not already enabled on local HTD (local + // table on this cluster). + // + if (throwExOnPartialEnable && localHtd.isReplicationEnabled()) { + throw new IllegalArgumentException("Table " + tableName.getNameAsString() + + " has replication already enabled for at least one Column Family."); + } else { + if (!compareForReplication(peerHtd, localHtd)) { + throw new IllegalArgumentException("Table " + tableName.getNameAsString() + + " exists in peer cluster " + repPeer.getId() + + ", but the table descriptors are not same when compared with source cluster." + + " Thus can not enable the table's replication switch."); + } + } } } } @@ -709,4 +728,78 @@ public class ReplicationAdmin implements Closeable { } } } + + /** + * Compare the contents of the descriptor with another one passed as a parameter for replication + * purpose. The REPLICATION_SCOPE field is ignored during comparison. + * @param peerHtd descriptor on peer cluster + * @param localHtd descriptor on source cluster which needs to be replicated. + * @return true if the contents of the two descriptors match (ignoring just REPLICATION_SCOPE). + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean compareForReplication(HTableDescriptor peerHtd, HTableDescriptor localHtd) { + if (peerHtd == localHtd) { + return true; + } + if (peerHtd == null) { + return false; + } + boolean result = false; + + // Create a copy of peer HTD as we need to change its replication + // scope to match with the local HTD. + HTableDescriptor peerHtdCopy = new HTableDescriptor(peerHtd); + + result = copyReplicationScope(peerHtdCopy, localHtd); + + // If copy was successful, compare the two tables now. + if (result) { + result = (peerHtdCopy.compareTo(localHtd) == 0); + } + + return result; + } + + /** + * Copies the REPLICATION_SCOPE of table descriptor passed as an argument. Before copy, the method + * ensures that the name of table and column-families should match. + * @param peerHtd descriptor on peer cluster + * @param localHtd - The HTableDescriptor of table from source cluster. + * @return true If the name of table and column families match and REPLICATION_SCOPE copied + * successfully. false If there is any mismatch in the names. + */ + public boolean copyReplicationScope(final HTableDescriptor peerHtd, final HTableDescriptor localHtd) + { + // Copy the REPLICATION_SCOPE only when table names and the names of + // Column-Families are same. + int result = peerHtd.getTableName().compareTo(localHtd.getTableName()); + + if (result == 0) { + Iterator remoteHCDIter = peerHtd.getFamilies().iterator(); + Iterator localHCDIter = localHtd.getFamilies().iterator(); + + while (remoteHCDIter.hasNext() && localHCDIter.hasNext()) { + HColumnDescriptor remoteHCD = remoteHCDIter.next(); + HColumnDescriptor localHCD = localHCDIter.next(); + + String remoteHCDName = remoteHCD.getNameAsString(); + String localHCDName = localHCD.getNameAsString(); + + if (remoteHCDName.equals(localHCDName)) + { + remoteHCD.setScope(localHCD.getScope()); + } + else { + result = -1; + break; + } + } + + if (remoteHCDIter.hasNext() || localHCDIter.hasNext()) { + return false; + } + } + + return result == 0; + } } diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java index 141b24d..4393fd2 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java @@ -238,7 +238,7 @@ public class IntegrationTestReplication extends IntegrationTestBigLinkedList { replicationAdmin.addPeer("TestPeer", peerConfig, toReplicate); - replicationAdmin.enableTableRep(tableName); + replicationAdmin.enableTableRep(tableName, false); replicationAdmin.close(); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java index b75c1cf..853adc9 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java @@ -71,10 +71,12 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { @Test(timeout = 300000) public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception { + ReplicationAdmin adminExt = new ReplicationAdmin(conf1); + adminExt.disableTableRep(tableName); admin2.disableTable(tableName); admin2.deleteTable(tableName); assertFalse(admin2.tableExists(tableName)); - adminExt.enableTableRep(tableName); + adminExt.enableTableRep(tableName, false); assertTrue(admin2.tableExists(tableName)); } @@ -92,7 +94,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { admin2.modifyTable(tableName, table); admin2.enableTable(tableName); - adminExt.enableTableRep(tableName); + adminExt.enableTableRep(tableName, false); table = admin1.getTableDescriptor(tableName); for (HColumnDescriptor fam : table.getColumnFamilies()) { assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_GLOBAL); @@ -109,7 +111,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { admin2.enableTable(tableName); try { - adminExt.enableTableRep(tableName); + adminExt.enableTableRep(tableName, false); fail("Exception should be thrown if table descriptors in the clusters are not same."); } catch (RuntimeException ignored) { @@ -117,7 +119,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { admin1.disableTable(tableName); admin1.modifyTable(tableName, table); admin1.enableTable(tableName); - adminExt.enableTableRep(tableName); + adminExt.enableTableRep(tableName, false); table = admin1.getTableDescriptor(tableName); for (HColumnDescriptor fam : table.getColumnFamilies()) { assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_GLOBAL); @@ -135,7 +137,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { for (HColumnDescriptor fam : table.getColumnFamilies()) { assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_LOCAL); } - adminExt.enableTableRep(tableName); + adminExt.enableTableRep(tableName, false); table = admin1.getTableDescriptor(tableName); for (HColumnDescriptor fam : table.getColumnFamilies()) { assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_GLOBAL); @@ -149,7 +151,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { @Test(timeout = 300000, expected = TableNotFoundException.class) public void testEnableReplicationForNonExistingTable() throws Exception { - adminExt.enableTableRep(TableName.valueOf("nonExistingTable")); + adminExt.enableTableRep(TableName.valueOf("nonExistingTable"), false); } @Test(timeout = 300000, expected = IllegalArgumentException.class) @@ -159,7 +161,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { @Test(timeout = 300000, expected = IllegalArgumentException.class) public void testEnableReplicationWhenTableNameAsNull() throws Exception { - adminExt.enableTableRep(null); + adminExt.enableTableRep(null, false); } /* @@ -181,14 +183,14 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { tableCfs.put(tn, null); try { adminExt.setPeerTableCFs(peerId, tableCfs); - adminExt.enableTableRep(tableName); + adminExt.enableTableRep(tableName, false); assertFalse("Table should not be created if user has set table cfs explicitly for the " + "peer and this is not part of that collection", admin2.isTableAvailable(tableName)); tableCfs.put(tableName, null); adminExt.setPeerTableCFs(peerId, tableCfs); - adminExt.enableTableRep(tableName); + adminExt.enableTableRep(tableName, false); assertTrue( "Table should be created if user has explicitly added table into table cfs collection", admin2.isTableAvailable(tableName)); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithTwoDifferentZKClusters.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithTwoDifferentZKClusters.java index a485a6d..ed7b37d 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithTwoDifferentZKClusters.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithTwoDifferentZKClusters.java @@ -85,7 +85,7 @@ public class TestReplicationAdminWithTwoDifferentZKClusters { */ @Test public void testEnableTableReplication() throws Exception { - admin.enableTableRep(tableName); + admin.enableTableRep(tableName, false); assertTrue(utility2.getHBaseAdmin().tableExists(tableName)); } diff --git hbase-shell/src/main/ruby/hbase/replication_admin.rb hbase-shell/src/main/ruby/hbase/replication_admin.rb index f0da3ae..55db225 100644 --- hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -193,9 +193,10 @@ module Hbase #---------------------------------------------------------------------------------------------- # Enables a table's replication switch - def enable_tablerep(table_name) + def enable_tablerep(table_name, throw_ex_on_partial_enable) tableName = TableName.valueOf(table_name) - @replication_admin.enableTableRep(tableName) + throwExOnPartialEnable = java.lang.Boolean.valueOf(throw_ex_on_partial_enable) + @replication_admin.enableTableRep(tableName, throwExOnPartialEnable) end #---------------------------------------------------------------------------------------------- # Disables a table's replication switch diff --git hbase-shell/src/main/ruby/shell/commands/enable_table_replication.rb hbase-shell/src/main/ruby/shell/commands/enable_table_replication.rb index 15e3133..e9784ed 100644 --- hbase-shell/src/main/ruby/shell/commands/enable_table_replication.rb +++ hbase-shell/src/main/ruby/shell/commands/enable_table_replication.rb @@ -21,17 +21,19 @@ module Shell class EnableTableReplication< Command def help return <<-EOF -Enable a table's replication switch. +Enable a table's replication switch. For detecting partially enabled replication and reporting error +use flag true. Examples: hbase> enable_table_replication 'table_name' + hbase> enable_table_replication 'table_name', 'true' EOF end - def command(table_name) + def command(table_name, throw_ex_on_partial_enable = 'false') format_simple_command do - replication_admin.enable_tablerep(table_name) + replication_admin.enable_tablerep(table_name, throw_ex_on_partial_enable) end puts "The replication swith of table '#{table_name}' successfully enabled" end -- 2.10.1 (Apple Git-78)