From 20fe2c3596f3b7894846983fec51e324e87bddc5 Mon Sep 17 00:00:00 2001
From: Geoffrey
Date: Tue, 17 May 2016 11:09:56 -0700
Subject: [PATCH] HBASE-15847 - Added prefix filtering to VerifyReplication

---
 .../mapreduce/replication/VerifyReplication.java  |  60 ++++++++++-
 .../replication/TestReplicationSmallTests.java    | 115 +++++++++++----------
 2 files changed, 117 insertions(+), 58 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index a452036..655c71a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.mapreduce.replication;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +36,9 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
@@ -77,6 +81,7 @@ public class VerifyReplication extends Configured implements Tool {
   static String tableName = null;
   static String families = null;
   static String peerId = null;
+  static String rowPrefixes = null;
 
   private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
@@ -123,6 +128,8 @@
           scan.addFamily(Bytes.toBytes(fam));
         }
       }
+      String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
+      setRowPrefixFilter(scan, rowPrefixes);
       scan.setTimeRange(startTime, endTime);
       int versions = conf.getInt(NAME+".versions", -1);
       LOG.info("Setting number of version inside map as: " + versions);
@@ -271,6 +278,9 @@
     if (families != null) {
       conf.set(NAME+".families", families);
     }
+    if (rowPrefixes != null){
+      conf.set(NAME+".rowPrefixes", rowPrefixes);
+    }
 
     Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf);
     ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
@@ -299,6 +309,9 @@
         scan.addFamily(Bytes.toBytes(fam));
       }
     }
+
+    setRowPrefixFilter(scan, rowPrefixes);
+
     TableMapReduceUtil.initTableMapperJob(tableName, scan,
         Verifier.class, null, null, job);
@@ -311,11 +324,38 @@
     return job;
   }
 
+  private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
+    if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
+      String[] rowPrefixArray = rowPrefixes.split(",");
+      Arrays.sort(rowPrefixArray);
+      FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
+      for (String prefix : rowPrefixArray) {
+        Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
+        filterList.addFilter(filter);
+      }
+      scan.setFilter(filterList);
+      byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
+      byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length - 1]);
+      setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
+    }
+  }
+
+  private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
+    scan.setStartRow(startPrefixRow);
+    byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1),
+        new byte[]{(byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1)});
+    scan.setStopRow(stopRow);
+  }
+
   private static boolean doCommandLine(final String[] args) {
     if (args.length < 2) {
       printUsage(null);
       return false;
     }
+    //in case we've been run before, restore all parameters to their initial states
+    //Otherwise, if our previous run included a parameter not in args this time,
+    //we might hold on to the old value.
+    restoreDefaults();
     try {
       for (int i = 0; i < args.length; i++) {
         String cmd = args[i];
@@ -354,6 +394,12 @@
           continue;
         }
 
+        final String rowPrefixesKey = "--row-prefixes=";
+        if (cmd.startsWith(rowPrefixesKey)){
+          rowPrefixes = cmd.substring(rowPrefixesKey.length());
+          continue;
+        }
+
         if (i == args.length-2) {
           peerId = cmd;
         }
@@ -370,6 +416,17 @@
     return true;
   }
 
+  private static void restoreDefaults() {
+    startTime = 0;
+    endTime = Long.MAX_VALUE;
+    batch = Integer.MAX_VALUE;
+    versions = -1;
+    tableName = null;
+    families = null;
+    peerId = null;
+    rowPrefixes = null;
+  }
+
   /*
    * @param errorMsg Error message. Can be null.
    */
@@ -378,7 +435,7 @@
       System.err.println("ERROR: " + errorMsg);
     }
     System.err.println("Usage: verifyrep [--starttime=X]" +
-        " [--stoptime=Y] [--families=A] <peerid> <tablename>");
+        " [--stoptime=Y] [--families=A] [--row-prefixes=B] <peerid> <tablename>");
     System.err.println();
     System.err.println("Options:");
     System.err.println(" starttime    beginning of the time range");
@@ -386,6 +443,7 @@
     System.err.println(" endtime      end of the time range");
     System.err.println(" versions     number of cell versions to verify");
     System.err.println(" families     comma-separated list of families to copy");
+    System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
     System.err.println();
     System.err.println("Args:");
     System.err.println(" peerid       Id of the peer used for verification, must match the one given for replication");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 2a20a4f..8efa67e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -31,6 +32,7 @@ import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.ClusterStatus;
@@ -72,6 +74,7 @@ import org.junit.experimental.categories.Category;
 public class TestReplicationSmallTests extends TestReplicationBase {
 
   private static final Log LOG = LogFactory.getLog(TestReplicationSmallTests.class);
+  private static final String PEER_ID = "2";
 
   /**
    * @throws java.lang.Exception
@@ -84,6 +87,7 @@
         utility1.getHBaseCluster().getRegionServerThreads()) {
       utility1.getHBaseAdmin().rollWALWriter(r.getRegionServer().getServerName());
     }
+    int rowCount = utility1.countRows(tableName);
     utility1.deleteTableData(tableName);
     // truncating the table will send one Delete per row to the slave cluster
     // in an async fashion, which is why we cannot just call deleteTableData on
@@ -97,7 +101,7 @@
         fail("Waited too much time for truncate");
       }
       ResultScanner scanner = htable2.getScanner(scan);
-      Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
+      Result[] res = scanner.next(rowCount);
       scanner.close();
       if (res.length != 0) {
         if (res.length < lastCount) {
@@ -254,13 +258,7 @@
   public void testSmallBatch() throws Exception {
     LOG.info("testSmallBatch");
     // normal Batch tests
-    List<Put> puts = new ArrayList<>();
-    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      Put put = new Put(Bytes.toBytes(i));
-      put.addColumn(famName, row, row);
-      puts.add(put);
-    }
-    htable1.put(puts);
+    loadData("", row);
 
     Scan scan = new Scan();
@@ -269,15 +267,20 @@
     scanner1.close();
     assertEquals(NB_ROWS_IN_BATCH, res1.length);
 
-    for (int i = 0; i < NB_RETRIES; i++) {
+    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
+  }
+
+  private void waitForReplication(int expectedRows, int retries) throws IOException, InterruptedException {
+    Scan scan;
+    for (int i = 0; i < retries; i++) {
       scan = new Scan();
-      if (i==NB_RETRIES-1) {
+      if (i== retries -1) {
         fail("Waited too much time for normal batch replication");
       }
       ResultScanner scanner = htable2.getScanner(scan);
-      Result[] res = scanner.next(NB_ROWS_IN_BATCH);
+      Result[] res = scanner.next(expectedRows);
       scanner.close();
-      if (res.length != NB_ROWS_IN_BATCH) {
+      if (res.length != expectedRows) {
         LOG.info("Only got " + res.length + " rows");
         Thread.sleep(SLEEP_TIME);
       } else {
@@ -286,6 +289,16 @@
     }
   }
 
+  private void loadData(String prefix, byte[] row) throws IOException {
+    List<Put> puts = new ArrayList<>();
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
+      put.addColumn(famName, row, row);
+      puts.add(put);
+    }
+    htable1.put(puts);
+  }
+
   /**
    * Test disable/enable replication, trying to insert, make sure nothing's
    * replicated, enable it, the insert should be replicated
@@ -296,7 +309,7 @@
   public void testDisableEnable() throws Exception {
 
     // Test disabling replication
-    admin.disablePeer("2");
+    admin.disablePeer(PEER_ID);
 
     byte[] rowkey = Bytes.toBytes("disable enable");
     Put put = new Put(rowkey);
@@ -315,7 +328,7 @@
     }
 
     // Test enable replication
-    admin.enablePeer("2");
+    admin.enablePeer(PEER_ID);
 
     for (int i = 0; i < NB_RETRIES; i++) {
       Result res = htable2.get(get);
@@ -339,7 +352,7 @@
   @Test(timeout=300000)
   public void testAddAndRemoveClusters() throws Exception {
     LOG.info("testAddAndRemoveClusters");
-    admin.removePeer("2");
+    admin.removePeer(PEER_ID);
     Thread.sleep(SLEEP_TIME);
     byte[] rowKey = Bytes.toBytes("Won't be replicated");
     Put put = new Put(rowKey);
@@ -361,7 +374,7 @@
     }
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(utility2.getClusterKey());
-    admin.addPeer("2", rpc, null);
+    admin.addPeer(PEER_ID, rpc, null);
     Thread.sleep(SLEEP_TIME);
     rowKey = Bytes.toBytes("do rep");
     put = new Put(rowKey);
@@ -459,18 +472,8 @@
     // identical since it does the check
     testSmallBatch();
 
-    String[] args = new String[] {"2", tableName.getNameAsString()};
-    Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
-    if (job == null) {
-      fail("Job wasn't created, see the log");
-    }
-    if (!job.waitForCompletion(true)) {
-      fail("Job failed, see the log");
-    }
-    assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
-        findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-    assertEquals(0, job.getCounters().
-        findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+    String[] args = new String[] {PEER_ID, tableName.getNameAsString()};
+    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
 
     Scan scan = new Scan();
     ResultScanner rs = htable2.getScanner(scan);
@@ -484,16 +487,21 @@
     }
     Delete delete = new Delete(put.getRow());
     htable2.delete(delete);
-    job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
+    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
+  }
+
+  private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Job job = VerifyReplication.createSubmittableJob(new Configuration(CONF_WITH_LOCALFS), args);
     if (job == null) {
       fail("Job wasn't created, see the log");
     }
     if (!job.waitForCompletion(true)) {
       fail("Job failed, see the log");
    }
-    assertEquals(0, job.getCounters().
+    assertEquals(expectedGoodRows, job.getCounters().
         findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-    assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
+    assertEquals(expectedBadRows, job.getCounters().
         findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
   }
@@ -556,18 +564,8 @@
     assertEquals(1, res1.length);
     assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size());
 
-    String[] args = new String[] {"--versions=100", "2", tableName.getNameAsString()};
-    Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
-    if (job == null) {
-      fail("Job wasn't created, see the log");
-    }
-    if (!job.waitForCompletion(true)) {
-      fail("Job failed, see the log");
-    }
-    assertEquals(0, job.getCounters().
-        findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-    assertEquals(1, job.getCounters().
-        findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+    String[] args = new String[] {"--versions=100", PEER_ID, tableName.getNameAsString()};
+    runVerifyReplication(args, 0, 1);
   }
 
   @Test(timeout=300000)
@@ -618,7 +616,7 @@
     try {
       // Disabling replication and modifying the particular version of the cell to validate the feature.
-      admin.disablePeer("2");
+      admin.disablePeer(PEER_ID);
       Put put2 = new Put(Bytes.toBytes("r1"));
       put2.addColumn(famName, qualifierName, ts +2, Bytes.toBytes("v99"));
       htable2.put(put2);
@@ -631,21 +629,11 @@
       assertEquals(1, res1.length);
       assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
 
-      String[] args = new String[] {"--versions=100", "2", tableName.getNameAsString()};
-      Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
-      if (job == null) {
-        fail("Job wasn't created, see the log");
-      }
-      if (!job.waitForCompletion(true)) {
-        fail("Job failed, see the log");
-      }
-      assertEquals(0, job.getCounters().
-          findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-      assertEquals(1, job.getCounters().
-          findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+      String[] args = new String[] {"--versions=100", PEER_ID, tableName.getNameAsString()};
+      runVerifyReplication(args, 0, 1);
     } finally {
-      admin.enablePeer("2");
+      admin.enablePeer(PEER_ID);
     }
   }
@@ -803,5 +791,18 @@
       }
     }
   }
+
+  @Test(timeout=300000)
+  public void testVerifyReplicationPrefixFiltering() throws Exception {
+    final byte[] prefixRow = Bytes.toBytes("prefixrow");
+    final byte[] prefixRow2 = Bytes.toBytes("secondrow");
+    loadData("prefixrow", prefixRow);
+    loadData("secondrow", prefixRow2);
+    loadData("aaa", row);
+    loadData("zzz", row);
+    waitForReplication(NB_ROWS_IN_BATCH * 4, NB_RETRIES * 4);
+    String[] args = new String[] {"--row-prefixes=prefixrow,secondrow", PEER_ID,
+        tableName.getNameAsString()};
+    runVerifyReplication(args, NB_ROWS_IN_BATCH *2, 0);
+  }
 }
-- 
2.5.4 (Apple Git-61)
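
For context: the core of the change is the scan-narrowing logic in setRowPrefixFilter()/setStartAndStopRows() above. Below is a standalone sketch of the same computation, not part of the patch, using only the stock HBase client API; the class name RowPrefixScanDemo and the sample prefix list are illustrative.

import java.util.Arrays;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;

/** Standalone sketch of the scan narrowing done by setRowPrefixFilter(); illustrative only. */
public class RowPrefixScanDemo {
  public static void main(String[] args) {
    Scan scan = new Scan();
    String rowPrefixes = "secondrow,prefixrow";  // sample --row-prefixes value

    String[] prefixes = rowPrefixes.split(",");
    Arrays.sort(prefixes);  // sorting lets the first/last prefixes bound the key range

    // OR the prefix filters together: a row is kept if it matches any prefix.
    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
    for (String prefix : prefixes) {
      filterList.addFilter(new PrefixFilter(Bytes.toBytes(prefix)));
    }
    scan.setFilter(filterList);

    // Narrow the scanned key range on top of the filter: start at the smallest
    // prefix, and stop just past the largest one by incrementing its final byte
    // (safe here because CLI-supplied prefixes are ASCII, so the byte is never 0xFF).
    byte[] start = Bytes.toBytes(prefixes[0]);
    byte[] last = Bytes.toBytes(prefixes[prefixes.length - 1]);
    byte[] stop = Bytes.add(Bytes.head(last, last.length - 1),
        new byte[] { (byte) (last[last.length - 1] + 1) });
    scan.setStartRow(start);
    scan.setStopRow(stop);

    // Prints: start=prefixrow stop=secondrox
    System.out.println("start=" + Bytes.toStringBinary(scan.getStartRow())
        + " stop=" + Bytes.toStringBinary(scan.getStopRow()));
  }
}

With the patch applied, the same filtering should be reachable from the command line along the lines of
  hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication --row-prefixes=prefixrow,secondrow 2 TestTable
where the peer id ("2") must match the configured replication peer and "TestTable" is a placeholder table name.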