From c61b0e88d7721dc8e9af2879c4e468c13d86fce1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E4=B8=96=E5=BD=AC10204932?= Date: Sat, 1 Jul 2017 10:41:02 +0800 Subject: [PATCH] HBASE-17822 Set maxBadRows and outputDirectory option for VerifyReplication --- .../mapreduce/replication/VerifyReplication.java | 83 +++++++++++++++++++--- 1 file changed, 72 insertions(+), 11 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 3d32edd..b75c18d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -58,9 +58,12 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -108,6 +111,9 @@ public class VerifyReplication extends Configured implements Tool { String peerFSAddress = null; //Peer cluster HBase root dir location String peerHBaseRootAddress = null; + int maxBadRows = 100000; + //output result to hdfs or mr log + String outputDir = null; private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; @@ -116,7 +122,7 @@ public class VerifyReplication extends Configured implements Tool { * Map-only comparator for 2 tables */ public static class Verifier - extends TableMapper { + extends 
TableMapper { @@ -133,6 +139,8 @@ public class VerifyReplication extends Configured implements Tool { private String delimiter = ""; private boolean verbose = false; private int batch = -1; + private Text keyText = new Text(); + private Text valueText = new Text(); /** * Map method that compares every scanned row with the equivalent from @@ -145,7 +153,10 @@ public class VerifyReplication extends Configured implements Tool { @Override public void map(ImmutableBytesWritable row, final Result value, Context context) - throws IOException { + throws IOException, InterruptedException { + if(context.getCounter(Counters.BADROWS).getValue() >= context.getConfiguration().getInt(NAME+".maxBadRows",100000)){ + return; + } if (replicatedScanner == null) { Configuration conf = context.getConfiguration(); sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 0); @@ -245,6 +256,9 @@ public class VerifyReplication extends Configured implements Tool { logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); break; } else { + if(context.getCounter(Counters.BADROWS).getValue() >= context.getConfiguration().getInt(NAME+".maxBadRows",100000)){ + return; + } // row only exists in peer table logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, currentCompareRowInPeerTable); @@ -253,7 +267,7 @@ public class VerifyReplication extends Configured implements Tool { } } - private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) { + private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) throws IOException, InterruptedException { if (sleepMsBeforeReCompare > 0) { Threads.sleep(sleepMsBeforeReCompare); try { @@ -275,8 +289,13 @@ public class VerifyReplication extends Configured implements Tool { } context.getCounter(counter).increment(1); context.getCounter(Counters.BADROWS).increment(1); - LOG.error(counter.toString() + ", rowkey=" + delimiter + 
Bytes.toStringBinary(row.getRow()) + - delimiter); + if( null != context.getConfiguration().get(NAME+".outputDir",null)){ + keyText.set(counter.toString()); + valueText.set(", rowkey="+Bytes.toString(row.getRow())); + context.write(keyText,valueText); + }else { + LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) + delimiter); + } } @Override @@ -284,6 +303,9 @@ public class VerifyReplication extends Configured implements Tool { if (replicatedScanner != null) { try { while (currentCompareRowInPeerTable != null) { + if(context.getCounter(Counters.BADROWS).getValue() >= context.getConfiguration().getInt(NAME+".maxBadRows",100000)){ + return; + } logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, currentCompareRowInPeerTable); currentCompareRowInPeerTable = replicatedScanner.next(); @@ -328,6 +350,16 @@ public class VerifyReplication extends Configured implements Tool { } } + public static class VerifierReducer + extends Reducer { + @Override + protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { + for(Text value : values){ + context.write(key,value); + } + } + } + private static Pair getPeerQuorumConfig( final Configuration conf, String peerId) throws IOException { ZooKeeperWatcher localZKW = null; @@ -383,12 +415,16 @@ public class VerifyReplication extends Configured implements Tool { conf.setInt(NAME + ".batch", batch); conf.setBoolean(NAME +".verbose", verbose); conf.setBoolean(NAME +".includeDeletedCells", includeDeletedCells); + conf.setInt(NAME+".maxBadRows", maxBadRows); if (families != null) { conf.set(NAME+".families", families); } if (rowPrefixes != null){ conf.set(NAME+".rowPrefixes", rowPrefixes); } + if (null != outputDir) { + conf.set(NAME+".outputDir", outputDir); + } Pair peerConfigPair = getPeerQuorumConfig(conf, peerId); ReplicationPeerConfig peerConfig = peerConfigPair.getFirst(); @@ -435,22 +471,33 @@ public class 
VerifyReplication extends Configured implements Tool { } setRowPrefixFilter(scan, rowPrefixes); - + Class mapOutputKey = null; + Class mapOutputValue = null; + if(null != conf.get(NAME+".outputDir",null)) { + mapOutputKey = Text.class; + mapOutputValue = Text.class; + } if (sourceSnapshotName != null) { Path snapshotTempPath = new Path(sourceSnapshotTmpDir); LOG.info( "Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir); - TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null, - null, job, true, snapshotTempPath); + TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, mapOutputKey, + mapOutputValue, job, true, snapshotTempPath); } else { - TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job); + TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, mapOutputKey, mapOutputValue, job); } Configuration peerClusterConf = peerConfigPair.getSecond(); // Obtain the auth token from peer cluster TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf); - job.setOutputFormatClass(NullOutputFormat.class); - job.setNumReduceTasks(0); + if(null != conf.get(NAME+".outputDir",null)) { + job.setReducerClass(VerifierReducer.class); + FileOutputFormat.setOutputPath(job, new Path(conf.get(NAME + ".outputDir"))); + job.setNumReduceTasks(1); + }else{ + job.setOutputFormatClass(NullOutputFormat.class); + job.setNumReduceTasks(0); + } return job; } @@ -509,6 +556,18 @@ public class VerifyReplication extends Configured implements Tool { continue; } + final String maxBadRowsArgKey = "--maxBadRows="; + if (cmd.startsWith(maxBadRowsArgKey)) { + maxBadRows = Integer.parseInt(cmd.substring(maxBadRowsArgKey.length())); + continue; + } + + final String outputDirArgKey = "--outputDir="; + if (cmd.startsWith(outputDirArgKey)) { + outputDir = cmd.substring(outputDirArgKey.length()); + continue; + } + final String versionsArgKey = 
"--versions="; if (cmd.startsWith(versionsArgKey)) { versions = Integer.parseInt(cmd.substring(versionsArgKey.length())); @@ -665,6 +724,8 @@ public class VerifyReplication extends Configured implements Tool { System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot"); System.err.println(" peerFSAddress Peer cluster Hadoop FS address"); System.err.println(" peerHBaseRootAddress Peer cluster HBase root location"); + System.err.println(" maxBadRows the max bad rows to print, default value is 100000"); + System.err.println(" outputDir output directory for the file that contains bad rows"); System.err.println(); System.err.println("Args:"); System.err.println(" peerid Id of the peer used for verification, must match the one given for replication"); -- 1.9.1