From 56754a8b76b6ca5c444430ac935dc5d73fb4774e Mon Sep 17 00:00:00 2001
From: Sukumar Maddineni
Date: Fri, 28 Apr 2017 13:32:44 -0700
Subject: [PATCH] HBASE-16466 Snapshots support in VerifyReplication tool

---
 .../hadoop/hbase/mapreduce/TableMapReduceUtil.java |  34 ++++
 .../hbase/mapreduce/TableSnapshotInputFormat.java  |   4 +
 .../mapreduce/replication/VerifyReplication.java   | 202 +++++++++++++++++----
 .../replication/TestReplicationSmallTests.java     | 126 ++++++++++++-
 4 files changed, 322 insertions(+), 44 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index e6a69ac..fa29c43 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -43,10 +43,12 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -796,6 +798,38 @@ public class TableMapReduceUtil {
       org.apache.htrace.Trace.class,
       com.codahale.metrics.MetricRegistry.class);
   }
+
+  /**
+   * Utility to add a table's coprocessor classes to the M/R job classpath.
+   * @param conf job configuration
+   * @param tableName table whose coprocessor classes should be shipped with the job
+   * @throws IOException if the table descriptor cannot be read or a coprocessor class
+   *           cannot be loaded
+   */
+  public static void addTableCoprocessorJarsToClasspath(Configuration conf, TableName tableName)
+      throws IOException {
+    ClusterConnection connection = null;
+    Table table = null;
+    try {
+      connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
+      table = connection.getTable(tableName);
+      List<String> coprocessorClasses = table.getTableDescriptor().getCoprocessors();
+      if (coprocessorClasses != null && !coprocessorClasses.isEmpty()) {
+        Class<?>[] classes = new Class<?>[coprocessorClasses.size()];
+        int i = 0;
+        for (String coprocessor : coprocessorClasses) {
+          LOG.debug("Adding coprocessor " + coprocessor + " to classpath");
+          classes[i++] = Class.forName(coprocessor);
+        }
+        addDependencyJarsForClasses(conf, classes);
+      }
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    } finally {
+      if (table != null) table.close();
+      if (connection != null) connection.close();
+    }
+  }

   /**
    * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}.
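For context, here is a minimal sketch of how a job driver might call the new utility above; the job name and table name are placeholders, not part of this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;

    public class CoprocessorClasspathExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "coprocessor-classpath-example");

        // "my_replicated_table" is a placeholder. For every coprocessor class
        // listed in the table descriptor, the utility loads the class and adds
        // its containing jar to the job's tmpjars, so mappers reading a snapshot
        // of the table can still resolve those classes.
        TableMapReduceUtil.addTableCoprocessorJarsToClasspath(job.getConfiguration(),
            TableName.valueOf("my_replicated_table"));
      }
    }

This matters for the snapshot path added below: a snapshot-backed mapper reads HFiles directly, so table-level coprocessor jars are not pulled in by the usual server-side machinery.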
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
index 15d403f..8c06345 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
@@ -120,6 +120,10 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result>

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ ... @@ public class VerifyReplication extends Configured implements Tool {
   private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
-      final Configuration conf) throws IOException {
+      final Configuration conf, String peerId) throws IOException {
     ZooKeeperWatcher localZKW = null;
     ReplicationPeerZKImpl peer = null;
     try {
@@ -322,7 +369,7 @@ public class VerifyReplication extends Configured implements Tool {
    * @return The newly created job.
    * @throws java.io.IOException When setting up the job fails.
    */
-  public static Job createSubmittableJob(Configuration conf, String[] args)
+  public Job createSubmittableJob(Configuration conf, String[] args)
       throws IOException {
     if (!doCommandLine(args)) {
       return null;
     }
@@ -343,7 +390,7 @@ public class VerifyReplication extends Configured implements Tool {
       conf.set(NAME+".rowPrefixes", rowPrefixes);
     }

-    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf);
+    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf, peerId);
     ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
     String peerQuorumAddress = peerConfig.getClusterKey();
     LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
@@ -354,6 +401,18 @@ public class VerifyReplication extends Configured implements Tool {
     conf.setInt(NAME + ".versions", versions);
     LOG.info("Number of version: " + versions);

+    // Set snapshot-specific parameters
+    if (peerSnapshotName != null) {
+      conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
+      conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir);
+      conf.set(NAME + ".peerFSAddress", peerFSAddress);
+      conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress);
+
+      // Have an HDFS delegation token created for the peer cluster, needed when
+      // security is enabled
+      conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress);
+    }
+
     Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
     job.setJarByClass(VerifyReplication.class);

@@ -378,8 +437,17 @@ public class VerifyReplication extends Configured implements Tool {
     setRowPrefixFilter(scan, rowPrefixes);

-    TableMapReduceUtil.initTableMapperJob(tableName, scan,
-        Verifier.class, null, null, job);
+    if (sourceSnapshotName != null) {
+      Path snapshotTempPath = new Path(sourceSnapshotTmpDir);
+      LOG.info("Using source snapshot " + sourceSnapshotName + " with temp dir "
+          + sourceSnapshotTmpDir);
+      TableMapReduceUtil.addTableCoprocessorJarsToClasspath(job.getConfiguration(),
+        TableName.valueOf(tableName));
+      TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
+        null, job, true, snapshotTempPath);
+    } else {
+      TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
+    }

     Configuration peerClusterConf = peerConfigPair.getSecond();
     // Obtain the auth token from peer cluster
@@ -413,15 +481,12 @@ public class VerifyReplication extends Configured implements Tool {
     scan.setStopRow(stopRow);
   }

-  private static boolean doCommandLine(final String[] args) {
+  @VisibleForTesting
+  public boolean doCommandLine(final String[] args) {
     if (args.length < 2) {
       printUsage(null);
       return false;
     }
-    //in case we've been run before, restore all parameters to their initial states
-    //Otherwise, if our previous run included a parameter not in args this time,
-    //we might hold on to the old value.
-    restoreDefaults();
     try {
       for (int i = 0; i < args.length; i++) {
         String cmd = args[i];
@@ -487,10 +552,47 @@ public class VerifyReplication extends Configured implements Tool {
         if (cmd.startsWith(verboseKey)) {
           verbose = true;
           continue;
         }

+        final String sourceSnapshotNameArgKey = "--sourceSnapshotName=";
+        if (cmd.startsWith(sourceSnapshotNameArgKey)) {
+          sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length());
+          continue;
+        }
+
+        final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir=";
+        if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) {
+          sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length());
+          continue;
+        }
+
+        final String peerSnapshotNameArgKey = "--peerSnapshotName=";
+        if (cmd.startsWith(peerSnapshotNameArgKey)) {
+          peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length());
+          continue;
+        }
+
+        final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir=";
+        if (cmd.startsWith(peerSnapshotTmpDirArgKey)) {
+          peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length());
+          continue;
+        }
+
+        final String peerFSAddressArgKey = "--peerFSAddress=";
+        if (cmd.startsWith(peerFSAddressArgKey)) {
+          peerFSAddress = cmd.substring(peerFSAddressArgKey.length());
+          continue;
+        }
+
+        final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress=";
+        if (cmd.startsWith(peerHBaseRootAddressArgKey)) {
+          peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length());
+          continue;
+        }

         if (cmd.startsWith("--")) {
           printUsage("Invalid argument '" + cmd + "'");
+          return false;
         }

         if (i == args.length-2) {
@@ -501,6 +603,33 @@
           tableName = cmd;
         }
       }

+      if ((sourceSnapshotName != null && sourceSnapshotTmpDir == null)
+          || (sourceSnapshotName == null && sourceSnapshotTmpDir != null)) {
+        printUsage("Source snapshot name and snapshot temp location should both be provided"
+            + " to use snapshots in the source cluster");
+        return false;
+      }
+
+      if (peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null
+          || peerHBaseRootAddress != null) {
+        if (peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null
+            || peerHBaseRootAddress == null) {
+          printUsage("Peer snapshot name, peer snapshot temp location, peer HBase root address"
+              + " and peer FS address should all be provided to use snapshots in the peer"
+              + " cluster");
+          return false;
+        }
+      }
+
+      // Avoid recompare calls against the source/peer tables when snapshots are used
+      if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) {
+        printUsage("Using --recomparesleep together with snapshots is not allowed"
+            + " because snapshots are immutable");
+        return false;
+      }
+
     } catch (Exception e) {
       e.printStackTrace();
       printUsage("Can't start because " + e.getMessage());
@@ -509,18 +638,6 @@
     return true;
   }

-  private static void restoreDefaults() {
-    startTime = 0;
-    endTime = Long.MAX_VALUE;
-    batch = -1;
-    versions = -1;
-    tableName = null;
-    families = null;
-    peerId = null;
-    rowPrefixes = null;
-    includeDeletedCells = false;
-  }
-
   /*
    * @param errorMsg Error message. Can be null.
    */
@@ -530,7 +647,8 @@ public class VerifyReplication extends Configured implements Tool {
     }
     System.err.println("Usage: verifyrep [--starttime=X]" +
         " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " +
-        "[--batch=] [--verbose] <peerid> <tablename>");
+        "[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] " +
+        "[--peerSnapshotName=R] [--peerSnapshotTmpDir=S] [--peerFSAddress=T] " +
+        "[--peerHBaseRootAddress=U] <peerid> <tablename>");
     System.err.println();
     System.err.println("Options:");
     System.err.println(" starttime    beginning of the time range");
@@ -546,6 +664,12 @@ public class VerifyReplication extends Configured implements Tool {
     System.err.println(" recomparesleep   milliseconds to sleep before recompare row, " +
         "default value is 0 which disables the recompare.");
     System.err.println(" verbose      logs row keys of good rows");
+    System.err.println(" sourceSnapshotName    Source snapshot name");
+    System.err.println(" sourceSnapshotTmpDir  Tmp location to restore the source table snapshot");
+    System.err.println(" peerSnapshotName      Peer snapshot name");
+    System.err.println(" peerSnapshotTmpDir    Tmp location to restore the peer table snapshot");
+    System.err.println(" peerFSAddress         Peer cluster Hadoop FS address");
+    System.err.println(" peerHBaseRootAddress  Peer cluster HBase root location");
     System.err.println();
     System.err.println("Args:");
     System.err.println(" peerid       Id of the peer used for verification, must match the one given for replication");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 1c5a994..adcd54e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -18,10 +18,7 @@ package org.apache.hadoop.hbase.replication;

-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;

 import java.io.IOException;
 import java.util.ArrayList;
@@ -33,6 +30,8 @@ import java.util.TreeMap;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -59,10 +58,12 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -73,6 +74,8 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;

+import com.google.common.collect.Lists;
+
 @Category({ReplicationTests.class, LargeTests.class})
 public class TestReplicationSmallTests extends TestReplicationBase {
@@ -593,7 +596,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {

   private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
       throws IOException, InterruptedException, ClassNotFoundException {
-    Job job = VerifyReplication.createSubmittableJob(new Configuration(CONF_WITH_LOCALFS), args);
+    Job job = new VerifyReplication().createSubmittableJob(new Configuration(conf1), args);
     if (job == null) {
       fail("Job wasn't created, see the log");
     }
@@ -863,5 +866,118 @@ public class TestReplicationSmallTests extends TestReplicationBase {
       tableName.getNameAsString()};
     runVerifyReplication(args, NB_ROWS_IN_BATCH *2, 0);
   }
+
+  @Test(timeout = 300000)
+  public void testVerifyReplicationSnapshotArguments() {
+    String[] args =
+        new String[] { "--sourceSnapshotName=snapshot1", "2", tableName.getNameAsString() };
+    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--sourceSnapshotTmpDir=tmp", "2", tableName.getNameAsString() };
+    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=tmp", "2",
+        tableName.getNameAsString() };
+    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--peerSnapshotName=snapshot1", "2", tableName.getNameAsString() };
+    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--peerSnapshotTmpDir=/tmp/", "2", tableName.getNameAsString() };
+    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--peerSnapshotName=snapshot1", "--peerSnapshotTmpDir=/tmp/",
+        "--peerFSAddress=tempfs", "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2",
+        tableName.getNameAsString() };
+    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=/tmp/",
+        "--peerSnapshotName=snapshot2", "--peerSnapshotTmpDir=/tmp/", "--peerFSAddress=tempfs",
+        "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2", tableName.getNameAsString() };
+    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+  }
+
+  @Test(timeout = 300000)
+  public void testVerifyReplicationWithSnapshotSupport() throws Exception {
+    // Populate the tables; at the same time this guarantees that the tables are
+    // identical, since it does the check itself
+    testSmallBatch();
+
+    // Take a snapshot of the source table
+    Path rootDir = FSUtils.getRootDir(conf1);
+    FileSystem fs = rootDir.getFileSystem(conf1);
+    String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getHBaseAdmin(), tableName,
+      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
+
+    // Take a snapshot of the peer table
+    Path peerRootDir = FSUtils.getRootDir(conf2);
+    FileSystem peerFs = peerRootDir.getFileSystem(conf2);
+    String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getHBaseAdmin(), tableName,
+      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
+
+    String peerFSAddress = peerFs.getUri().toString();
+    String tmpPath1 = utility1.getRandomDir().toString();
+    String tmpPath2 = "/tmp2";
+
+    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
+        "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
+        "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
+        "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
+
+    Job job = new VerifyReplication().createSubmittableJob(conf1, args);
+    if (job == null) {
+      fail("Job wasn't created, see the log");
+    }
+    if (!job.waitForCompletion(true)) {
+      fail("Job failed, see the log");
+    }
+    assertEquals(NB_ROWS_IN_BATCH,
+      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+    assertEquals(0,
+      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+
+    // Change every row in the peer table, then delete the last one, so that fresh
+    // snapshots of the two tables no longer match
+    Scan scan = new Scan();
+    ResultScanner rs = htable2.getScanner(scan);
+    Put put = null;
+    for (Result result : rs) {
+      put = new Put(result.getRow());
+      Cell firstVal = result.rawCells()[0];
+      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
+        Bytes.toBytes("diff data"));
+      htable2.put(put);
+    }
+    Delete delete = new Delete(put.getRow());
+    htable2.delete(delete);
+
+    sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getHBaseAdmin(), tableName,
+      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
+
+    peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getHBaseAdmin(), tableName,
+      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
+
+    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
+        "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
+        "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
+        "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
+
+    job = new VerifyReplication().createSubmittableJob(conf1, args);
+    if (job == null) {
+      fail("Job wasn't created, see the log");
+    }
+    if (!job.waitForCompletion(true)) {
+      fail("Job failed, see the log");
+    }
+    assertEquals(0,
+      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+    assertEquals(NB_ROWS_IN_BATCH,
+      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+  }
 }
-- 
2.0.1
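Putting the pieces together, below is a minimal sketch of how a driver could run the snapshot-based verification this patch enables, mirroring what the test above exercises. The snapshot names, temp directories, peer addresses, peer id "2", and table name are all placeholders, and matching snapshots are assumed to already exist on both clusters:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
    import org.apache.hadoop.mapreduce.Job;

    public class SnapshotVerifyReplicationExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();

        // Placeholder values; both snapshots must have been taken from the
        // replicated table identified by the trailing peer id and table name.
        String[] vrArgs = new String[] {
            "--sourceSnapshotName=sourceSnapshot-1",
            "--sourceSnapshotTmpDir=/tmp/sourceSnapshotRestore",
            "--peerSnapshotName=peerSnapshot-1",
            "--peerSnapshotTmpDir=/tmp/peerSnapshotRestore",
            "--peerFSAddress=hdfs://peer-namenode:8020",
            "--peerHBaseRootAddress=hdfs://peer-namenode:8020/hbase",
            "2", "my_replicated_table" };

        // Both sides are read from immutable snapshot files, which is why the
        // patch rejects --recomparesleep in this mode.
        Job job = new VerifyReplication().createSubmittableJob(conf, vrArgs);
        if (job != null && job.waitForCompletion(true)) {
          long good = job.getCounters()
              .findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue();
          long bad = job.getCounters()
              .findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue();
          System.out.println("GOODROWS=" + good + ", BADROWS=" + bad);
        }
      }
    }

Reading from snapshots sidesteps live-scan load on both clusters and gives a consistent point-in-time view of each side, at the cost of losing the recompare-after-sleep option for rows that are merely lagging.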