diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index b7d540b..f9f61fd 100644 --- src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -74,10 +74,13 @@ public class VerifyReplication { public static class Verifier extends TableMapper { - public static enum Counters {GOODROWS, BADROWS} + // BADROWS = ONLY_IN_SOURCE_TABLE_ROWS + ONLY_IN_PEER_TABLE_ROWS + CONTENT_DIFFERENT_ROWS + public static enum Counters { + GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS} private ResultScanner replicatedScanner; - + private Result currentCompareRowInPeerTable; + /** * Map method that compares every scanned row with the equivalent from * a distant cluster. @@ -124,21 +127,58 @@ public class VerifyReplication { return null; } }); + currentCompareRowInPeerTable = replicatedScanner.next(); } - Result res = replicatedScanner.next(); - try { - Result.compareResults(value, res); - context.getCounter(Counters.GOODROWS).increment(1); - } catch (Exception e) { - LOG.warn("Bad row", e); - context.getCounter(Counters.BADROWS).increment(1); + while (true) { + if (currentCompareRowInPeerTable == null) { + // reach the end of peer table region, row only in source table + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); + break; + } + int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow()); + if (rowCmpRet == 0) { + // rowkey is same, we need to compare the columns of the row + try { + Result.compareResults(value, currentCompareRowInPeerTable); + context.getCounter(Counters.GOODROWS).increment(1); + } catch (Exception e) { + logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value); + } + currentCompareRowInPeerTable = replicatedScanner.next(); + break; + } else if (rowCmpRet < 0) { + // row only exists in source table + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); + break; + } else { + // row only exists in peer table + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, + currentCompareRowInPeerTable); + currentCompareRowInPeerTable = replicatedScanner.next(); + } } } + private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) { + context.getCounter(counter).increment(1); + context.getCounter(Counters.BADROWS).increment(1); + LOG.error(counter.toString() + ", rowkey=" + Bytes.toString(row.getRow())); + } + protected void cleanup(Context context) { if (replicatedScanner != null) { - replicatedScanner.close(); - replicatedScanner = null; + try { + while (currentCompareRowInPeerTable != null) { + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, + currentCompareRowInPeerTable); + currentCompareRowInPeerTable = replicatedScanner.next(); + } + } catch (Exception e) { + LOG.error("fail to scan peer table in cleanup", e); + } finally { + replicatedScanner.close(); + replicatedScanner = null; + } } } }