Index: src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java	(revision 1028470)
+++ src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java	(working copy)
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -41,10 +42,11 @@
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.mapreduce.Job;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -455,6 +457,56 @@
   }
 
   /**
+   * Do a small load into a table, make sure the data is really the same,
+   * then run the VerifyReplication job to check the results. Do a second
+   * comparison where all the cells are different.
+   * @throws Exception
+   */
+  @Test
+  public void testVerifyRepJob() throws Exception {
+    // Populate the tables; this also guarantees that the tables are
+    // identical since testSmallBatch does the comparison itself
+    testSmallBatch();
+
+    String[] args = new String[] {"2", Bytes.toString(tableName)};
+    Job job = VerifyReplication.createSubmittableJob(conf1, args);
+    if (job == null) {
+      fail("Job wasn't created, see the log");
+    }
+    if (!job.waitForCompletion(true)) {
+      fail("Job failed, see the log");
+    }
+    assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
+        findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+    assertEquals(0, job.getCounters().
+        findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+
+    // Rewrite every row in the slave table with a different value, then
+    // delete one row entirely, so the second run sees only bad rows
+    Scan scan = new Scan();
+    ResultScanner rs = htable2.getScanner(scan);
+    Put put = null;
+    for (Result result : rs) {
+      put = new Put(result.getRow());
+      KeyValue firstVal = result.raw()[0];
+      put.add(firstVal.getFamily(),
+          firstVal.getQualifier(), Bytes.toBytes("diff data"));
+      htable2.put(put);
+    }
+    Delete delete = new Delete(put.getRow());
+    htable2.delete(delete);
+    job = VerifyReplication.createSubmittableJob(conf1, args);
+    if (job == null) {
+      fail("Job wasn't created, see the log");
+    }
+    if (!job.waitForCompletion(true)) {
+      fail("Job failed, see the log");
+    }
+    assertEquals(0, job.getCounters().
+        findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+    assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
+        findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+  }
+
+  /**
    * Load up multiple tables over 2 region servers and kill a source during
    * the upload. The failover happens internally.
   * @throws Exception
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java	(revision 0)
@@ -0,0 +1,280 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce.replication;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This map-only job compares the data from a local table with a remote one.
+ * Every cell is compared and must have exactly the same keys (even timestamp)
+ * as well as the same value. It is possible to restrict the job by time range
+ * and families. The peer id that's provided must match the one given when the
+ * replication stream was set up.
+ * <p>
+ * Two counters are provided: Verifier.Counters.GOODROWS and BADROWS. The
+ * reason why a row is different is shown in the map task's log.
+ */
+public class VerifyReplication {
+
+  private static final Log LOG =
+      LogFactory.getLog(VerifyReplication.class);
+
+  public final static String NAME = "verifyrep";
+  static long startTime = 0;
+  static long endTime = 0;
+  static String tableName = null;
+  static String families = null;
+  static String peerId = null;
+
+  /**
+   * Map-only comparator for 2 tables
+   */
+  public static class Verifier
+      extends TableMapper<ImmutableBytesWritable, Put> {
+
+    public static enum Counters {GOODROWS, BADROWS}
+
+    private ResultScanner replicatedScanner;
+
+    /**
+     * Map method that compares every scanned row with the equivalent from
+     * a distant cluster.
+     * @param row The current table row key.
+     * @param value The columns.
+     * @param context The current context.
+     * @throws IOException When something is broken with the data.
+     */
+    @Override
+    public void map(ImmutableBytesWritable row, Result value,
+                    Context context)
+        throws IOException {
+      if (replicatedScanner == null) {
+        // Lazily open a scanner on the peer cluster's table, starting at
+        // the first row this map task sees
+        Configuration conf = context.getConfiguration();
+        Scan scan = new Scan();
+        scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
+        long startTime = conf.getLong(NAME + ".startTime", 0);
+        long endTime = conf.getLong(NAME + ".endTime", 0);
+        String families = conf.get(NAME + ".families", null);
+        if (families != null) {
+          String[] fams = families.split(",");
+          for (String fam : fams) {
+            scan.addFamily(Bytes.toBytes(fam));
+          }
+        }
+        if (startTime != 0) {
+          scan.setTimeRange(startTime,
+              endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
+        }
+        try {
+          ReplicationZookeeper zk = new ReplicationZookeeper(conf,
+              HConnectionManager.getConnection(conf).
+                  getZooKeeperWatcher());
+          ReplicationPeer peer = zk.getPeer(conf.get(NAME + ".peerId"));
+          HTable replicatedTable = new HTable(peer.getConfiguration(),
+              conf.get(NAME + ".tableName"));
+          scan.setStartRow(value.getRow());
+          replicatedScanner = replicatedTable.getScanner(scan);
+        } catch (KeeperException e) {
+          throw new IOException("Got a ZK exception", e);
+        }
+      }
+      Result res = replicatedScanner.next();
+      try {
+        Result.compareResults(value, res);
+        context.getCounter(Counters.GOODROWS).increment(1);
+      } catch (Exception e) {
+        LOG.warn("Bad row", e);
+        context.getCounter(Counters.BADROWS).increment(1);
+      }
+    }
+
+    @Override
+    protected void cleanup(Context context) {
+      // The scanner is opened lazily; a task that saw no rows never opened it
+      if (replicatedScanner != null) {
+        replicatedScanner.close();
+      }
+    }
+  }
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param conf The current configuration.
+   * @param args The command line parameters.
+   * @return The newly created job.
+   * @throws java.io.IOException When setting up the job fails.
+   */
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+      throws IOException {
+    if (!doCommandLine(args)) {
+      return null;
+    }
+    if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
+      throw new IOException("Replication needs to be enabled to verify it.");
+    }
+    try {
+      ReplicationZookeeper zk = new ReplicationZookeeper(conf,
+          HConnectionManager.getConnection(conf).getZooKeeperWatcher());
+      // Just verifying that we can connect to the peer
+      ReplicationPeer peer = zk.getPeer(peerId);
+      if (peer == null) {
+        throw new IOException("Couldn't get access to the slave cluster," +
+            " please see the log");
+      }
+    } catch (KeeperException ex) {
+      throw new IOException("Couldn't get access to the slave cluster" +
+          " because: ", ex);
+    }
+    conf.set(NAME + ".peerId", peerId);
+    conf.set(NAME + ".tableName", tableName);
+    conf.setLong(NAME + ".startTime", startTime);
+    conf.setLong(NAME + ".endTime", endTime);
+    if (families != null) {
+      conf.set(NAME + ".families", families);
+    }
+    Job job = new Job(conf, NAME + "_" + tableName);
+    job.setJarByClass(VerifyReplication.class);
+
+    Scan scan = new Scan();
+    if (startTime != 0) {
+      scan.setTimeRange(startTime,
+          endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
+    }
+    if (families != null) {
+      String[] fams = families.split(",");
+      for (String fam : fams) {
+        scan.addFamily(Bytes.toBytes(fam));
+      }
+    }
+    TableMapReduceUtil.initTableMapperJob(tableName, scan,
+        Verifier.class, null, null, job);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    job.setNumReduceTasks(0);
+    return job;
+  }
+
+  private static boolean doCommandLine(final String[] args) {
+    if (args.length < 2) {
+      printUsage(null);
+      return false;
+    }
+    try {
+      for (int i = 0; i < args.length; i++) {
+        String cmd = args[i];
+        if (cmd.equals("-h") || cmd.startsWith("--h")) {
+          printUsage(null);
+          return false;
+        }
+
+        final String startTimeArgKey = "--starttime=";
+        if (cmd.startsWith(startTimeArgKey)) {
+          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
+          continue;
+        }
+
+        final String endTimeArgKey = "--endtime=";
+        if (cmd.startsWith(endTimeArgKey)) {
+          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
+          continue;
+        }
+
+        final String familiesArgKey = "--families=";
+        if (cmd.startsWith(familiesArgKey)) {
+          families = cmd.substring(familiesArgKey.length());
+          continue;
+        }
+
+        if (i == args.length - 2) {
+          peerId = cmd;
+        }
+
+        if (i == args.length - 1) {
+          tableName = cmd;
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      printUsage("Can't start because " + e.getMessage());
+      return false;
+    }
+    return true;
+  }
+
+  /*
+   * @param errorMsg Error message.  Can be null.
+   */
+  private static void printUsage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: verifyrep [--starttime=X]" +
+        " [--endtime=Y] [--families=A] <peerid> <tablename>");
+    System.err.println();
+    System.err.println("Options:");
+    System.err.println(" starttime    beginning of the time range");
+    System.err.println("              without endtime means from starttime to forever");
+    System.err.println(" endtime      end of the time range");
+    System.err.println(" families     comma-separated list of families to verify");
+    System.err.println();
+    System.err.println("Args:");
+    System.err.println(" peerid       id of the peer used for verification," +
+        " must match the one given for replication");
+    System.err.println(" tablename    name of the table to verify");
+    System.err.println();
+    System.err.println("Examples:");
+    System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5");
+    System.err.println(" $ bin/hbase " +
+        "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
+        " --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
+  }
+
+  /**
+   * Main entry point.
+   *
+   * @param args The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    Job job = createSubmittableJob(conf, args);
+    if (job != null) {
+      System.exit(job.waitForCompletion(true) ? 0 : 1);
+    }
+  }
+}
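For reviewers who want to exercise the new class outside of JUnit, here is a minimal sketch (not part of the patch) that submits the job programmatically and reads the two counters, mirroring what testVerifyRepJob does; the peer id "2" and the table name "TestTable" are placeholders.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.mapreduce.Job;

public class VerifyRepExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // args are <peerid> <tablename>, exactly what doCommandLine expects
    Job job = VerifyReplication.createSubmittableJob(conf,
        new String[] {"2", "TestTable"});
    if (job == null || !job.waitForCompletion(true)) {
      throw new RuntimeException("Verification job failed, see the task logs");
    }
    // GOODROWS/BADROWS summarize the comparison; details are in the map logs
    long good = job.getCounters()
        .findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue();
    long bad = job.getCounters()
        .findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue();
    System.out.println("good=" + good + " bad=" + bad);
  }
}
```

From the shell, the Driver change below exposes the same job under the short name verifyrep.
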
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java	(revision 1028470)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java	(working copy)
@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
 import org.apache.hadoop.util.ProgramDriver;
 
 /**
@@ -41,6 +42,10 @@
         "Complete a bulk data load.");
     pgd.addClass(CopyTable.NAME, CopyTable.class,
         "Export a table from local cluster to peer cluster");
+    pgd.addClass(VerifyReplication.NAME, VerifyReplication.class, "Compare" +
+        " the data from tables in two different clusters. WARNING: It" +
+        " doesn't work for incrementColumnValues'd cells since the" +
+        " timestamp is changed after being appended to the log.");
     pgd.driver(args);
   }
 }
Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java	(revision 1028470)
+++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java	(working copy)
@@ -265,7 +265,7 @@
    * @throws IOException
    * @throws KeeperException
    */
-  private ReplicationPeer getPeer(String peerId) throws IOException, KeeperException{
+  public ReplicationPeer getPeer(String peerId) throws IOException, KeeperException{
     String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
     byte [] data = ZKUtil.getData(this.zookeeper, znode);
     String otherClusterKey = Bytes.toString(data);
Index: src/main/java/org/apache/hadoop/hbase/client/Result.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/Result.java	(revision 1028470)
+++ src/main/java/org/apache/hadoop/hbase/client/Result.java	(working copy)
@@ -615,4 +615,31 @@
     }
     return results;
   }
+
+  /**
+   * Does a deep comparison of two Results, down to the byte arrays.
+   * @param res1 first result to compare
+   * @param res2 second result to compare
+   * @throws Exception Every difference causes an exception to be thrown
+   */
+  public static void compareResults(Result res1, Result res2)
+      throws Exception {
+    if (res2 == null) {
+      throw new Exception("There weren't enough rows, we stopped at " +
+          Bytes.toString(res1.getRow()));
+    }
+    if (res1.size() != res2.size()) {
+      throw new Exception("This row doesn't have the same number of KVs: " +
+          res1.toString() + " compared to " + res2.toString());
+    }
+    KeyValue[] ourKVs = res1.sorted();
+    KeyValue[] replicatedKVs = res2.sorted();
+    for (int i = 0; i < res1.size(); i++) {
+      // KeyValue.equals only compares the key part, so the values must be
+      // checked separately; a mismatch in either one fails the row
+      if (!ourKVs[i].equals(replicatedKVs[i]) ||
+          !Bytes.equals(ourKVs[i].getValue(), replicatedKVs[i].getValue())) {
+        throw new Exception("This result was different: " +
+            res1.toString() + " compared to " + res2.toString());
+      }
+    }
+  }
 }
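As a rough illustration of how Verifier.map and the new compareResults helper fit together, the sketch below (not part of the patch) walks two tables' scanners in lock-step; like the MR job, it assumes both scanners return rows in the same order. The class name and method are hypothetical.

```java
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

public class CompareTables {
  /** Count matching rows between two tables, the way Verifier does per-split. */
  public static long countGoodRows(HTable local, HTable remote)
      throws Exception {
    ResultScanner ours = local.getScanner(new Scan());
    ResultScanner theirs = remote.getScanner(new Scan());
    long good = 0;
    try {
      for (Result r : ours) {
        try {
          // Throws on a missing row, a differing KV count, or any
          // key/value mismatch
          Result.compareResults(r, theirs.next());
          good++;
        } catch (Exception diff) {
          // a "bad row": VerifyReplication logs it and bumps BADROWS
        }
      }
    } finally {
      ours.close();
      theirs.close();
    }
    return good;
  }
}
```
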

Index: src/main/javadoc/org/apache/hadoop/hbase/replication/package.html
===================================================================
--- src/main/javadoc/org/apache/hadoop/hbase/replication/package.html	(revision 1028470)
+++ src/main/javadoc/org/apache/hadoop/hbase/replication/package.html	(working copy)
@@ -31,6 +31,7 @@
 <li>Status</li>
 <li>Requirements</li>
 <li>Deployment</li>
+<li>Verifying Replicated Data</li>
@@ -49,6 +50,7 @@
 <li>Supports clusters of different sizes.</li>
 <li>Handling of partitions longer than 10 minutes.</li>
 <li>Ability to add/remove slave clusters at runtime.</li>
+<li>MapReduce job to compare tables on two clusters</li>
 Please report bugs on the project's Jira when found.
@@ -122,5 +124,19 @@
+<h2>Verifying Replicated Data</h2>
+<p>
+Verifying the replicated data on two clusters is easy to do in the shell when
+looking only at a few rows, but doing a systematic comparison requires more
+computing power. This is why the VerifyReplication MapReduce job was created;
+it has to be run on the master cluster and needs to be provided with a peer id
+(the one given when establishing a replication stream) and a table name. Other
+options let you specify a time range and specific families. The job's short
+name is "verifyrep"; use it as the program name when pointing "hadoop jar" at
+the hbase jar.
+</p>
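
To make the documentation paragraph above concrete, here is a hedged sketch of a verification run restricted by time range and family, using the flags doCommandLine parses; the timestamps, the family name "cf1", and the peer id "5" are placeholders.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.mapreduce.Job;

public class RestrictedVerify {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = VerifyReplication.createSubmittableJob(conf, new String[] {
        "--starttime=1265875194289",
        "--endtime=1265878794289",
        "--families=cf1",
        "5",          // peer id: must match the replication stream's peer
        "TestTable"   // table to verify on both clusters
    });
    // createSubmittableJob returns null when the arguments don't parse
    if (job != null) {
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
  }
}
```

The shell equivalent is `bin/hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication --starttime=1265875194289 --endtime=1265878794289 --families=cf1 5 TestTable`, or `hadoop jar` with the hbase jar and the short name verifyrep.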