From 249e4ee1e36014c439aef6c418abcaec4c11b1fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E4=B8=96=E5=BD=AC10204932?= Date: Thu, 23 Mar 2017 14:47:47 +0800 Subject: [PATCH] HBASE-17822 Set maxBadRows and outputDirectory option for VerifyReplication --- .../mapreduce/replication/VerifyReplication.java | 81 ++++++++++++++++++---- 1 file changed, 68 insertions(+), 13 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index ba5966b..8e1c787 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; @@ -53,7 +54,10 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -87,6 +91,8 @@ public class VerifyReplication extends Configured implements Tool { static int sleepMsBeforeReCompare = 0; static boolean verbose = false; static boolean includeDeletedCells = false; + static int maxBadRows = 100000; + static String outputDir = null; private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; @@ -94,7 +100,7 @@ public class VerifyReplication extends Configured implements Tool { * Map-only comparator for 2 tables */ public static class Verifier - extends TableMapper { + extends TableMapper { @@ -110,6 +116,8 @@ public class VerifyReplication extends Configured implements Tool { private int sleepMsBeforeReCompare; private String delimiter = ""; private boolean verbose = false; + private Text keyText = new Text(); + private Text valueText = new Text(); /** * Map method that compares every scanned row with the equivalent from @@ -122,7 +130,10 @@ public class VerifyReplication extends Configured implements Tool { @Override public void map(ImmutableBytesWritable row, final Result value, Context context) - throws IOException { + throws IOException, InterruptedException { + if(context.getCounter(Counters.BADROWS).getValue() >= context.getConfiguration().getInt(NAME+".maxBadRows",100000)){ + return; + } if (replicatedScanner == null) { Configuration conf = context.getConfiguration(); sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 0); @@ -169,6 +180,7 @@ public class VerifyReplication extends Configured implements Tool { currentCompareRowInPeerTable = replicatedScanner.next(); } while (true) { + if (currentCompareRowInPeerTable == null) { // reach the region end of peer table, row only in source table logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); @@ -194,6 +206,9 @@ public class VerifyReplication extends Configured implements Tool { logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); break; } else { + if(context.getCounter(Counters.BADROWS).getValue() >= context.getConfiguration().getInt(NAME+".maxBadRows",100000)){ + return; + } // row only exists in peer table logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, currentCompareRowInPeerTable); @@ -202,7 +217,7 @@ public class VerifyReplication extends Configured implements Tool { } } - private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) { + private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) throws IOException, InterruptedException { if (sleepMsBeforeReCompare > 0) { Threads.sleep(sleepMsBeforeReCompare); try { @@ -224,8 +239,15 @@ public class VerifyReplication extends Configured implements Tool { } context.getCounter(counter).increment(1); context.getCounter(Counters.BADROWS).increment(1); - LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) + - delimiter); + if(null != context.getConfiguration().get(NAME+".outputDir")){ + keyText.set(counter.toString()); + valueText.set(", rowkey=" + delimiter + Bytes.toString(row.getRow())+ delimiter); + context.write(keyText,valueText); + }else { + LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) + + delimiter); + } + } @Override @@ -233,6 +255,9 @@ public class VerifyReplication extends Configured implements Tool { if (replicatedScanner != null) { try { while (currentCompareRowInPeerTable != null) { + if(context.getCounter(Counters.BADROWS).getValue() >= context.getConfiguration().getInt(NAME+".maxBadRows",100000)){ + return; + } logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, currentCompareRowInPeerTable); currentCompareRowInPeerTable = replicatedScanner.next(); @@ -277,6 +302,16 @@ public class VerifyReplication extends Configured implements Tool { } } + public static class VerifierReducer + extends Reducer { + @Override + protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { + for(Text value : values){ + context.write(key,value); + } + } + } + private static Pair getPeerQuorumConfig( final Configuration conf) throws IOException { ZooKeeperWatcher localZKW = null; @@ -331,6 +366,8 @@ public class VerifyReplication extends Configured implements Tool { conf.set(NAME + ".delimiter", delimiter); conf.setBoolean(NAME +".verbose", verbose); conf.setBoolean(NAME +".includeDeletedCells", includeDeletedCells); + conf.setInt(NAME+".maxBadRows", maxBadRows); + conf.set(NAME+".outputDir", outputDir); if (families != null) { conf.set(NAME+".families", families); } @@ -368,16 +405,20 @@ public class VerifyReplication extends Configured implements Tool { } setRowPrefixFilter(scan, rowPrefixes); - - TableMapReduceUtil.initTableMapperJob(tableName, scan, - Verifier.class, null, null, job); - + if(null != conf.get(NAME+".outputDir")) { + TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, Text.class, Text.class, job); + job.setReducerClass(VerifierReducer.class); + FileOutputFormat.setOutputPath(job, new Path(conf.get(NAME + ".outputDir"))); + job.setNumReduceTasks(1); + }else { + TableMapReduceUtil.initTableMapperJob(tableName, scan, + Verifier.class, null, null, job); + job.setOutputFormatClass(NullOutputFormat.class); + job.setNumReduceTasks(0); + } Configuration peerClusterConf = peerConfigPair.getSecond(); // Obtain the auth token from peer cluster TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf); - - job.setOutputFormatClass(NullOutputFormat.class); - job.setNumReduceTasks(0); return job; } @@ -433,6 +474,18 @@ public class VerifyReplication extends Configured implements Tool { continue; } + final String maxBadRowsArgKey = "--maxBadRows="; + if (cmd.startsWith(maxBadRowsArgKey)) { + maxBadRows = Integer.parseInt(cmd.substring(maxBadRowsArgKey.length())); + continue; + } + + final String outputDirArgKey = "--outputDir="; + if (cmd.startsWith(outputDirArgKey)) { + outputDir = cmd.substring(outputDirArgKey.length()); + continue; + } + final String includeDeletedCellsArgKey = "--raw"; if (cmd.equals(includeDeletedCellsArgKey)) { includeDeletedCells = true; @@ -521,7 +574,7 @@ public class VerifyReplication extends Configured implements Tool { } System.err.println("Usage: verifyrep [--starttime=X]" + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " + - "[--verbose] "); + "[--maxBadRows=][--outputDir=][--verbose] "); System.err.println(); System.err.println("Options:"); System.err.println(" starttime beginning of the time range"); @@ -534,6 +587,8 @@ public class VerifyReplication extends Configured implements Tool { System.err.println(" delimiter the delimiter used in display around rowkey"); System.err.println(" recomparesleep milliseconds to sleep before recompare row, " + "default value is 0 which disables the recompare."); + System.err.println(" maxBadRows the max bad rows to print,default value is 100000"); + System.err.println(" outputDir output directory to get file which contain badrows "); System.err.println(" verbose logs row keys of good rows"); System.err.println(); System.err.println("Args:"); -- 1.9.1