From 0ce57372c36e2ce4a40d46445166cd1573e9a265 Mon Sep 17 00:00:00 2001 From: wellington Date: Wed, 28 Mar 2018 22:12:01 +0100 Subject: [PATCH] Adding option to skip deletes on target, performing only Puts for additional/mismatching cell values from source --- .../apache/hadoop/hbase/mapreduce/SyncTable.java | 17 +++- .../hadoop/hbase/mapreduce/TestSyncTable.java | 107 +++++++++++++++++++-- 2 files changed, 115 insertions(+), 9 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java index 9b4625b..6dd8e0f 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java @@ -64,6 +64,7 @@ public class SyncTable extends Configured implements Tool { static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster"; static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster"; static final String DRY_RUN_CONF_KEY="sync.table.dry.run"; + static final String INSERTS_ONLY_CONF_KEY="sync.table.inserts.only"; Path sourceHashDir; String sourceTableName; @@ -72,6 +73,7 @@ public class SyncTable extends Configured implements Tool { String sourceZkCluster; String targetZkCluster; boolean dryRun; + boolean insertsOnly; Counters counters; @@ -128,6 +130,7 @@ public class SyncTable extends Configured implements Tool { jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster); } jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun); + jobConf.setBoolean(INSERTS_ONLY_CONF_KEY, insertsOnly); TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(), SyncMapper.class, null, null, job); @@ -162,6 +165,7 @@ public class SyncTable extends Configured implements Tool { Table sourceTable; Table targetTable; boolean dryRun; + boolean insertsOnly; HashTable.TableHash sourceTableHash; HashTable.TableHash.Reader 
sourceHashReader; @@ -186,7 +190,8 @@ public class SyncTable extends Configured implements Tool { TableOutputFormat.OUTPUT_CONF_PREFIX); sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY); targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY); - dryRun = conf.getBoolean(SOURCE_TABLE_CONF_KEY, false); + dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false); + insertsOnly = conf.getBoolean(INSERTS_ONLY_CONF_KEY, false); sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir); LOG.info("Read source hash manifest: " + sourceTableHash); @@ -488,7 +493,7 @@ public class SyncTable extends Configured implements Tool { context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1); matchingRow = false; - if (!dryRun) { + if (!dryRun && !insertsOnly) { if (delete == null) { delete = new Delete(rowKey); } @@ -696,6 +701,8 @@ public class SyncTable extends Configured implements Tool { System.err.println(" (defaults to cluster in classpath's config)"); System.err.println(" dryrun if true, output counters but no writes"); System.err.println(" (defaults to false)"); + System.err.println(" insertsonly if true, output Puts only, but no Deletes "); + System.err.println(" (defaults to false)"); System.err.println(); System.err.println("Args:"); System.err.println(" sourcehashdir path to HashTable output dir for source table"); @@ -747,6 +754,12 @@ public class SyncTable extends Configured implements Tool { continue; } + final String insertsOnlyKey = "--insertsonly="; + if (cmd.startsWith(insertsOnlyKey)) { + insertsOnly = Boolean.parseBoolean(cmd.substring(insertsOnlyKey.length())); + continue; + } + printUsage("Invalid argument '" + cmd + "'"); return false; } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java index 543a169..972d09d 100644 --- 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java @@ -74,6 +74,7 @@ public class TestSyncTable { @AfterClass public static void afterClass() throws Exception { + TEST_UTIL.cleanupDataTestDirOnTestFS(); TEST_UTIL.shutdownMiniCluster(); } @@ -105,7 +106,30 @@ public class TestSyncTable { TEST_UTIL.deleteTable(sourceTableName); TEST_UTIL.deleteTable(targetTableName); - TEST_UTIL.cleanupDataTestDirOnTestFS(); + } + + /** Runs SyncTable with the insertsonly option set to true, then checks the target through assertTargetInsertsOnly (expecting 100 rows) and verifies the diff counters reported by the job. */ @Test + public void testSyncTableInsertsOnly() throws Exception { + final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source"); + final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target"); + Path testDir = TEST_UTIL.getDataTestDirOnTestFS + ("testSyncTableInsertsOnly"); + + writeTestData(sourceTableName, targetTableName); + hashSourceTable(sourceTableName, testDir); + Counters syncCounters = syncTables(sourceTableName, targetTableName, + testDir, "--insertsonly=true"); + assertTargetInsertsOnly(100, sourceTableName, targetTableName); + + assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); + assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); + assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue()); + assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue()); + assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); + assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); + + TEST_UTIL.deleteTable(sourceTableName); + TEST_UTIL.deleteTable(targetTableName); } private void assertEqualTables(int expectedRows, TableName sourceTableName, @@ -184,14 +208,83 @@ public class TestSyncTable { targetTable.close(); } + /** Scans the source and target tables side by side, counts every row returned by the target scanner and asserts that the total equals expectedRows. Target rows judged not to match the current source row are skipped; rows judged to match are compared cell by cell (row, family, qualifier, timestamp, value). */ private void assertTargetInsertsOnly(int expectedRows, TableName + sourceTableName, + TableName targetTableName) + 
throws Exception { + Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName); + Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName); + + ResultScanner sourceScanner = sourceTable.getScanner(new Scan()); + ResultScanner targetScanner = targetTable.getScanner(new Scan()); + Result targetRow = targetScanner.next(); + Result sourceRow = sourceScanner.next(); + int rowsCount = 0; + while (targetRow!=null) { + rowsCount++; + if(!sourceRow.getRow().equals(targetRow)){ /* NOTE(review): sourceRow.getRow() is a byte[] (it is passed to Bytes.toInt below) while targetRow is a Result, so this equals() can never return true. Every iteration therefore takes this branch, sourceRow never advances, and the cell comparison below appears to be unreachable; only the row count is effectively asserted. Presumably Bytes.equals(sourceRow.getRow(), targetRow.getRow()) was intended; confirm, and re-check the strict cell count assertion against inserts-only output before enabling it. sourceRow is also dereferenced here ahead of the null checks in the debug logging. */ + targetRow = targetScanner.next(); + continue; + } + + LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow())) + + " cells:" + sourceRow); + LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow())) + + " cells:" + targetRow); + + Cell[] sourceCells = sourceRow.rawCells(); + Cell[] targetCells = targetRow.rawCells(); + if (sourceCells.length != targetCells.length) { + LOG.debug("Source cells: " + Arrays.toString(sourceCells)); + LOG.debug("Target cells: " + Arrays.toString(targetCells)); + Assert.fail("Row " + Bytes.toInt(sourceRow.getRow()) + + " has " + sourceCells.length + + " cells in source table but " + targetCells.length + + " cells in target table"); + } + for (int j = 0; j < sourceCells.length; j++) { + Cell sourceCell = sourceCells[j]; + Cell targetCell = targetCells[j]; + try { + if (!CellUtil.matchingRow(sourceCell, targetCell)) { + Assert.fail("Rows don't match"); + } + if (!CellUtil.matchingFamily(sourceCell, targetCell)) { + Assert.fail("Families don't match"); + } + if (!CellUtil.matchingQualifier(sourceCell, targetCell)) { + Assert.fail("Qualifiers don't match"); + } + if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) { + Assert.fail("Timestamps don't match"); + } + if (!CellUtil.matchingValue(sourceCell, targetCell)) { + Assert.fail("Values don't match"); + } + } catch (Throwable t) { + LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell); + 
Throwables.propagate(t); + } + } + targetRow = targetScanner.next(); + sourceRow = sourceScanner.next(); + } + assertEquals("Target expected rows does not match.",expectedRows, + rowsCount); + sourceScanner.close(); /* NOTE(review): these close() calls are skipped when an assertion above throws, since no try/finally is used */ + targetScanner.close(); + sourceTable.close(); + targetTable.close(); + } + private Counters syncTables(TableName sourceTableName, TableName targetTableName, - Path testDir) throws Exception { + Path testDir, String... options) throws Exception { SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration()); - int code = syncTable.run(new String[] { - testDir.toString(), - sourceTableName.getNameAsString(), - targetTableName.getNameAsString() - }); + String[] args = Arrays.copyOf(options, options.length+3); /* the options come first; the three positional arguments fill the extra slots */ + args[options.length] = testDir.toString(); + args[options.length+1] = sourceTableName.getNameAsString(); + args[options.length+2] = targetTableName.getNameAsString(); + int code = syncTable.run(args); assertEquals("sync table job failed", 0, code); LOG.info("Sync tables completed"); -- 2.10.1 (Apple Git-78)