Index: src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (revision 1028470)
+++ src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (working copy)
@@ -31,6 +31,7 @@
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
@@ -41,10 +42,11 @@
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.mapreduce.Job;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -455,6 +457,56 @@
}
/**
+ * Do a small load into a table, make sure the data is really the same,
+ * then run the VerifyReplication job to check the results. Do a second
+ * comparison where all the cells are different.
+ * @throws Exception
+ */
+ @Test
+ public void testVerifyRepJob() throws Exception {
+ // Populate the tables; this also guarantees that the tables are
+ // identical, since testSmallBatch does the comparison itself
+ testSmallBatch();
+
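+ // args are <peerId> <tableName>; "2" is assumed to match the peer id
+ // registered against the slave cluster during test setup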
+ String[] args = new String[] {"2", Bytes.toString(tableName)};
+ Job job = VerifyReplication.createSubmittableJob(conf1, args);
+ if (job == null) {
+ fail("Job wasn't created, see the log");
+ }
+ if (!job.waitForCompletion(true)) {
+ fail("Job failed, see the log");
+ }
+ assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
+ findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+ assertEquals(0, job.getCounters().
+ findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+
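+ // Rewrite the first cell of every row on the slave so each comparison fails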
+ Scan scan = new Scan();
+ ResultScanner rs = htable2.getScanner(scan);
+ Put put = null;
+ for (Result result : rs) {
+ put = new Put(result.getRow());
+ KeyValue firstVal = result.raw()[0];
+ put.add(firstVal.getFamily(),
+ firstVal.getQualifier(), Bytes.toBytes("diff data"));
+ htable2.put(put);
+ }
+ rs.close();
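+ // Also delete the last scanned row so the row counts differ between clusters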
+ Delete delete = new Delete(put.getRow());
+ htable2.delete(delete);
+ job = VerifyReplication.createSubmittableJob(conf1, args);
+ if (job == null) {
+ fail("Job wasn't created, see the log");
+ }
+ if (!job.waitForCompletion(true)) {
+ fail("Job failed, see the log");
+ }
+ assertEquals(0, job.getCounters().
+ findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+ assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
+ findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+ }
+
+ /**
* Load up multiple tables over 2 region servers and kill a source during
* the upload. The failover happens internally.
* @throws Exception
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (revision 0)
@@ -0,0 +1,280 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce.replication;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This map-only job compares the data from a local table with a remote one.
+ * Every cell is compared and must have exactly the same keys (even timestamp)
+ * as well as same value. It is possible to restrict the job by time range and
+ * families. The peer id that's provided must match the one given when the
+ * replication stream was set up.
+ *
+ * Two counters are provided, Verifier.Counters.GOODROWS and BADROWS. The
+ * reason why a row is different is shown in the map's log.
+ */
+public class VerifyReplication {
+
+ private static final Log LOG =
+ LogFactory.getLog(VerifyReplication.class);
+
+ public final static String NAME = "verifyrep";
+ static long startTime = 0;
+ static long endTime = 0;
+ static String tableName = null;
+ static String families = null;
+ static String peerId = null;
+
+ /**
+ * Map-only comparator for 2 tables
+ */
+ public static class Verifier
+ extends TableMapper<ImmutableBytesWritable, Put> {
+
+ public static enum Counters {GOODROWS, BADROWS}
+
+ private ResultScanner replicatedScanner;
+
+ /**
+ * Map method that compares every scanned row with the equivalent from
+ * a distant cluster.
+ * @param row The current table row key.
+ * @param value The columns.
+ * @param context The current context.
+ * @throws IOException When something is broken with the data.
+ */
+ @Override
+ public void map(ImmutableBytesWritable row, Result value,
+ Context context)
+ throws IOException {
+ if (replicatedScanner == null) {
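+ // Lazily open the scanner against the replicated table on the first map() call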
+ Configuration conf = context.getConfiguration();
+ Scan scan = new Scan();
+ scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
+ long startTime = conf.getLong(NAME + ".startTime", 0);
+ long endTime = conf.getLong(NAME + ".endTime", 0);
+ String families = conf.get(NAME + ".families", null);
+ if(families != null) {
+ String[] fams = families.split(",");
+ for(String fam : fams) {
+ scan.addFamily(Bytes.toBytes(fam));
+ }
+ }
+ if (startTime != 0) {
+ scan.setTimeRange(startTime,
+ endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
+ }
+ try {
+ ReplicationZookeeper zk = new ReplicationZookeeper(conf,
+ HConnectionManager.getConnection(conf).
+ getZooKeeperWatcher());
+ ReplicationPeer peer = zk.getPeer(conf.get(NAME+".peerId"));
+ HTable replicatedTable = new HTable(peer.getConfiguration(),
+ conf.get(NAME+".tableName"));
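+ // Start the remote scan at this mapper's first row so both scanners stay aligned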
+ scan.setStartRow(value.getRow());
+ replicatedScanner = replicatedTable.getScanner(scan);
+ } catch (KeeperException e) {
+ throw new IOException("Got a ZK exception", e);
+ }
+ }
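+ // Compare each local row against the next row from the remote scanner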
+ Result res = replicatedScanner.next();
+ try {
+ Result.compareResults(value, res);
+ context.getCounter(Counters.GOODROWS).increment(1);
+ } catch (Exception e) {
+ LOG.warn("Bad row", e);
+ context.getCounter(Counters.BADROWS).increment(1);
+ }
+ }
+
+ @Override
+ protected void cleanup(Context context) {
+ if (replicatedScanner != null) {
+ replicatedScanner.close();
+ }
+ }
+ }
+
+ /**
+ * Sets up the actual job.
+ *
+ * @param conf The current configuration.
+ * @param args The command line parameters.
+ * @return The newly created job.
+ * @throws java.io.IOException When setting up the job fails.
+ */
+ public static Job createSubmittableJob(Configuration conf, String[] args)
+ throws IOException {
+ if (!doCommandLine(args)) {
+ return null;
+ }
+ if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
+ throw new IOException("Replication needs to be enabled to verify it.");
+ }
+ try {
+ ReplicationZookeeper zk = new ReplicationZookeeper(conf,
+ HConnectionManager.getConnection(conf).getZooKeeperWatcher());
+ // Just verifying that we can connect
+ ReplicationPeer peer = zk.getPeer(peerId);
+ if (peer == null) {
+ throw new IOException("Couldn't get access to the slave cluster, " +
+ "please see the log");
+ }
+ } catch (KeeperException ex) {
+ throw new IOException("Couldn't get access to the slave cluster", ex);
+ }
+ conf.set(NAME+".peerId", peerId);
+ conf.set(NAME+".tableName", tableName);
+ conf.setLong(NAME+".startTime", startTime);
+ conf.setLong(NAME+".endTime", endTime);
+ if (families != null) {
+ conf.set(NAME+".families", families);
+ }
+ Job job = new Job(conf, NAME + "_" + tableName);
+ job.setJarByClass(VerifyReplication.class);
+
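+ // The local scan must mirror the time range and family restrictions used by the mappers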
+ Scan scan = new Scan();
+ if (startTime != 0) {
+ scan.setTimeRange(startTime,
+ endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
+ }
+ if(families != null) {
+ String[] fams = families.split(",");
+ for(String fam : fams) {
+ scan.addFamily(Bytes.toBytes(fam));
+ }
+ }
+ TableMapReduceUtil.initTableMapperJob(tableName, scan,
+ Verifier.class, null, null, job);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ job.setNumReduceTasks(0);
+ return job;
+ }
+
+ private static boolean doCommandLine(final String[] args) {
+ if (args.length < 2) {
+ printUsage(null);
+ return false;
+ }
+ try {
+ for (int i = 0; i < args.length; i++) {
+ String cmd = args[i];
+ if (cmd.equals("-h") || cmd.startsWith("--h")) {
+ printUsage(null);
+ return false;
+ }
+
+ final String startTimeArgKey = "--starttime=";
+ if (cmd.startsWith(startTimeArgKey)) {
+ startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
+ continue;
+ }
+
+ final String endTimeArgKey = "--endtime=";
+ if (cmd.startsWith(endTimeArgKey)) {
+ endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
+ continue;
+ }
+
+ final String familiesArgKey = "--families=";
+ if (cmd.startsWith(familiesArgKey)) {
+ families = cmd.substring(familiesArgKey.length());
+ continue;
+ }
+
+ if (i == args.length-2) {
+ peerId = cmd;
+ }
+
+ if (i == args.length-1) {
+ tableName = cmd;
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ printUsage("Can't start because " + e.getMessage());
+ return false;
+ }
+ return true;
+ }
+
+ /*
+ * @param errorMsg Error message. Can be null.
+ */
+ private static void printUsage(final String errorMsg) {
+ if (errorMsg != null && errorMsg.length() > 0) {
+ System.err.println("ERROR: " + errorMsg);
+ }
+ System.err.println("Usage: verifyrep [--starttime=X]" +
+ " [--stoptime=Y] [--families=A] ");
+ System.err.println();
+ System.err.println("Options:");
+ System.err.println(" starttime beginning of the time range");
+ System.err.println(" without endtime means from starttime to forever");
+ System.err.println(" stoptime end of the time range");
+ System.err.println(" families comma-separated list of families to copy");
+ System.err.println();
+ System.err.println("Args:");
+ System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");
+ System.err.println(" tablename Name of the table to verify");
+ System.err.println();
+ System.err.println("Examples:");
+ System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
+ System.err.println(" $ bin/hbase " +
+ "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
+ " --starttime=1265875194289 --stoptime=1265878794289 5 TestTable ");
+ }
+
+ /**
+ * Main entry point.
+ *
+ * @param args The command line parameters.
+ * @throws Exception When running the job fails.
+ */
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ Job job = createSubmittableJob(conf, args);
+ if (job != null) {
+ System.exit(job.waitForCompletion(true) ? 0 : 1);
+ }
+ }
+}
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java (revision 1028470)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java (working copy)
@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.mapreduce;
+import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.util.ProgramDriver;
/**
@@ -41,6 +42,10 @@
"Complete a bulk data load.");
pgd.addClass(CopyTable.NAME, CopyTable.class,
"Export a table from local cluster to peer cluster");
+ pgd.addClass(VerifyReplication.NAME, VerifyReplication.class, "Compare" +
+ " the data from tables in two different clusters. WARNING: It" +
+ " doesn't work for incrementColumnValues'd cells since the" +
+ " timestamp is changed after being appended to the log.");
pgd.driver(args);
}
}
Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1028470)
+++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy)
@@ -265,7 +265,7 @@
* @throws IOException
* @throws KeeperException
*/
- private ReplicationPeer getPeer(String peerId) throws IOException, KeeperException{
+ public ReplicationPeer getPeer(String peerId) throws IOException, KeeperException{
String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
byte [] data = ZKUtil.getData(this.zookeeper, znode);
String otherClusterKey = Bytes.toString(data);
Index: src/main/java/org/apache/hadoop/hbase/client/Result.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/Result.java (revision 1028470)
+++ src/main/java/org/apache/hadoop/hbase/client/Result.java (working copy)
@@ -615,4 +615,31 @@
}
return results;
}
+
+ /**
+ * Does a deep comparison of two Results, down to the byte arrays.
+ * @param res1 first result to compare
+ * @param res2 second result to compare
+ * @throws Exception Every difference is throwing an exception
+ */
+ public static void compareResults(Result res1, Result res2)
+ throws Exception {
+ if (res2 == null) {
+ throw new Exception("There wasn't enough rows, we stopped at "
+ + Bytes.toString(res1.getRow()));
+ }
+ if (res1.size() != res2.size()) {
+ throw new Exception("This row doesn't have the same number of KVs: "
+ + res1.toString() + " compared to " + res2.toString());
+ }
+ KeyValue[] ourKVs = res1.sorted();
+ KeyValue[] replicatedKVs = res2.sorted();
+ for (int i = 0; i < res1.size(); i++) {
+ if (!ourKVs[i].equals(replicatedKVs[i]) ||
+ !Bytes.equals(ourKVs[i].getValue(), replicatedKVs[i].getValue())) {
+ throw new Exception("This result was different: "
+ + res1.toString() + " compared to " + res2.toString());
+ }
+ }
+ }
}
Index: src/main/javadoc/org/apache/hadoop/hbase/replication/package.html
===================================================================
--- src/main/javadoc/org/apache/hadoop/hbase/replication/package.html (revision 1028470)
+++ src/main/javadoc/org/apache/hadoop/hbase/replication/package.html (working copy)
@@ -31,6 +31,7 @@
Status
Requirements
Deployment
+ Verifying Replicated Data
@@ -49,6 +50,7 @@
Supports clusters of different sizes.
Handling of partitions longer than 10 minutes.
Ability to add/remove slave clusters at runtime.
+ MapReduce job to compare tables on two clusters
Please report bugs on the project's Jira when found.
@@ -122,5 +124,19 @@
+
+Verifying Replicated Data
+
+
+Verifying the replicated data on two clusters is easy to do in the shell when
+looking only at a few rows, but doing a systematic comparison requires more
+computing power. This is why the VerifyReplication MR job was created; it has
+to be run on the master cluster and needs to be provided with a peer id (the
+one given when establishing a replication stream) and a table name. Other
+options let you specify a time range and specific families. This job's short
+name is "verifyrep", and it is the name to pass when pointing "hadoop jar" at
+the hbase jar.
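+
+For example, to verify the data replicated from TestTable through peer 5 over
+a one hour window (the peer id, table name and timestamps are only examples):
+
+$ bin/hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication
+ --starttime=1265875194289 --endtime=1265878794289 5 TestTable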
+
+