From f9e53b7530768e7696add72d058deb9d58d4ec17 Mon Sep 17 00:00:00 2001
From: Mike Drob
Date: Mon, 25 Jun 2018 10:39:10 -0500
Subject: [PATCH] HBASE-14112 VerifyReplication may fail when source region is empty (Jianwei Cui)

The VerifyReplication job may fail to detect errors when a region of the
source table is empty but the corresponding key range in the destination
cluster has rows. Because map() is never invoked for an empty source
region, the scanner on the peer table was never opened, so rows that exist
only in the peer table went unreported. Open the scanner from cleanup() as
a fallback, and derive the scan boundaries from the input split instead of
from the first source row seen by map().
---
 .../mapreduce/replication/VerifyReplication.java | 154 +++++++++++----
 .../hbase/replication/TestVerifyReplication.java |  13 +-
 2 files changed, 94 insertions(+), 73 deletions(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 85eebc50bd..4597e95a3e 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -61,6 +62,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -151,77 +153,7 @@ public class VerifyReplication extends Configured implements Tool {
         Context context)
         throws IOException {
       if (replicatedScanner == null) {
-        Configuration conf = context.getConfiguration();
-        sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 0);
-        delimiter = conf.get(NAME + ".delimiter", "");
-        verbose = conf.getBoolean(NAME +".verbose", false);
-        batch = conf.getInt(NAME + ".batch", -1);
-        final Scan scan = new Scan();
-        if (batch > 0) {
-          scan.setBatch(batch);
-        }
-        scan.setCacheBlocks(false);
-        scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
-        long startTime = conf.getLong(NAME + ".startTime", 0);
-        long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
-        String families = conf.get(NAME + ".families", null);
-        if(families != null) {
-          String[] fams = families.split(",");
-          for(String fam : fams) {
-            scan.addFamily(Bytes.toBytes(fam));
-          }
-        }
-        boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false);
-        scan.setRaw(includeDeletedCells);
-        String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
-        setRowPrefixFilter(scan, rowPrefixes);
-        scan.setTimeRange(startTime, endTime);
-        int versions = conf.getInt(NAME+".versions", -1);
-        LOG.info("Setting number of version inside map as: " + versions);
-        if (versions >= 0) {
-          scan.setMaxVersions(versions);
-        }
-        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
-        sourceConnection = ConnectionFactory.createConnection(conf);
-        sourceTable = sourceConnection.getTable(tableName);
-
-        final InputSplit tableSplit = context.getInputSplit();
-
-        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
-        Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
-            zkClusterKey, PEER_CONFIG_PREFIX);
-
-        replicatedConnection = ConnectionFactory.createConnection(peerConf);
-        replicatedTable = replicatedConnection.getTable(tableName);
-        scan.setStartRow(value.getRow());
-
-        byte[] endRow = null;
-        if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) {
-          endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegionInfo()
-              .getEndKey();
-        } else {
-          endRow = ((TableSplit) tableSplit).getEndRow();
-        }
-
-        scan.setStopRow(endRow);
-
-        String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null);
-        if (peerSnapshotName != null) {
-          String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null);
-          String peerFSAddress = conf.get(NAME + ".peerFSAddress", null);
-          String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null);
-          FileSystem.setDefaultUri(peerConf, peerFSAddress);
-          FSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
-          LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:"
-              + peerSnapshotTmpDir + " peer root uri:" + FSUtils.getRootDir(peerConf)
-              + " peerFSAddress:" + peerFSAddress);
-
-          replicatedScanner = new TableSnapshotScanner(peerConf, FSUtils.getRootDir(peerConf),
-              new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true);
-        } else {
-          replicatedScanner = replicatedTable.getScanner(scan);
-        }
-        currentCompareRowInPeerTable = replicatedScanner.next();
+        openScannerForReplicatedTable(context);
       }
       while (true) {
         if (currentCompareRowInPeerTable == null) {
@@ -257,6 +189,79 @@ public class VerifyReplication extends Configured implements Tool {
       }
     }
+    private void openScannerForReplicatedTable(Context context) throws IOException {
+      Configuration conf = context.getConfiguration();
+      sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 0);
+      delimiter = conf.get(NAME + ".delimiter", "");
+      verbose = conf.getBoolean(NAME +".verbose", false);
+      batch = conf.getInt(NAME + ".batch", -1);
+      final Scan scan = new Scan();
+      if (batch > 0) {
+        scan.setBatch(batch);
+      }
+      scan.setCacheBlocks(false);
+      scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
+      long startTime = conf.getLong(NAME + ".startTime", 0);
+      long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
+      String families = conf.get(NAME + ".families", null);
+      if(families != null) {
+        for(String fam : families.split(",")) {
+          scan.addFamily(Bytes.toBytes(fam));
+        }
+      }
+      boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false);
+      scan.setRaw(includeDeletedCells);
+      String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
+      setRowPrefixFilter(scan, rowPrefixes);
+      scan.setTimeRange(startTime, endTime);
+      int versions = conf.getInt(NAME+".versions", -1);
+      LOG.info("Setting number of versions inside map as {}", versions);
+      if (versions >= 0) {
+        scan.readVersions(versions);
+      }
+      TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
+      sourceConnection = ConnectionFactory.createConnection(conf);
+      sourceTable = sourceConnection.getTable(tableName);
+
+      String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
+      Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
+          zkClusterKey, PEER_CONFIG_PREFIX);
+
+      replicatedConnection = ConnectionFactory.createConnection(peerConf);
+      replicatedTable = replicatedConnection.getTable(tableName);
+
+      final InputSplit tableSplit = context.getInputSplit();
+      byte[] startRow, endRow;
+      if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) {
+        RegionInfo region = ((TableSnapshotInputFormat.TableSnapshotRegionSplit)tableSplit)
+            .getRegion();
+        startRow = region.getStartKey();
+        endRow = region.getEndKey();
+      } else {
+        TableSplit split = (TableSplit) tableSplit;
+        startRow = split.getStartRow();
+        endRow = split.getEndRow();
+      }
+      scan.withStartRow(startRow).withStopRow(endRow);
+
+      String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null);
+      if (peerSnapshotName != null) {
+        String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null);
+        String peerFSAddress = conf.get(NAME + ".peerFSAddress", null);
+        String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null);
+        FileSystem.setDefaultUri(peerConf, peerFSAddress);
+        FSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
+        LOG.info("Using peer snapshot:{} with temp dir:{} peer root uri:{} peerFSAddress:{}",
+          peerSnapshotName, peerSnapshotTmpDir, FSUtils.getRootDir(peerConf), peerFSAddress);
+
+        replicatedScanner = new TableSnapshotScanner(peerConf, FSUtils.getRootDir(peerConf),
+            new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true);
+      } else {
+        replicatedScanner = replicatedTable.getScanner(scan);
+      }
+      currentCompareRowInPeerTable = replicatedScanner.next();
+    }
+
     private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
       if (sleepMsBeforeReCompare > 0) {
         Threads.sleep(sleepMsBeforeReCompare);
       }
@@ -285,6 +290,13 @@
 
     @Override
     protected void cleanup(Context context) {
+      if (replicatedScanner == null) {
+        try {
+          openScannerForReplicatedTable(context);
+        } catch (IOException e) {
+          LOG.error("Failed to open scanner for replicated table", e);
+        }
+      }
       if (replicatedScanner != null) {
         try {
           while (currentCompareRowInPeerTable != null) {
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java
index e1fda4eb0b..4bc88e9ded 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java
@@ -126,9 +126,18 @@ public class TestVerifyReplication extends TestReplicationBase {
           Bytes.toBytes("diff data"));
       htable2.put(put);
     }
-    Delete delete = new Delete(put.getRow());
-    htable2.delete(delete);
+    htable2.delete(new Delete(put.getRow()));
     runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
+
+    // delete all rows from source table
+    for (Result result : htable1.getScanner(new Scan())) {
+      htable1.delete(new Delete(result.getRow()));
+    }
+    put = new Put(Bytes.toBytes(System.currentTimeMillis()));
+    put.addColumn(famName, row, row);
+    htable2.put(put);
+
+    runVerifyReplication(args, 0, 1);
   }
 
   /**
-- 
2.16.1
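
Note on the Mapper lifecycle this fix relies on: Hadoop runs cleanup() exactly
once per map task, even when the input split produced no records and map() was
therefore never invoked. The sketch below illustrates that lazy-init-plus-fallback
pattern in isolation. It is a simplified stand-in, not the actual HBase Verifier
implementation; LazyInitMapper, peerResource, and openResource() are hypothetical
names used only for illustration.

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Simplified stand-in for the Verifier mapper; names are illustrative.
    public class LazyInitMapper extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
      // Stands in for replicatedScanner: stays null until lazily opened.
      private Object peerResource = null;

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        // Lazy init happens here only if the split yields at least one record.
        if (peerResource == null) {
          openResource(context);
        }
        // ... compare the source record against the matching peer record ...
      }

      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
        // cleanup() also runs for an empty split, where map() was never
        // called. Without this fallback, rows that exist only on the peer
        // side of an empty source region would never be visited.
        if (peerResource == null) {
          openResource(context);
        }
        // ... drain remaining peer-only rows and count them as bad rows ...
      }

      private void openResource(Context context) {
        peerResource = new Object(); // placeholder for opening the peer scanner
      }
    }

The same lifecycle argument explains the other half of the patch: the scan
boundaries now come from the input split (region start and end keys) rather
than from the first row observed in map(), so the cleanup() fallback has
well-defined boundaries even when no source row was ever seen.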