From 3afa578302fe07595e84b01bb056556f651fe6f5 Mon Sep 17 00:00:00 2001 From: Toshihiro Suzuki Date: Fri, 25 Jan 2019 00:24:39 +0900 Subject: [PATCH] HBASE-21201 Support to run VerifyReplication MR tool without peerid --- .../replication/VerifyReplication.java | 84 +++++++++++++------ .../replication/TestVerifyReplication.java | 25 ++++++ 2 files changed, 85 insertions(+), 24 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index d5f8215d52..5eebce651c 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -96,6 +97,7 @@ public class VerifyReplication extends Configured implements Tool { String families = null; String delimiter = ""; String peerId = null; + String peerQuorumAddress = null; String rowPrefixes = null; int sleepMsBeforeReCompare = 0; boolean verbose = false; @@ -122,10 +124,9 @@ public class VerifyReplication extends Configured implements Tool { public static class Verifier extends TableMapper { - - public static enum Counters { - GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS} + GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, + CONTENT_DIFFERENT_ROWS} private Connection sourceConnection; private Table sourceTable; @@ -267,8 +268,8 @@ public class VerifyReplication extends Configured implements Tool { if (!sourceResult.isEmpty()) { context.getCounter(Counters.GOODROWS).increment(1); if (verbose) { - LOG.info("Good row key (with recompare): " + delimiter + Bytes.toStringBinary(row.getRow()) - + delimiter); + LOG.info("Good row key (with recompare): " + delimiter + + Bytes.toStringBinary(row.getRow()) + delimiter); } } return; @@ -385,7 +386,9 @@ public class VerifyReplication extends Configured implements Tool { if (!doCommandLine(args)) { return null; } - conf.set(NAME+".peerId", peerId); + if (peerId != null) { + conf.set(NAME + ".peerId", peerId); + } conf.set(NAME+".tableName", tableName); conf.setLong(NAME+".startTime", startTime); conf.setLong(NAME+".endTime", endTime); @@ -401,14 +404,23 @@ public class VerifyReplication extends Configured implements Tool { conf.set(NAME+".rowPrefixes", rowPrefixes); } - Pair peerConfigPair = getPeerQuorumConfig(conf, peerId); - ReplicationPeerConfig peerConfig = peerConfigPair.getFirst(); - String peerQuorumAddress = peerConfig.getClusterKey(); - LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " + + String peerQuorumAddress; + Pair peerConfigPair = null; + if (peerId != null) { + peerConfigPair = getPeerQuorumConfig(conf, peerId); + ReplicationPeerConfig peerConfig = peerConfigPair.getFirst(); + peerQuorumAddress = peerConfig.getClusterKey(); + LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " + peerConfig.getConfiguration()); - conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); - HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX, + conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); + HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX, peerConfig.getConfiguration().entrySet()); + } else { + assert this.peerQuorumAddress != null; + peerQuorumAddress = this.peerQuorumAddress; + LOG.info("Peer Quorum Address: " + peerQuorumAddress); + conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); + } conf.setInt(NAME + ".versions", versions); LOG.info("Number of version: " + versions); @@ -463,9 +475,13 @@ public class VerifyReplication extends Configured implements Tool { } else { TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job); } - Configuration peerClusterConf = peerConfigPair.getSecond(); - // Obtain the auth token from peer cluster - TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf); + + if (peerId != null) { + assert peerConfigPair != null; + Configuration peerClusterConf = peerConfigPair.getSecond(); + // Obtain the auth token from peer cluster + TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf); + } job.setOutputFormatClass(NullOutputFormat.class); job.setNumReduceTasks(0); @@ -610,7 +626,11 @@ public class VerifyReplication extends Configured implements Tool { } if (i == args.length-2) { - peerId = cmd; + if (isPeerQuorumAddress(cmd)) { + peerQuorumAddress = cmd; + } else { + peerId = cmd; + } } if (i == args.length-1) { @@ -630,7 +650,7 @@ public class VerifyReplication extends Configured implements Tool { if (peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null || peerHBaseRootAddress == null) { printUsage( - "Peer snapshot name, peer snapshot temp location, Peer HBase root address and " + "Peer snapshot name, peer snapshot temp location, Peer HBase root address and " + "peer FSAddress should be provided to use snapshots in peer cluster"); return false; } @@ -639,7 +659,8 @@ public class VerifyReplication extends Configured implements Tool { // This is to avoid making recompare calls to source/peer tables when snapshots are used if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) { printUsage( - "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are immutable"); + "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots " + + "are immutable"); return false; } @@ -651,6 +672,16 @@ public class VerifyReplication extends Configured implements Tool { return true; } + private boolean isPeerQuorumAddress(String cmd) { + try { + ZKConfig.validateClusterKey(cmd); + } catch (IOException e) { + // not a quorum address + return false; + } + return true; + } + /* * @param errorMsg Error message. Can be null. */ @@ -660,16 +691,17 @@ public class VerifyReplication extends Configured implements Tool { } System.err.println("Usage: verifyrep [--starttime=X]" + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " + - "[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] " - + "[--peerSnapshotTmpDir=S] [--peerFSAddress=T] [--peerHBaseRootAddress=U] "); + "[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] " + + "[--peerSnapshotName=R] [--peerSnapshotTmpDir=S] [--peerFSAddress=T] " + + "[--peerHBaseRootAddress=U] "); System.err.println(); System.err.println("Options:"); System.err.println(" starttime beginning of the time range"); System.err.println(" without endtime means from starttime to forever"); System.err.println(" endtime end of the time range"); System.err.println(" versions number of cell versions to verify"); - System.err.println(" batch batch count for scan, " + - "note that result row counts will no longer be actual number of rows when you use this option"); + System.err.println(" batch batch count for scan, note that result row counts will no " + + "longer be actual number of rows when you use this option"); System.err.println(" raw includes raw scan if given in options"); System.err.println(" families comma-separated list of families to copy"); System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on "); @@ -685,11 +717,15 @@ public class VerifyReplication extends Configured implements Tool { System.err.println(" peerHBaseRootAddress Peer cluster HBase root location"); System.err.println(); System.err.println("Args:"); - System.err.println(" peerid Id of the peer used for verification, must match the one given for replication"); + System.err.println(" peerid Id of the peer used for verification, must match the one " + + "given for replication"); + System.err.println(" peerQuorumAddress quorumAdress of the peer used for verification. The " + + "format is zk_quorum:zk_port:zk_hbase_path"); System.err.println(" tablename Name of the table to verify"); System.err.println(); System.err.println("Examples:"); - System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 "); + System.err.println(" To verify the data replicated from TestTable for a 1 hour window with " + + "peer #5 "); System.err.println(" $ hbase " + "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" + " --starttime=1265875194289 --endtime=1265878794289 5 TestTable "); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java index e1fda4eb0b..657696f6d4 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java @@ -496,4 +496,29 @@ public class TestVerifyReplication extends TestReplicationBase { checkRestoreTmpDir(conf1, temPath1, 2); checkRestoreTmpDir(conf2, temPath2, 2); } + + @Test + public void testVerifyRepJobWithQuorumAddress() throws Exception { + // Populate the tables, at the same time it guarantees that the tables are + // identical since it does the check + runSmallBatchTest(); + + // with a quorum address (a cluster key) + String[] args = new String[] { utility2.getClusterKey(), tableName.getNameAsString() }; + runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); + + Scan scan = new Scan(); + ResultScanner rs = htable2.getScanner(scan); + Put put = null; + for (Result result : rs) { + put = new Put(result.getRow()); + Cell firstVal = result.rawCells()[0]; + put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal), + Bytes.toBytes("diff data")); + htable2.put(put); + } + Delete delete = new Delete(put.getRow()); + htable2.delete(delete); + runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); + } } -- 2.20.1