From 94407d6f7ebb8aa1208ecdd31e8a5a8d19f02544 Mon Sep 17 00:00:00 2001 From: Ashish Singhi Date: Tue, 17 Feb 2015 14:16:28 +0530 Subject: [PATCH] HBASE-13057 Provide client utility to easily enable and disable table replication --- .../hbase/client/replication/ReplicationAdmin.java | 226 +++++++++++++++++++++ .../TestReplicationAdminWithClusters.java | 164 +++++++++++++++ .../src/main/ruby/hbase/replication_admin.rb | 10 + hbase-shell/src/main/ruby/shell.rb | 2 + .../shell/commands/disable_table_replication.rb | 42 ++++ .../shell/commands/enable_table_replication.rb | 42 ++++ 6 files changed, 486 insertions(+) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java create mode 100644 hbase-shell/src/main/ruby/shell/commands/disable_table_replication.rb create mode 100644 hbase-shell/src/main/ruby/shell/commands/enable_table_replication.rb diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 2d5c5e9..c30acfb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -38,17 +38,27 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HRegionLocator; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; @@ -501,4 +511,220 @@ public class ReplicationAdmin implements Closeable { return replicationColFams; } + + /** + * Enable a table's replication switch. + * @param tableName name of the table + * @throws IOException if a remote or network exception occurs + */ + public void enableTableRep(final String tableName) throws IOException { + if (tableName == null) { + throw new IllegalArgumentException("Table name cannot be null"); + } + TableName name = TableName.valueOf(tableName); + try (Admin admin = this.connection.getAdmin()) { + if (!admin.tableExists(name)) { + throw new TableNotFoundException("Table '" + tableName + "' does not exists."); + } + } + byte[][] splits = getTableSplitRowKeys(name); + checkAndSyncTableDescToPeers(name, splits); + setTableRep(name, true); + } + + /** + * Enable a table's replication switch. + * @param tableName name of the table + * @throws IOException if a remote or network exception occurs + */ + public void enableTableRep(final byte[] tableName) throws IOException { + enableTableRep(Bytes.toString(tableName)); + } + + /** + * Disable a table's replication switch. + * @param tableName name of the table + * @throws IOException if a remote or network exception occurs + */ + public void disableTableRep(final String tableName) throws IOException { + if (tableName == null) { + throw new IllegalArgumentException("Table name is null"); + } + TableName name = TableName.valueOf(tableName); + try (Admin admin = this.connection.getAdmin()) { + if (!admin.tableExists(name)) { + throw new TableNotFoundException("Table '" + tableName + "' does not exists."); + } + } + setTableRep(name, false); + } + + /** + * Disable a table's replication switch. + * @param tableName name of the table + * @throws IOException if a remote or network exception occurs + */ + public void disableTableRep(final byte[] tableName) throws IOException { + disableTableRep(Bytes.toString(tableName)); + } + + /** + * Get the split row keys of table + * @param tableName table name + * @return The list of split row keys + * @throws IOException + */ + private byte[][] getTableSplitRowKeys(TableName tableName) throws IOException { + try (HRegionLocator locator = new HRegionLocator(tableName, (ClusterConnection) connection);) { + byte[][] startKeys = locator.getStartKeys(); + if (startKeys.length == 1) { + return null; + } + byte[][] splits = new byte[startKeys.length - 1][]; + for (int i = 1; i < startKeys.length; i++) { + splits[i - 1] = startKeys[i]; + } + return splits; + } + } + + /** + * Connect to peer and check the table descriptor on peer: + *
    + *
  1. Create the same table on peer when not exist.
  2. + *
  3. Throw exception if the table exists on peer cluster but descriptors are not same.
  4. + *
+ * @param tableName name of the table to sync to the peer + * @param splits table split keys + * @throws IOException + */ + private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits) + throws IOException { + List repPeers = listValidReplicationPeers(); + if (repPeers == null || repPeers.size() <= 0) { + throw new IllegalArgumentException("Found no peer cluster for replication."); + } + for (ReplicationPeer repPeer : repPeers) { + Configuration peerConf = repPeer.getConfiguration(); + HTableDescriptor htd = null; + try (Connection conn = ConnectionFactory.createConnection(peerConf); + Admin admin = this.connection.getAdmin(); + Admin repHBaseAdmin = conn.getAdmin()) { + htd = admin.getTableDescriptor(tableName); + HTableDescriptor peerHtd = null; + if (!repHBaseAdmin.tableExists(tableName)) { + repHBaseAdmin.createTable(htd, splits); + } else { + peerHtd = repHBaseAdmin.getTableDescriptor(tableName); + if (peerHtd == null) { + throw new IllegalArgumentException("Failed to get table descriptor for " + + tableName.getNameAsString()); + } else if (!peerHtd.equals(htd)) { + throw new IllegalArgumentException("Table " + tableName.getNameAsString() + + " exists in peer cluster, but the table descriptors are not same." + + " Thus can not enable the table's replication switch."); + } + } + } + } + } + + private List listValidReplicationPeers() { + Map peers = listPeerConfigs(); + if (peers == null || peers.size() <= 0) { + return null; + } + List validPeers = new ArrayList(peers.size()); + for (Entry peerEntry : peers.entrySet()) { + String peerId = peerEntry.getKey(); + String clusterKey = peerEntry.getValue().getClusterKey(); + Configuration peerConf = new Configuration(this.connection.getConfiguration()); + Stat s = null; + try { + ZKUtil.applyClusterKeyToConf(peerConf, clusterKey); + Pair pair = this.replicationPeers.getPeerConf(peerId); + ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst()); + s = + zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT), + null); + if (null == s) { + LOG.info(peerId + clusterKey + " is invalid now."); + continue; + } + validPeers.add(peer); + } catch (ReplicationException e) { + LOG.warn("Failed to get valid replication peers. " + + "Error connecting to peer cluster with peerId=" + peerId); + LOG.debug("Failure details to get valid replication peers.", e); + continue; + } catch (KeeperException e) { + LOG.warn("Failed to get valid replication peers. KeeperException code=" + + e.code().intValue()); + LOG.debug("Failure details to get valid replication peers.", e); + continue; + } catch (InterruptedException e) { + LOG.warn("Failed to get valid replication peers due to InterruptedException."); + LOG.debug("Failure details to get valid replication peers.", e); + continue; + } catch (IOException e) { + LOG.warn("Failed to get valid replication peers due to IOException."); + LOG.debug("Failure details to get valid replication peers.", e); + continue; + } + } + return validPeers; + } + + /** + * Set the table's replication switch if the table's replication switch is already not set. + * @param tableName name of the table + * @param isRepEnable is replication switch enable or disable + * @throws IOException if a remote or network exception occurs + */ + private void setTableRep(final TableName tableName, boolean isRepEnable) throws IOException { + Admin admin = null; + try { + admin = this.connection.getAdmin(); + HTableDescriptor htd = admin.getTableDescriptor(tableName); + if (isTableRepEnabled(htd) ^ isRepEnable) { + boolean isOnlineSchemaUpdateEnabled = + this.connection.getConfiguration() + .getBoolean("hbase.online.schema.update.enable", true); + if (!isOnlineSchemaUpdateEnabled) { + admin.disableTable(tableName); + } + for (HColumnDescriptor hcd : htd.getFamilies()) { + hcd.setScope(isRepEnable ? HConstants.REPLICATION_SCOPE_GLOBAL + : HConstants.REPLICATION_SCOPE_LOCAL); + } + admin.modifyTable(tableName, htd); + if (!isOnlineSchemaUpdateEnabled) { + admin.enableTable(tableName); + } + } + } finally { + if (admin != null) { + try { + admin.close(); + } catch (IOException e) { + LOG.warn("Failed to close admin connection."); + LOG.debug("Details on failure to close admin connection.", e); + } + } + } + } + + /** + * @param tableName name of the table to check + * @return true if table's replication switch is enabled + * @throws IOException if a remote or network exception occurs + */ + private boolean isTableRepEnabled(HTableDescriptor htd) throws IOException { + for (HColumnDescriptor hcd : htd.getFamilies()) { + if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) { + return false; + } + } + return true; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java new file mode 100644 index 0000000..5c78238 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.client.replication; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.replication.TestReplicationBase; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Unit testing of ReplicationAdmin with clusters + */ +@Category({ MediumTests.class, ClientTests.class }) +public class TestReplicationAdminWithClusters extends TestReplicationBase { + + static Connection connection1; + static Connection connection2; + static Admin admin1; + static Admin admin2; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TestReplicationBase.setUpBeforeClass(); + connection1 = ConnectionFactory.createConnection(conf1); + connection2 = ConnectionFactory.createConnection(conf2); + admin1 = connection1.getAdmin(); + admin2 = connection2.getAdmin(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + admin1.close(); + admin2.close(); + connection1.close(); + connection2.close(); + TestReplicationBase.tearDownAfterClass(); + } + + @Test(timeout = 300000) + public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception { + admin2.disableTable(tableName); + admin2.deleteTable(tableName); + assertFalse(admin2.tableExists(tableName)); + ReplicationAdmin adminExt = new ReplicationAdmin(conf1); + adminExt.enableTableRep(tableName.getNameAsString()); + assertTrue(admin2.tableExists(tableName)); + } + + @Test(timeout = 300000) + public void testEnableReplicationWhenReplicationNotEnabled() throws Exception { + HTableDescriptor table = admin1.getTableDescriptor(tableName); + for (HColumnDescriptor fam : table.getColumnFamilies()) { + fam.setScope(HConstants.REPLICATION_SCOPE_LOCAL); + } + admin1.disableTable(tableName); + admin1.modifyTable(tableName, table); + admin1.enableTable(tableName); + + admin2.disableTable(tableName); + admin2.modifyTable(tableName, table); + admin2.enableTable(tableName); + + ReplicationAdmin adminExt = new ReplicationAdmin(conf1); + adminExt.enableTableRep(tableName.getNameAsString()); + table = admin1.getTableDescriptor(tableName); + for (HColumnDescriptor fam : table.getColumnFamilies()) { + assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_GLOBAL); + } + } + + @Test(timeout = 300000) + public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception { + HTableDescriptor table = admin2.getTableDescriptor(tableName); + HColumnDescriptor f = new HColumnDescriptor("newFamily"); + table.addFamily(f); + admin2.disableTable(tableName); + admin2.modifyTable(tableName, table); + admin2.enableTable(tableName); + + ReplicationAdmin adminExt = new ReplicationAdmin(conf1); + try { + adminExt.enableTableRep(tableName.getNameAsString()); + fail("Exception should be thrown if table descriptors in the clusters are not same."); + } catch (RuntimeException ignored) { + + } + admin1.disableTable(tableName); + admin1.modifyTable(tableName, table); + admin1.enableTable(tableName); + adminExt.enableTableRep(tableName.getNameAsString()); + table = admin1.getTableDescriptor(tableName); + for (HColumnDescriptor fam : table.getColumnFamilies()) { + assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_GLOBAL); + } + } + + @Test(timeout = 300000) + public void testDisableAndEnableReplication() throws Exception { + ReplicationAdmin adminExt = new ReplicationAdmin(conf1); + adminExt.disableTableRep(tableName.getNameAsString()); + HTableDescriptor table = admin1.getTableDescriptor(tableName); + for (HColumnDescriptor fam : table.getColumnFamilies()) { + assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_LOCAL); + } + table = admin2.getTableDescriptor(tableName); + for (HColumnDescriptor fam : table.getColumnFamilies()) { + assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_LOCAL); + } + adminExt.enableTableRep(tableName.getNameAsString()); + table = admin1.getTableDescriptor(tableName); + for (HColumnDescriptor fam : table.getColumnFamilies()) { + assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_GLOBAL); + } + } + + @Test(timeout = 300000, expected = TableNotFoundException.class) + public void testDisableReplicationForNonExistingTable() throws Exception { + ReplicationAdmin adminExt = new ReplicationAdmin(conf1); + adminExt.disableTableRep("nonExistingTable"); + } + + @Test(timeout = 300000, expected = TableNotFoundException.class) + public void testEnableReplicationForNonExistingTable() throws Exception { + ReplicationAdmin adminExt = new ReplicationAdmin(conf1); + adminExt.enableTableRep("nonExistingTable"); + } + + @Test(timeout = 300000, expected = IllegalArgumentException.class) + public void testDisableReplicationWithTableNameAsNull() throws Exception { + ReplicationAdmin adminExt = new ReplicationAdmin(conf1); + byte[] a = null; + adminExt.disableTableRep(a); + } + + @Test(timeout = 300000, expected = IllegalArgumentException.class) + public void testEnableReplicationWithTableNameAsNull() throws Exception { + ReplicationAdmin adminExt = new ReplicationAdmin(conf1); + byte[] a = null; + adminExt.enableTableRep(a); + } +} diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index 2d0845f..319df0b 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -157,5 +157,15 @@ module Hbase def remove_peer_tableCFs(id, tableCFs) @replication_admin.removePeerTableCFs(id, tableCFs) end + #---------------------------------------------------------------------------------------------- + # Enables a table's replication switch + def enable_tablerep(table_name) + @replication_admin.enableTableRep(table_name) + end + #---------------------------------------------------------------------------------------------- + # Disables a table's replication switch + def disable_tablerep(table_name) + @replication_admin.disableTableRep(table_name) + end end end diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index 5db2776..893079d 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -349,6 +349,8 @@ Shell.load_command_group( list_replicated_tables append_peer_tableCFs remove_peer_tableCFs + enable_table_replication + disable_table_replication ] ) diff --git a/hbase-shell/src/main/ruby/shell/commands/disable_table_replication.rb b/hbase-shell/src/main/ruby/shell/commands/disable_table_replication.rb new file mode 100644 index 0000000..4c46fea --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/disable_table_replication.rb @@ -0,0 +1,42 @@ +# +# Copyright 2010 The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class DisableTableReplication< Command + def help + return <<-EOF +Disable a table's replication switch. + +Examples: + + hbase> disable_table_replication 'table_name' +EOF + end + + def command(table_name) + format_simple_command do + replication_admin.disable_tablerep(table_name) + end + puts "The replication swith of table '#{table_name}' successfully disabled" + end + end + end +end diff --git a/hbase-shell/src/main/ruby/shell/commands/enable_table_replication.rb b/hbase-shell/src/main/ruby/shell/commands/enable_table_replication.rb new file mode 100644 index 0000000..5d57f03 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/enable_table_replication.rb @@ -0,0 +1,42 @@ +# +# Copyright 2010 The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class EnableTableReplication< Command + def help + return <<-EOF +Enable a table's replication switch. + +Examples: + + hbase> enable_table_replication 'table_name' +EOF + end + + def command(table_name) + format_simple_command do + replication_admin.enable_tablerep(table_name) + end + puts "The replication swith of table '#{table_name}' successfully enabled" + end + end + end +end -- 1.9.2.msysgit.0