Index: src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (revision 1224797)
+++ src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (working copy)
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -196,6 +197,94 @@
   }
 
   /**
+   * Verify that version and column delete marker types are replicated
+   * correctly: each marker placed on the source must hide the same versions on the replica.
+   * @throws Exception on any unexpected failure; the test does not handle errors itself
+   */
+  @Test(timeout=300000)
+  public void testDeleteTypes() throws Exception {
+    LOG.info("testDeleteTypes");
+    final byte[] v1 = Bytes.toBytes("v1");
+    final byte[] v2 = Bytes.toBytes("v2");
+    final byte[] v3 = Bytes.toBytes("v3");
+    htable1 = new HTable(conf1, tableName);
+
+    long t = EnvironmentEdgeManager.currentTimeMillis();
+    // create three versions of the same cell, at timestamps t, t+1 and t+2
+    Put put = new Put(row);
+    put.add(famName, row, t, v1);
+    htable1.put(put);
+
+    put = new Put(row);
+    put.add(famName, row, t+1, v2);
+    htable1.put(put);
+
+    put = new Put(row);
+    put.add(famName, row, t+2, v3);
+    htable1.put(put);
+
+    Get get = new Get(row);
+    get.setMaxVersions();
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() < 3) {
+        LOG.info("Rows not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(v3, res.raw()[0].getValue());
+        assertArrayEquals(v2, res.raw()[1].getValue());
+        assertArrayEquals(v1, res.raw()[2].getValue());
+        break;
+      }
+    }
+    // place a version delete marker at timestamp t (removes only the oldest version, v1)
+    Delete d = new Delete(row);
+    d.deleteColumn(famName, row, t);
+    htable1.delete(d);
+
+    get = new Get(row);
+    get.setMaxVersions();
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for del replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() > 2) {
+        LOG.info("Version not deleted");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(v3, res.raw()[0].getValue());
+        assertArrayEquals(v2, res.raw()[1].getValue());
+        break;
+      }
+    }
+
+    // place a column delete marker at timestamp t+2, the newest remaining version
+    d = new Delete(row);
+    d.deleteColumns(famName, row, t+2);
+    htable1.delete(d);
+
+    // now *both* of the remaining versions should be deleted
+    // at the replica
+    get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for del replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        LOG.info("Rows not deleted");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  /**
    * Add a row, check it's replicated, delete it, check's gone
    * @throws Exception
    */
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (revision 1224797)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (working copy)
@@ -23,6 +23,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.HTablePool;
@@ -104,11 +105,21 @@
               kvs.get(0).getTimestamp(), null);
           delete.setClusterId(entry.getKey().getClusterId());
           for (KeyValue kv : kvs) {
-            if (kv.isDeleteFamily()) {
-              delete.deleteFamily(kv.getFamily());
-            } else if (!kv.isEmptyColumn()) {
-              delete.deleteColumn(kv.getFamily(),
-                  kv.getQualifier());
+            switch (Type.codeToType(kv.getType())) {
+            case DeleteFamily:
+              // family marker: replay it with the marker's own timestamp
+              delete.deleteFamily(kv.getFamily(), kv.getTimestamp());
+              break;
+            case DeleteColumn:
+              // column marker: replay it with the marker's own timestamp
+              delete.deleteColumns(kv.getFamily(), kv.getQualifier(),
+                  kv.getTimestamp());
+              break;
+            case Delete:
+              // version marker: replay it with the marker's own timestamp
+              delete.deleteColumn(kv.getFamily(), kv.getQualifier(),
+                  kv.getTimestamp());
+              break;
             }
           }
           delete(entry.getKey().getTablename(), delete);