From 66f7076eca15671af310b9c102bee50e1d5bfe6d Mon Sep 17 00:00:00 2001 From: Sukumar Maddineni Date: Thu, 27 Apr 2017 16:14:31 -0700 Subject: [PATCH] #HBASE-16466 - Snapshot support(Source side and peer side) in VerifyReplication tool --- .../hadoop/hbase/mapreduce/TableMapReduceUtil.java | 41 +++++ .../hbase/mapreduce/TableSnapshotInputFormat.java | 5 + .../mapreduce/replication/VerifyReplication.java | 199 +++++++++++++++++---- .../replication/TestReplicationSmallTests.java | 133 +++++++++++++- 4 files changed, 334 insertions(+), 44 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index e6a69ac..b131c81 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -43,10 +43,13 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; @@ -796,6 +799,44 @@ public class TableMapReduceUtil { org.apache.htrace.Trace.class, com.codahale.metrics.MetricRegistry.class); } + + + /** + * Util to add a table related coprocessors classes to M/R job classpath + * @param conf + * @param tableName + * @throws IOException + */ + public static void addTableCoprocessorJarsToClasspath(Configuration conf, + TableName tableName) throws IOException + { + ClusterConnection connection = null; + Table table = null; + try { + connection = (ClusterConnection) ConnectionFactory.createConnection(conf); + table = connection.getTable(tableName); + List coprocessorClasses = table.getTableDescriptor() + .getCoprocessors(); + if (coprocessorClasses != null && coprocessorClasses.size() > 0) { + Class[] classes = new Class[coprocessorClasses.size()]; + int i = 0; + for (String coprocessor : coprocessorClasses) { + LOG.debug("Adding coprocessor "+coprocessor+" to classpath"); + classes[i++] = Class.forName(coprocessor); + } + addDependencyJarsForClasses(conf, classes); + } + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + finally + { + if (table != null) + table.close(); + if(connection!=null) + connection.close(); + } + } /** * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java index 15d403f..aa8c584 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java @@ -120,6 +120,11 @@ public class TableSnapshotInputFormat extends InputFormat getPeerQuorumConfig( - final Configuration conf) throws IOException { + final Configuration conf, String peerId) throws IOException { ZooKeeperWatcher localZKW = null; ReplicationPeerZKImpl peer = null; try { @@ -322,7 +362,7 @@ public class VerifyReplication extends Configured implements Tool { * @return The newly created job. * @throws java.io.IOException When setting up the job fails. */ - public static Job createSubmittableJob(Configuration conf, String[] args) + public Job createSubmittableJob(Configuration conf, String[] args) throws IOException { if (!doCommandLine(args)) { return null; @@ -343,7 +383,7 @@ public class VerifyReplication extends Configured implements Tool { conf.set(NAME+".rowPrefixes", rowPrefixes); } - Pair peerConfigPair = getPeerQuorumConfig(conf); + Pair peerConfigPair = getPeerQuorumConfig(conf, peerId); ReplicationPeerConfig peerConfig = peerConfigPair.getFirst(); String peerQuorumAddress = peerConfig.getClusterKey(); LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " + @@ -354,6 +394,19 @@ public class VerifyReplication extends Configured implements Tool { conf.setInt(NAME + ".versions", versions); LOG.info("Number of version: " + versions); + + //Set Snapshot specific parameters + if(peerSnapshotName!=null) + { + conf.set(NAME+".peerSnapshotName", peerSnapshotName); + conf.set(NAME+".peerSnapshotTmpDir", peerSnapshotTmpDir); + conf.set(NAME+".peerFSAddress", peerFSAddress); + conf.set(NAME+".peerHBaseRootAddress", peerHBaseRootAddress); + + //This is to create HDFS delegation token for peer cluster in case of secured + conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress); + } + Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); job.setJarByClass(VerifyReplication.class); @@ -378,8 +431,20 @@ public class VerifyReplication extends Configured implements Tool { setRowPrefixFilter(scan, rowPrefixes); - TableMapReduceUtil.initTableMapperJob(tableName, scan, - Verifier.class, null, null, job); + if(sourceSnapshotName!=null) + { + Path snapshotTempPath = new Path(sourceSnapshotTmpDir); + LOG.info("Using source snapshot-"+sourceSnapshotName +" with temp dir:"+sourceSnapshotTmpDir); + //deleteDirectories(conf, snapshotTempPath); + TableMapReduceUtil.addTableCoprocessorJarsToClasspath(job.getConfiguration(), TableName.valueOf(tableName)); + TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, + scan, Verifier.class, null, null, job, true, snapshotTempPath); + } + else + { + TableMapReduceUtil.initTableMapperJob(tableName, scan, + Verifier.class, null, null, job); + } Configuration peerClusterConf = peerConfigPair.getSecond(); // Obtain the auth token from peer cluster @@ -413,15 +478,12 @@ public class VerifyReplication extends Configured implements Tool { scan.setStopRow(stopRow); } - private static boolean doCommandLine(final String[] args) { + @VisibleForTesting + public boolean doCommandLine(final String[] args) { if (args.length < 2) { printUsage(null); return false; } - //in case we've been run before, restore all parameters to their initial states - //Otherwise, if our previous run included a parameter not in args this time, - //we might hold on to the old value. - restoreDefaults(); try { for (int i = 0; i < args.length; i++) { String cmd = args[i]; @@ -487,10 +549,47 @@ public class VerifyReplication extends Configured implements Tool { if (cmd.startsWith(verboseKey)) { verbose = true; continue; - } + } + + final String sourceSnapshotNameArgKey = "--sourceSnapshotName="; + if (cmd.startsWith(sourceSnapshotNameArgKey)) { + sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length()); + continue; + } + + final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir="; + if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) { + sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length()); + continue; + } + + final String peerSnapshotNameArgKey = "--peerSnapshotName="; + if (cmd.startsWith(peerSnapshotNameArgKey)) { + peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length()); + continue; + } + + final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir="; + if (cmd.startsWith(peerSnapshotTmpDirArgKey)) { + peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length()); + continue; + } + + final String peerFSAddressArgKey = "--peerFSAddress="; + if (cmd.startsWith(peerFSAddressArgKey)) { + peerFSAddress = cmd.substring(peerFSAddressArgKey.length()); + continue; + } + + final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress="; + if (cmd.startsWith(peerHBaseRootAddressArgKey)) { + peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length()); + continue; + } if (cmd.startsWith("--")) { printUsage("Invalid argument '" + cmd + "'"); + return false; } if (i == args.length-2) { @@ -501,6 +600,33 @@ public class VerifyReplication extends Configured implements Tool { tableName = cmd; } } + + + if ((sourceSnapshotName != null && sourceSnapshotTmpDir == null) + || (sourceSnapshotName == null && sourceSnapshotTmpDir != null)) { + printUsage( + "Source snapshot name and snapshot temp location should be provided to use snapshots in source cluster"); + return false; + } + + if (peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null + || peerHBaseRootAddress != null) { + if (peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null + || peerHBaseRootAddress == null) { + printUsage( + "Peer snapshot name, peer snapshot temp location, Peer HBase root address and peer FSAddress should be provided to use snapshots in peer cluster"); + return false; + } + } + + //This is to avoid making recompare calls to source/peer tables when snapshots is used + if((sourceSnapshotName!=null || peerSnapshotName!=null) && sleepMsBeforeReCompare>0) + { + printUsage( + "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are immutable"); + return false; + } + } catch (Exception e) { e.printStackTrace(); printUsage("Can't start because " + e.getMessage()); @@ -509,18 +635,6 @@ public class VerifyReplication extends Configured implements Tool { return true; } - private static void restoreDefaults() { - startTime = 0; - endTime = Long.MAX_VALUE; - batch = -1; - versions = -1; - tableName = null; - families = null; - peerId = null; - rowPrefixes = null; - includeDeletedCells = false; - } - /* * @param errorMsg Error message. Can be null. */ @@ -530,7 +644,8 @@ public class VerifyReplication extends Configured implements Tool { } System.err.println("Usage: verifyrep [--starttime=X]" + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " + - "[--batch=] [--verbose] "); + "[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] " + + "[--peerSnapshotTmpDir=S] [--peerFSAddress=T] [--peerHBaseRootAddress=U] "); System.err.println(); System.err.println("Options:"); System.err.println(" starttime beginning of the time range"); @@ -546,6 +661,12 @@ public class VerifyReplication extends Configured implements Tool { System.err.println(" recomparesleep milliseconds to sleep before recompare row, " + "default value is 0 which disables the recompare."); System.err.println(" verbose logs row keys of good rows"); + System.err.println(" sourceSnapshotName Source Snapshot Name"); + System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot"); + System.err.println(" peerSnapshotName Peer Snapshot Name"); + System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot"); + System.err.println(" peerFSAddress Peer cluster Hadoop FS address"); + System.err.println(" peerHBaseRootAddress Peer cluster HBase root location"); System.err.println(); System.err.println("Args:"); System.err.println(" peerid Id of the peer used for verification, must match the one given for replication"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 1c5a994..21418db 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -18,10 +18,7 @@ package org.apache.hadoop.hbase.replication; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import java.io.IOException; import java.util.ArrayList; @@ -33,6 +30,8 @@ import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -59,10 +58,12 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALKey; @@ -73,6 +74,8 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import com.google.common.collect.Lists; + @Category({ReplicationTests.class, LargeTests.class}) public class TestReplicationSmallTests extends TestReplicationBase { @@ -593,7 +596,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows) throws IOException, InterruptedException, ClassNotFoundException { - Job job = VerifyReplication.createSubmittableJob(new Configuration(CONF_WITH_LOCALFS), args); + Job job = new VerifyReplication().createSubmittableJob(new Configuration(conf1), args); if (job == null) { fail("Job wasn't created, see the log"); } @@ -863,5 +866,125 @@ public class TestReplicationSmallTests extends TestReplicationBase { tableName.getNameAsString()}; runVerifyReplication(args, NB_ROWS_IN_BATCH *2, 0); } + + @Test(timeout = 300000) + public void testVerifyReplicationSnapshotArguments() + { + String[] args = new String[] { "--sourceSnapshotName=snapshot1", + "2", tableName.getNameAsString() }; + assertFalse(Lists.newArrayList(args).toString(),new VerifyReplication().doCommandLine(args)); + + args = new String[] { "--sourceSnapshotTmpDir=tmp", + "2", tableName.getNameAsString() }; + assertFalse(Lists.newArrayList(args).toString(),new VerifyReplication().doCommandLine(args)); + + args = new String[] { "--sourceSnapshotName=snapshot1", + "--sourceSnapshotTmpDir=tmp", + "2", tableName.getNameAsString() }; + assertTrue(Lists.newArrayList(args).toString(),new VerifyReplication().doCommandLine(args)); + + args = new String[] { "--peerSnapshotName=snapshot1", + "2", tableName.getNameAsString() }; + assertFalse(Lists.newArrayList(args).toString(),new VerifyReplication().doCommandLine(args)); + + args = new String[] { "--peerSnapshotTmpDir=/tmp/", + "2", tableName.getNameAsString() }; + assertFalse(Lists.newArrayList(args).toString(),new VerifyReplication().doCommandLine(args)); + + args = new String[] { "--peerSnapshotName=snapshot1", + "--peerSnapshotTmpDir=/tmp/", "--peerFSAddress=tempfs","--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", + "2", tableName.getNameAsString() }; + assertTrue(Lists.newArrayList(args).toString(),new VerifyReplication().doCommandLine(args)); + + args = new String[] { "--sourceSnapshotName=snapshot1", + "--sourceSnapshotTmpDir=/tmp/", "--peerSnapshotName=snapshot2", "--peerSnapshotTmpDir=/tmp/", + "--peerFSAddress=tempfs","--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", + "2", tableName.getNameAsString() }; + + assertTrue(Lists.newArrayList(args).toString(),new VerifyReplication().doCommandLine(args)); + } + + @Test(timeout = 300000) + public void testVerifyReplicationWithSnapshotSupport() throws Exception { + // Populate the tables, at the same time it guarantees that the tables are + // identical since it does the check + testSmallBatch(); + + //Take source and target tables snapshot + Path rootDir = FSUtils.getRootDir(conf1); + FileSystem fs = rootDir.getFileSystem(conf1); + String sourceSnapshotName = "sourceSnapshot-"+System.currentTimeMillis(); + SnapshotTestingUtils.createSnapshotAndValidate(utility1.getHBaseAdmin(), tableName, + new String(famName), sourceSnapshotName, rootDir, fs, true); + + //Take target snapshot + Path peerRootDir = FSUtils.getRootDir(conf2); + FileSystem peerFs = peerRootDir.getFileSystem(conf2); + String peerSnapshotName = "peerSnapshot-"+System.currentTimeMillis(); + SnapshotTestingUtils.createSnapshotAndValidate(utility2.getHBaseAdmin(), tableName, + new String(famName), peerSnapshotName, peerRootDir, peerFs, true); + + String peerFSAddress = peerFs.getUri().toString(); + String temPath1 = utility1.getRandomDir().toString(); + String temPath2 = "/tmp2"; + + String[] args = new String[] { "--sourceSnapshotName="+sourceSnapshotName, + "--sourceSnapshotTmpDir="+temPath1, "--peerSnapshotName="+peerSnapshotName, "--peerSnapshotTmpDir="+temPath2, + "--peerFSAddress="+peerFSAddress, "--peerHBaseRootAddress="+FSUtils.getRootDir(conf2), + "2", tableName.getNameAsString() }; + + Job job = new VerifyReplication().createSubmittableJob(conf1, args); + if (job == null) { + fail("Job wasn't created, see the log"); + } + if (!job.waitForCompletion(true)) { + fail("Job failed, see the log"); + } + assertEquals(NB_ROWS_IN_BATCH, job.getCounters(). + findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); + assertEquals(0, job.getCounters(). + findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); + + Scan scan = new Scan(); + ResultScanner rs = htable2.getScanner(scan); + Put put = null; + for (Result result : rs) { + put = new Put(result.getRow()); + Cell firstVal = result.rawCells()[0]; + put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal), + Bytes.toBytes("diff data")); + htable2.put(put); + } + Delete delete = new Delete(put.getRow()); + htable2.delete(delete); + + sourceSnapshotName = "sourceSnapshot-"+System.currentTimeMillis(); + SnapshotTestingUtils.createSnapshotAndValidate(utility1.getHBaseAdmin(), tableName, + new String(famName), sourceSnapshotName, rootDir, fs, true); + + peerSnapshotName = "peerSnapshot-"+System.currentTimeMillis(); + SnapshotTestingUtils.createSnapshotAndValidate(utility2.getHBaseAdmin(), tableName, + new String(famName), peerSnapshotName, peerRootDir, peerFs, true); + + args = new String[] { "--sourceSnapshotName="+sourceSnapshotName, + "--sourceSnapshotTmpDir="+temPath1, "--peerSnapshotName="+peerSnapshotName, "--peerSnapshotTmpDir="+temPath2, + "--peerFSAddress="+peerFSAddress, "--peerHBaseRootAddress="+FSUtils.getRootDir(conf2), + "2", tableName.getNameAsString() }; + + job = new VerifyReplication().createSubmittableJob(conf1, args); + if (job == null) { + fail("Job wasn't created, see the log"); + } + if (!job.waitForCompletion(true)) { + fail("Job failed, see the log"); + } + assertEquals(0, job.getCounters(). + findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); + assertEquals(NB_ROWS_IN_BATCH, job.getCounters(). + findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); + + + } + } -- 2.0.1