diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java index 028ab76..14b4e1e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java @@ -1097,7 +1097,7 @@ public class HColumnDescriptor implements Comparable { public int compareTo(HColumnDescriptor o) { int result = Bytes.compareTo(this.name, o.getName()); if (result == 0) { - // punt on comparison for ordering, just calculate difference + // punt on comparison for ordering, just calculate difference. result = this.values.hashCode() - o.values.hashCode(); if (result < 0) result = -1; @@ -1187,6 +1187,24 @@ public class HColumnDescriptor implements Comparable { } /** + * Returns the value of REPLICATION_SCOPE. + * @return value corresponding to REPLICATION_SCOPE. + */ + public String getReplicationScope() { + String replScope = this.values.get(new Bytes(REPLICATION_SCOPE_BYTES)).toString(); + return replScope; + } + + /** + * Copy the value of REPLICATION_SCOPE from another column descriptor. + * @param HColumnDescriptor + */ + public void copyReplicationScope(HColumnDescriptor hcd) { + this.values.put(new Bytes(REPLICATION_SCOPE_BYTES), + hcd.values.get(new Bytes(REPLICATION_SCOPE_BYTES))); + } + + /** * Set the encryption algorithm for use with this family * @param algorithm */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index be8e858..e5114d0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -40,9 +40,9 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; /** @@ -1042,6 +1042,101 @@ public class HTableDescriptor implements Comparable { } /** + * Detects whether replication has been already enabled on any of the column families of this + * table descriptor. + * @return true if any of the column families has replication enabled. + */ + public boolean hasReplicationEnabled() { + // Go through each Column-Family descriptor and check if the + // Replication has been enabled already. + // Return 'true' if replication has been enabled on any CF, + // otherwise return 'false'. + // + boolean result = false; + Iterator it = this.families.values().iterator(); + + while (it.hasNext()) { + HColumnDescriptor tempHcd = it.next(); + if (!tempHcd.getReplicationScope().equals("0")) { + result = true; + break; + } + } + + return result; + } + + /** + * Compare the contents of the descriptor with another one passed as a parameter for replication + * purpose. The REPLICATION_SCOPE field is ignored during comparison. + * @return true if the contents of the the two descriptors match (ignoring just + * REPLICATION_SCOPE). + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean compareForReplication(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof HTableDescriptor)) { + return false; + } + + boolean result = false; + + // Copy the replication scope of local Htd to remote Htd. + HTableDescriptor localHtd = (HTableDescriptor) obj; + + result = (this.copyReplicationScope(localHtd) == 0); + + // If copy was successful, compare the two tables now. + if (result == true) { + result = (compareTo(localHtd) == 0); + } + + return result; + } + + /** + * Copies the REPLICATION_SCOPE of table descriptor passed as an argument. Before copy, the method + * ensures that the name of table and column-families should match. + * @return 0 If the name of table and column families match and REPLICATION_SCOPE copied successfully. + * 1 If there is any mismatch in the names. + */ + public int copyReplicationScope(final HTableDescriptor localHtd) + { + // Copy the REPLICATION_SCOPE only when table names and the names of + // Column-Families are same. + int result = this.name.compareTo(localHtd.name); + + if (result == 0) { + Iterator remoteHCDIter = families.values().iterator(); + Iterator localHCDIter = localHtd.families.values().iterator(); + + while (remoteHCDIter.hasNext() && localHCDIter.hasNext()) { + HColumnDescriptor remoteHCD = remoteHCDIter.next(); + HColumnDescriptor localHCD = localHCDIter.next(); + + String remoteHCDName = remoteHCD.getNameAsString(); + String localHCDName = localHCD.getNameAsString(); + + if (remoteHCDName.equals(localHCDName)) + { + remoteHCD.copyReplicationScope(localHCD); + } + else { + result = -1; + break; + } + } + } + + return result; + } + + /** * @see java.lang.Object#hashCode() */ @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index f9ca443..4b5392f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -536,53 +536,72 @@ public class ReplicationAdmin implements Closeable { * Connect to peer and check the table descriptor on peer: *
    *
  1. Create the same table on peer when not exist.
  2. - *
  3. Throw exception if the table exists on peer cluster but descriptors are not same.
  4. + *
  5. Throw an exception if the table already has replication enabled on any of the column + * families.
  6. + *
  7. Throw an exception if the table exists on peer cluster but descriptors are not same.
  8. *
* @param tableName name of the table to sync to the peer * @param splits table split keys * @throws IOException */ - private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits) - throws IOException { - List repPeers = listReplicationPeers(); - if (repPeers == null || repPeers.size() <= 0) { - throw new IllegalArgumentException("Found no peer cluster for replication."); - } + private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits) + throws IOException { + List repPeers = listReplicationPeers(); + if (repPeers == null || repPeers.size() <= 0) { + throw new IllegalArgumentException("Found no peer cluster for replication."); + } - final TableName onlyTableNameQualifier = TableName.valueOf(tableName.getQualifierAsString()); + final TableName onlyTableNameQualifier = TableName.valueOf(tableName.getQualifierAsString()); - for (ReplicationPeer repPeer : repPeers) { - Map> tableCFMap = repPeer.getTableCFs(); - // TODO Currently peer TableCFs will not include namespace so we need to check only for table - // name without namespace in it. Need to correct this logic once we fix HBASE-11386. - if (tableCFMap != null && !tableCFMap.containsKey(onlyTableNameQualifier)) { - continue; - } + for (ReplicationPeer repPeer : repPeers) { + Map> tableCFMap = repPeer.getTableCFs(); + // TODO Currently peer TableCFs will not include namespace so we need to check only for table + // name without namespace in it. Need to correct this logic once we fix HBASE-11386. + if (tableCFMap != null && !tableCFMap.containsKey(onlyTableNameQualifier)) { + continue; + } - Configuration peerConf = repPeer.getConfiguration(); - HTableDescriptor htd = null; - try (Connection conn = ConnectionFactory.createConnection(peerConf); - Admin admin = this.connection.getAdmin(); - Admin repHBaseAdmin = conn.getAdmin()) { - htd = admin.getTableDescriptor(tableName); - HTableDescriptor peerHtd = null; - if (!repHBaseAdmin.tableExists(tableName)) { - repHBaseAdmin.createTable(htd, splits); - } else { - peerHtd = repHBaseAdmin.getTableDescriptor(tableName); - if (peerHtd == null) { - throw new IllegalArgumentException("Failed to get table descriptor for table " - + tableName.getNameAsString() + " from peer cluster " + repPeer.getId()); - } else if (!peerHtd.equals(htd)) { - throw new IllegalArgumentException("Table " + tableName.getNameAsString() - + " exists in peer cluster " + repPeer.getId() - + ", but the table descriptors are not same when compared with source cluster." - + " Thus can not enable the table's replication switch."); + Configuration peerConf = repPeer.getConfiguration(); + + HTableDescriptor localHtd = null; + try (Connection conn = ConnectionFactory.createConnection(peerConf); + Admin admin = this.connection.getAdmin(); + Admin repHBaseAdmin = conn.getAdmin()) { + localHtd = admin.getTableDescriptor(tableName); + HTableDescriptor peerHtdOrig = null; + if (!repHBaseAdmin.tableExists(tableName)) { + repHBaseAdmin.createTable(localHtd, splits); + } else { + peerHtdOrig = repHBaseAdmin.getTableDescriptor(tableName); + if (peerHtdOrig == null) { + throw new IllegalArgumentException("Failed to get table descriptor for table " + + tableName.getNameAsString() + " from peer cluster " + repPeer.getId()); + } else { + // To support cyclic replication (HBASE-17460), we need to match the + // REPLICATION_SCOPE of table on both the clusters. We should do this + // only when the replication is not already enabled on local HTD (local + // table on this cluster). + // + HTableDescriptor peerHtdCopy = new HTableDescriptor(peerHtdOrig); + + if (localHtd.hasReplicationEnabled()) { + throw new IllegalArgumentException("Table " + tableName.getNameAsString() + + " has replication already enabled for atleast one Column Family."); + } + else + { + if (!peerHtdCopy.compareForReplication(localHtd)) { + throw new IllegalArgumentException("Table " + tableName.getNameAsString() + + " exists in peer cluster " + repPeer.getId() + + ", but the table descriptors are not same when compared with source cluster." + + " Thus can not enable the table's replication switch."); + } + } + } } } } } - } @VisibleForTesting public void peerAdded(String id) throws ReplicationException {