From 2d57de330ef55801dd7b56099c508a107f01316e Mon Sep 17 00:00:00 2001
From: chenheng
Date: Thu, 3 Dec 2015 10:11:09 +0800
Subject: [PATCH] HBASE-14905 VerifyReplication does not honour versions option

---
 .../mapreduce/replication/VerifyReplication.java  |  6 ++
 .../hbase/replication/TestReplicationBase.java    |  2 +-
 .../replication/TestReplicationSmallTests.java    | 74 ++++++++++++++++++++++
 3 files changed, 81 insertions(+), 1 deletion(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 28f9f39..e00c682 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -121,6 +121,8 @@ public class VerifyReplication extends Configured implements Tool {
           }
         }
         scan.setTimeRange(startTime, endTime);
+        int versions = conf.getInt(NAME+".versions", -1);
+        LOG.info("Setting number of version inside map as: " + versions);
         if (versions >= 0) {
           scan.setMaxVersions(versions);
         }
@@ -269,6 +271,9 @@ public class VerifyReplication extends Configured implements Tool {
     conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
     LOG.info("Peer Quorum Address: " + peerQuorumAddress);
 
+    conf.setInt(NAME + ".versions", versions);
+    LOG.info("Number of version: " + versions);
+
     Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
     job.setJarByClass(VerifyReplication.class);
 
@@ -276,6 +281,7 @@ public class VerifyReplication extends Configured implements Tool {
     scan.setTimeRange(startTime, endTime);
     if (versions >= 0) {
       scan.setMaxVersions(versions);
+      LOG.info("Number of versions set to " + versions);
     }
     if(families != null) {
       String[] fams = families.split(",");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index ac87269..e52a600 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -135,7 +135,7 @@ public class TestReplicationBase {
 
     HTableDescriptor table = new HTableDescriptor(tableName);
     HColumnDescriptor fam = new HColumnDescriptor(famName);
-    fam.setMaxVersions(3);
+    fam.setMaxVersions(100);
     fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
     table.addFamily(fam);
     fam = new HColumnDescriptor(noRepfamName);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index b050f49..0ee15c4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -490,6 +490,80 @@ public class TestReplicationSmallTests extends TestReplicationBase {
         findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
   }
 
+  @Test(timeout=300000)
+  // VerifyReplication should honor versions option
+  public void testHBase14905() throws Exception {
+    // normal Batch tests
+    byte[] qualifierName = Bytes.toBytes("f1");
+    Put put = new Put(Bytes.toBytes("r1"));
+    put.addColumn(famName, qualifierName, Bytes.toBytes("v1002"));
+    htable1.put(put);
+    put.addColumn(famName, qualifierName, Bytes.toBytes("v1001"));
+    htable1.put(put);
+    put.addColumn(famName, qualifierName, Bytes.toBytes("v1112"));
+    htable1.put(put);
+
+    Scan scan = new Scan();
+    scan.setMaxVersions(100);
+    ResultScanner scanner1 = htable1.getScanner(scan);
+    Result[] res1 = scanner1.next(1);
+    scanner1.close();
+
+    assertEquals(1, res1.length);
+    assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
+
+    for (int i = 0; i < NB_RETRIES; i++) {
+      scan = new Scan();
+      scan.setMaxVersions(100);
+      scanner1 = htable2.getScanner(scan);
+      res1 = scanner1.next(1);
+      scanner1.close();
+      if (res1.length != 1) {
+        LOG.info("Only got " + res1.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
+        if (cellNumber != 3) {
+          LOG.info("Only got " + cellNumber + " cells");
+          Thread.sleep(SLEEP_TIME);
+        } else {
+          break;
+        }
+      }
+      if (i == NB_RETRIES-1) {
+        fail("Waited too much time for normal batch replication");
+      }
+    }
+
+    put.addColumn(famName, qualifierName, Bytes.toBytes("v1111"));
+    htable2.put(put);
+    put.addColumn(famName, qualifierName, Bytes.toBytes("v1112"));
+    htable2.put(put);
+
+    scan = new Scan();
+    scan.setMaxVersions(100);
+    scanner1 = htable2.getScanner(scan);
+    res1 = scanner1.next(NB_ROWS_IN_BATCH);
+    scanner1.close();
+
+    assertEquals(1, res1.length);
+    assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size());
+
+
+    String[] args = new String[] {"--versions=100", "2", tableName.getNameAsString()};
+    Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
+    if (job == null) {
+      fail("Job wasn't created, see the log");
+    }
+    if (!job.waitForCompletion(true)) {
+      fail("Job failed, see the log");
+    }
+    assertEquals(0, job.getCounters().
+        findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+    assertEquals(1, job.getCounters().
+        findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+  }
+
   /**
    * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out
    * the compaction WALEdit
-- 
1.9.3 (Apple Git-50)
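
Usage note (not part of the patch): with this change the --versions value given at job submission is also applied to the scan built inside the map task, so both clusters are compared across the requested number of versions. Below is a minimal sketch of driving VerifyReplication programmatically with that flag, mirroring the arguments used in testHBase14905; the peer id "2" and the table name "my_table" are placeholders, and a configured source cluster with an existing replication peer is assumed.

  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
  import org.apache.hadoop.util.ToolRunner;

  public class VerifyReplicationVersionsExample {
    public static void main(String[] argv) throws Exception {
      // Sketch only: compare up to 100 versions per cell between the local
      // table "my_table" and the cluster registered as replication peer "2".
      String[] args = new String[] {"--versions=100", "2", "my_table"};
      // VerifyReplication extends Configured and implements Tool, so it can
      // be run through ToolRunner just like its own main() does.
      int exitCode = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
      System.exit(exitCode);
    }
  }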