From 8b9a6815b7b0e7bf0c45903888d848b1f9456c31 Mon Sep 17 00:00:00 2001 From: rahulgidwani Date: Mon, 13 Apr 2015 21:43:51 -0700 Subject: [PATCH] HBASE-13459 A more robust Verify Replication --- .../org/apache/hadoop/hbase/CellComparator.java | 57 ++- .../hbase/mapreduce/replication/CompareTables.java | 449 +++++++++++++++++++++ 2 files changed, 484 insertions(+), 22 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/CompareTables.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java index 992079d..c8640b4 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java @@ -54,9 +54,7 @@ public class CellComparator implements Comparator, Serializable{ public static int compareStatic(Cell a, Cell b, boolean onlyKey) { //row - int c = Bytes.compareTo( - a.getRowArray(), a.getRowOffset(), a.getRowLength(), - b.getRowArray(), b.getRowOffset(), b.getRowLength()); + int c = compareRow(a, b); if (c != 0) return c; // If the column is not specified, the "minimum" key type appears the @@ -73,23 +71,19 @@ public class CellComparator implements Comparator, Serializable{ } //family - c = Bytes.compareTo( - a.getFamilyArray(), a.getFamilyOffset(), a.getFamilyLength(), - b.getFamilyArray(), b.getFamilyOffset(), b.getFamilyLength()); + c = compareFamily(a, b); if (c != 0) return c; //qualifier - c = Bytes.compareTo( - a.getQualifierArray(), a.getQualifierOffset(), a.getQualifierLength(), - b.getQualifierArray(), b.getQualifierOffset(), b.getQualifierLength()); + c = compareQualifier(a, b); if (c != 0) return c; //timestamp: later sorts first - c = Longs.compare(b.getTimestamp(), a.getTimestamp()); + c = compareTimestamp(a, b); if (c != 0) return c; //type - c = (0xff & b.getTypeByte()) - (0xff & a.getTypeByte()); + c = 
compareType(a, b); if (c != 0) return c; if (onlyKey) return c; @@ -98,6 +92,31 @@ public class CellComparator implements Comparator, Serializable{ return Longs.compare(b.getMvccVersion(), a.getMvccVersion()); } + public static int compareRow(Cell a, Cell b) { + return Bytes.compareTo( + a.getRowArray(), a.getRowOffset(), a.getRowLength(), + b.getRowArray(), b.getRowOffset(), b.getRowLength()); + } + + public static int compareTimestamp(Cell a, Cell b) { + return Longs.compare(b.getTimestamp(), a.getTimestamp()); + } + + public static int compareType(Cell a, Cell b) { + return (0xff & b.getTypeByte()) - (0xff & a.getTypeByte()); + } + + public static int compareQualifier(Cell a, Cell b) { + return Bytes.compareTo( + a.getQualifierArray(), a.getQualifierOffset(), a.getQualifierLength(), + b.getQualifierArray(), b.getQualifierOffset(), b.getQualifierLength()); + } + + public static int compareFamily(Cell a, Cell b) { + return Bytes.compareTo( + a.getFamilyArray(), a.getFamilyOffset(), a.getFamilyLength(), + b.getFamilyArray(), b.getFamilyOffset(), b.getFamilyLength()); + } /**************** equals ****************************/ @@ -184,29 +203,23 @@ public class CellComparator implements Comparator, Serializable{ */ private static int compareStaticIgnoreMvccVersion(Cell a, Cell b) { //row - int c = Bytes.compareTo( - a.getRowArray(), a.getRowOffset(), a.getRowLength(), - b.getRowArray(), b.getRowOffset(), b.getRowLength()); + int c = compareRow(a, b); if (c != 0) return c; //family - c = Bytes.compareTo( - a.getFamilyArray(), a.getFamilyOffset(), a.getFamilyLength(), - b.getFamilyArray(), b.getFamilyOffset(), b.getFamilyLength()); + c = compareFamily(a, b); if (c != 0) return c; //qualifier - c = Bytes.compareTo( - a.getQualifierArray(), a.getQualifierOffset(), a.getQualifierLength(), - b.getQualifierArray(), b.getQualifierOffset(), b.getQualifierLength()); + c = compareQualifier(a, b); if (c != 0) return c; //timestamp: later sorts first - c = 
Longs.compare(b.getTimestamp(), a.getTimestamp()); + c = compareTimestamp(a, b); if (c != 0) return c; //type - c = (0xff & b.getTypeByte()) - (0xff & a.getTypeByte()); + c = compareType(a, b); return c; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/CompareTables.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/CompareTables.java new file mode 100644 index 0000000..a301d15 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/CompareTables.java @@ -0,0 +1,449 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce.replication; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.mapreduce.TableMapper; +import org.apache.hadoop.hbase.mapreduce.TableSplit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.Tool; + +import java.io.IOException; + +/** + * This map-only job compares the data from a local table with a remote one. + * Every cell is compared and must have exactly the same keys (timestamp comparison optional) + * as well as same value. It is possible to restrict the job by time range and + * families, as well as timestamp comparison. You can specify the zk quorum host and table name + * for the respective tables you wish to compare +

+ * Counters are provided, Verifier.Counters.GOODROWS and ROWSWITHDIFFS. The reason + * for a why a row is different is shown in the map's log. + */ +public class CompareTables extends Configured implements Tool { + + private static final Log LOG = + LogFactory.getLog(CompareTables.class); + + public final static String NAME = "compareTables"; + static long startTime = 0; + static long endTime = 0; + static String tableName1 = null; + static String tableName2 = null; + static String families = null; + static String zkCluster1 = null; + static String zkCluster2 = null; + static boolean ignoreTimestamps = false; + + /** + * Map-only comparator for 2 tables + */ + public static class Verifier + extends TableMapper { + + public static enum Counters {GOODROWS, SOURCEMISSINGROWS, + TARGETMISSINGROWS, SOURCEMISSINGKEYS, TARGETMISSINGKEYS, DIFFERENTVALUES, ROWSWITHDIFFS} + + private ResultScanner targetScanner; + private Result currentSourceRow; + private Result currentTargetRow; + private boolean ignoreTimestamps; + private boolean advanceSourceScanner; + private boolean advanceTargetScanner; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + + Configuration conf = context.getConfiguration(); + final Scan scan = new Scan(); + TableSplit split = (TableSplit) context.getInputSplit(); + scan.setStartRow(split.getStartRow()); + scan.setStopRow(split.getEndRow()); + scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1)); + long startTime = conf.getLong(NAME + ".startTime", 0); + long endTime = conf.getLong(NAME + ".endTime", 0); + String families = conf.get(NAME + ".families", null); + if (families != null) { + String[] fams = families.split(","); + for (String fam : fams) { + scan.addFamily(Bytes.toBytes(fam)); + } + } + if (startTime != 0) { + scan.setTimeRange(startTime, endTime == 0 ? 
HConstants.LATEST_TIMESTAMP : endTime); + } + ignoreTimestamps = conf.getBoolean(NAME + ".ignoreTimestamps", false); + + Configuration otherConf = new Configuration(conf); + String zkCluster2 = conf.get(NAME+".zkCluster2"); + if(zkCluster2 != null) { + ZKUtil.applyClusterKeyToConf(otherConf, zkCluster2); + } + + String table2 = conf.get(NAME+".tableName2"); + HTable otherTable = new HTable(otherConf, table2); + targetScanner = otherTable.getScanner(scan); + advanceTargetScanner = true; + } + + /** + * Map method that compares every scanned row with the equivalent from a + * distant cluster. + * + * @param row + * The current table row key. + * @param value + * The columns. + * @param context + * The current context. + * @throws IOException + * When something is broken with the data. + */ + @Override + public void map(ImmutableBytesWritable row, final Result value, Context context) + throws IOException { + currentSourceRow = value; + do { + if (advanceTargetScanner) { + currentTargetRow = targetScanner.next(); + } + compareCurrentRows(context); + } while (advanceTargetScanner && !advanceSourceScanner); + } + + public void compareCurrentRows(Context context) { + if (currentSourceRow == null) { + sourceMissingRow(context); + return; + } + if (currentTargetRow == null) { + targetMissingRow(context); + return; + } + // rows are the same, compare the KVs then advance both scanners + advanceSourceScanner = true; + advanceTargetScanner = true; + + KeyValue[] sourceKVs = currentSourceRow.raw(); + KeyValue[] targetKVs = currentTargetRow.raw(); + KeyValue sourceKV = sourceKVs[0]; + KeyValue targetKV = targetKVs[0]; + int result = KeyValue.COMPARATOR.compareRows(sourceKV, targetKV); + if (result < 0) { + targetMissingRow(context); + return; + } else if (result > 0) { + sourceMissingRow(context); + return; + } + + boolean rowHasDiffs = false; + int sourceKVIndex = 0; + int targetKVIndex = 0; + while (true) { + sourceKV = sourceKVIndex < sourceKVs.length ? 
sourceKVs[sourceKVIndex] : null; + targetKV = targetKVIndex < targetKVs.length ? targetKVs[targetKVIndex] : null; + if (sourceKV == null && targetKV == null) { + break; // exhausted both arrays, terminate loop + } + int keyResult = compareKeys(sourceKV, targetKV); + if (keyResult < 0) { + LOG.warn("Target missing key: " + sourceKV); + context.getCounter(Counters.TARGETMISSINGKEYS).increment(1); + rowHasDiffs = true; + sourceKVIndex++; + } else if (keyResult > 0) { + LOG.warn("Source missing key: " + targetKV); + context.getCounter(Counters.SOURCEMISSINGKEYS).increment(1); + rowHasDiffs = true; + targetKVIndex++; + } else { + // same cells - compare values + if (!CellUtil.matchingValue(sourceKV, targetKV)) { + rowHasDiffs = true; + LOG.warn("Different values: "); + LOG.warn(" source key: " + sourceKV + " value: " + Bytes + .toStringBinary(sourceKV.getValueArray(), + sourceKV.getValueOffset(), sourceKV.getValueLength())); + LOG.warn(" target key: " + targetKV + " value: " + Bytes + .toStringBinary(targetKV.getValueArray(), + targetKV.getValueOffset(), targetKV.getValueLength())); + context.getCounter(Counters.DIFFERENTVALUES).increment(1); + } + sourceKVIndex++; + targetKVIndex++; + } + } + if (rowHasDiffs) { + context.getCounter(Counters.ROWSWITHDIFFS).increment(1); + } else { + context.getCounter(Counters.GOODROWS).increment(1); + } + } + + private int compareKeys(KeyValue kv1, KeyValue kv2) { + if (kv1 == null) { + return 1; + } + if (kv2 == null) { + return -1; + } + int result = CellComparator.compareRow(kv1, kv2); + if (result != 0) { + return result; + } + + result = CellComparator.compareFamily(kv1, kv2); + if (result != 0) { + return result; + } + + result = CellComparator.compareQualifier(kv1, kv2); + if (result != 0) { + return result; + } + if (!ignoreTimestamps) { + result = CellComparator.compareTimestamp(kv1, kv2); + if (result != 0) { + return result; + } + } + return CellComparator.compareType(kv1, kv2); + } + + private void targetMissingRow(Context 
context) { + LOG.warn("Target missing row: " + currentSourceRow); + context.getCounter(Counters.TARGETMISSINGROWS).increment(1); + context.getCounter(Counters.ROWSWITHDIFFS).increment(1); + advanceSourceScanner = true; + advanceTargetScanner = false; + } + + private void sourceMissingRow(Context context) { + LOG.warn("Source missing row: " + currentTargetRow); + context.getCounter(Counters.SOURCEMISSINGROWS).increment(1); + context.getCounter(Counters.ROWSWITHDIFFS).increment(1); + advanceSourceScanner = false; + advanceTargetScanner = true; + } + + protected void cleanup(Context context) throws IOException { + currentTargetRow = targetScanner.next(); + while (currentTargetRow != null) { + sourceMissingRow(context); + currentTargetRow = targetScanner.next(); + } + + if (targetScanner != null) { + targetScanner.close(); + targetScanner = null; + } + } + } + + /** + * Sets up the actual job. + * + * @param conf + * The current configuration. + * @param args + * The command line parameters. + * @return The newly created job. + * @throws java.io.IOException + * When setting up the job fails. + */ + public static Job createSubmittableJob(Configuration conf, String[] args) + throws IOException { + if (!doCommandLine(args)) { + return null; + } + + conf.set(NAME+".tableName1", tableName1); + conf.set(NAME+".tableName2", tableName2); + conf.setLong(NAME+".startTime", startTime); + conf.setLong(NAME+".endTime", endTime); + conf.setBoolean(NAME+".ignoreTimestamps", ignoreTimestamps); + if (families != null) { + conf.set(NAME+".families", families); + } + + if(zkCluster1 != null) { + conf.set(NAME+".zkCluster1", zkCluster1); + ZKUtil.applyClusterKeyToConf(conf, zkCluster1); + } + if(zkCluster2 != null) { + conf.set(NAME+".zkCluster2", zkCluster2); + } + + Job job = new Job(conf, NAME + "_" + tableName1 + " vs " + tableName2); + job.setJarByClass(CompareTables.class); + + Scan scan = new Scan(); + if (startTime != 0) { + scan.setTimeRange(startTime, + endTime == 0 ? 
HConstants.LATEST_TIMESTAMP : endTime); + } + if(families != null) { + String[] fams = families.split(","); + for(String fam : fams) { + scan.addFamily(Bytes.toBytes(fam)); + } + } + + TableMapReduceUtil.initTableMapperJob(tableName1, scan, + Verifier.class, null, null, job); + job.setOutputFormatClass(NullOutputFormat.class); + job.setNumReduceTasks(0); + return job; + } + + private static boolean doCommandLine(final String[] args) { + if (args.length < 2) { + printUsage(null); + return false; + } + try { + for (int i = 0; i < args.length; i++) { + String cmd = args[i]; + if (cmd.equals("-h") || cmd.startsWith("--h")) { + printUsage(null); + return false; + } + + final String startTimeArgKey = "--starttime="; + if (cmd.startsWith(startTimeArgKey)) { + startTime = Long.parseLong(cmd.substring(startTimeArgKey.length())); + continue; + } + + final String endTimeArgKey = "--endtime="; + if (cmd.startsWith(endTimeArgKey)) { + endTime = Long.parseLong(cmd.substring(endTimeArgKey.length())); + continue; + } + + final String familiesArgKey = "--families="; + if (cmd.startsWith(familiesArgKey)) { + families = cmd.substring(familiesArgKey.length()); + continue; + } + + final String ignoreTimestampsKey = "--ignoretimestamps"; + if (cmd.startsWith(ignoreTimestampsKey)) { + ignoreTimestamps = true; + continue; + } + + final String zkCluster1ArgKey = "--zkcluster1="; + if (cmd.startsWith(zkCluster1ArgKey)) { + zkCluster1 = cmd.substring(zkCluster1ArgKey.length()); + continue; + } + final String zkCluster2ArgKey = "--zkcluster2="; + if (cmd.startsWith(zkCluster2ArgKey)) { + zkCluster2 = cmd.substring(zkCluster2ArgKey.length()); + continue; + } + + if (i == args.length-2) { + tableName1 = cmd; + } + + if (i == args.length-1) { + tableName2 = cmd; + } + } + } catch (Exception e) { + e.printStackTrace(); + printUsage("Can't start because " + e.getMessage()); + return false; + } + return true; + } + + + /* + * @param errorMsg Error message. Can be null. 
+ */ + private static void printUsage(final String errorMsg) { + if (errorMsg != null && errorMsg.length() > 0) { + System.err.println("ERROR: " + errorMsg); + } + System.err.println("Usage: compareTables [--starttime=X] [--endtime=Y]" + + " [--families=A] [--ignoretimestamps] [--zkcluster1=B] [--zkcluster2=C] <tablename1> <tablename2>"); + System.err.println(); + System.err.println("Options:"); + System.err.println(" starttime beginning of the time range"); + System.err.println(" without endtime means from starttime to forever"); + System.err.println(" endtime end of the time range"); + System.err.println(" families comma-separated list of families to compare"); + System.err.println(" ignoretimestamps ignore cell timestamps for comparisons"); + System.err.println(" zkcluster1 " + + "ZK cluster key of the 1st table if it is remote e.g. localhost:2181:/hbase"); + System.err.println(" zkcluster2 " + + "ZK cluster key of the 2nd table if different from 1st table e.g. localhost:2181:/hbase"); + System.err.println(); + System.err.println("Args:"); + System.err.println(" tablename1 Name of the local table to compare"); + System.err.println(" tablename2 Name of the second table to compare"); + System.err.println(); + System.err.println("Examples:"); + System.err.println(" To verify the data consistency in a particular time-window for " + + "2 tables on same cluster"); + System.err.println(" $ bin/hbase " + + "org.apache.hadoop.hbase.mapreduce.replication.CompareTables" + + " --starttime=1265875194289 --endtime=1265878794289 <tablename1> <tablename2>"); + } + + @Override + public int run(String[] args) throws Exception { + Configuration conf = this.getConf(); + Job job = createSubmittableJob(conf, args); + if (job != null) { + return job.waitForCompletion(true) ? 0 : 1; + } + return 1; + } + + /** + * Main entry point. + * + * @param args The command line parameters. + * @throws Exception When running the job fails. 
+ */ + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + Job job = createSubmittableJob(conf, args); + if (job != null) { + System.exit(job.waitForCompletion(true) ? 0 : 1); + } + } +} -- 2.1.0