Index: src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java (revision 1351512) +++ src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java (working copy) @@ -199,6 +199,35 @@ } /** + * This test makes sure that all the KVs belonging to a single row in a WALEdit + * retain their individual timestamps + * @throws Exception + */ + @Test + public void testTsMix() throws Exception { + HLog.Entry[] entries = new HLog.Entry[3]; + long now = System.currentTimeMillis(); + for(int i = 0; i < 3; i++) { + entries[i] = createEntry(TABLE_NAME1, 1, i, KeyValue.Type.Put, now+i); + } + // Kinda ugly, trying to merge all the entries into one + entries[0].getEdit().add(entries[1].getEdit().getKeyValues().get(0)); + entries[0].getEdit().add(entries[2].getEdit().getKeyValues().get(0)); + HLog.Entry[] entry = new HLog.Entry[1]; + entry[0] = entries[0]; + SINK.replicateEntries(entry); + + Scan scan = new Scan(); + ResultScanner scanRes = table1.getScanner(scan); + Result row = scanRes.next(); + for(int i = 0; i < 3; i++) { + LOG.info(row.raw()[i]); + LOG.info("Comparing " + (now+i) + " with " + row.raw()[i].getTimestamp()); + assertEquals(now+i, row.raw()[i].getTimestamp()); + } + } + + /** * Puts are buffered, but this tests when a delete (not-buffered) is applied * before the actual Put that creates it. * @throws Exception @@ -219,30 +248,37 @@ assertEquals(0, res.size()); } - private HLog.Entry createEntry(byte [] table, int row, KeyValue.Type type) { + private HLog.Entry createEntry(byte [] table, int row, KeyValue.Type type) { + return createEntry(table, row, row, type, -1); + } + + private HLog.Entry createEntry(byte [] table, int row, int qualifier, KeyValue.Type type, long ts) { + byte[] qual = Bytes.toBytes(qualifier); byte[] fam = Bytes.equals(table, TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2; byte[] rowBytes = Bytes.toBytes(row); - // Just make sure we don't get the same ts for two consecutive rows with - // same key - try { - Thread.sleep(1); - } catch (InterruptedException e) { - LOG.info("Was interrupted while sleep, meh", e); + if (ts == -1) { + // Just make sure we don't get the same ts for two consecutive rows with + // same key + try { + Thread.sleep(1); + } catch (InterruptedException e) { + LOG.info("Was interrupted while sleep, meh", e); + } + ts = System.currentTimeMillis(); } - final long now = System.currentTimeMillis(); KeyValue kv = null; if(type.getCode() == KeyValue.Type.Put.getCode()) { - kv = new KeyValue(rowBytes, fam, fam, now, + kv = new KeyValue(rowBytes, fam, qual, ts, KeyValue.Type.Put, Bytes.toBytes(row)); } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) { - kv = new KeyValue(rowBytes, fam, fam, - now, KeyValue.Type.DeleteColumn); + kv = new KeyValue(rowBytes, fam, qual, + ts, KeyValue.Type.DeleteColumn); } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) { kv = new KeyValue(rowBytes, fam, null, - now, KeyValue.Type.DeleteFamily); + ts, KeyValue.Type.DeleteFamily); } - HLogKey key = new HLogKey(table, table, now, now, + HLogKey key = new HLogKey(table, table, ts, ts, HConstants.DEFAULT_CLUSTER_ID); WALEdit edit = new WALEdit(); Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (revision 1351512) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (working copy) @@ -135,16 +135,15 @@ } // With mini-batching, we need to expect multiple rows per edit byte[] lastKey = kvs.get(0).getRow(); - Put put = new Put(kvs.get(0).getRow(), - kvs.get(0).getTimestamp()); + Put put = new Put(kvs.get(0).getRow()); put.setClusterId(entry.getKey().getClusterId()); for (KeyValue kv : kvs) { if (!Bytes.equals(lastKey, kv.getRow())) { tableList.add(put); - put = new Put(kv.getRow(), kv.getTimestamp()); + put = new Put(kv.getRow()); put.setClusterId(entry.getKey().getClusterId()); } - put.add(kv.getFamily(), kv.getQualifier(), kv.getValue()); + put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), kv.getValue()); lastKey = kv.getRow(); } tableList.add(put);