diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 41b19ff..d6e7451 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -119,25 +119,47 @@ public class ReplicationSink {
     // to the same table.
     try {
       long totalReplicated = 0;
-      // Map of table => list of Rows, we only want to flushCommits once per
-      // invocation of this method per table.
-      Map<byte[], List<Row>> rows = new TreeMap<byte[], List<Row>>(Bytes.BYTES_COMPARATOR);
+      // metrics for how long this optimization takes
+      long startTime = System.currentTimeMillis();
+      // STEP 1: split entries based on table
+      Map<byte[], List<HLog.Entry>> entriesByTable = new TreeMap<byte[], List<HLog.Entry>>(Bytes.BYTES_COMPARATOR);
       for (HLog.Entry entry : entries) {
-        WALEdit edit = entry.getEdit();
         byte[] table = entry.getKey().getTablename();
+        List<HLog.Entry> hlogEntries = entriesByTable.get(table);
+        if (hlogEntries == null) {
+          hlogEntries = new ArrayList<HLog.Entry>();
+          entriesByTable.put(table, hlogEntries);
+        }
+        hlogEntries.add(entry);
+      }
+      // STEP 2:
+      for (byte[] table : entriesByTable.keySet()) {
+        // for each table, merge all the hlog entries and sort
+        List<HLog.Entry> hlogEntriesPerTable = entriesByTable.get(table);
+        List<KeyValue> allKVPerTable = new ArrayList<KeyValue>();
+        UUID clusterId = null;
+        for (HLog.Entry entry : hlogEntriesPerTable) {
+          WALEdit edit = entry.getEdit();
+          clusterId = entry.getKey().getClusterId();
+          List<KeyValue> kvs = edit.getKeyValues();
+          allKVPerTable.addAll(kvs);
+        }
+        Collections.sort(allKVPerTable, KeyValue.COMPARATOR);
         Put put = null;
         Delete del = null;
         KeyValue lastKV = null;
-        List<KeyValue> kvs = edit.getKeyValues();
-        for (KeyValue kv : kvs) {
+        Map<byte[], List<Row>> rows = new TreeMap<byte[], List<Row>>(Bytes.BYTES_COMPARATOR);
+        for (KeyValue kv : allKVPerTable) {
+          if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
           if (kv.isDelete()) {
             del = new Delete(kv.getRow());
-            del.setClusterId(entry.getKey().getClusterId());
+            del.setClusterId(clusterId);
             addToMultiMap(rows, table, del);
           } else {
             put = new Put(kv.getRow());
-            put.setClusterId(entry.getKey().getClusterId());
+            put.setClusterId(clusterId);
             addToMultiMap(rows, table, put);
           }
         }
@@ -147,14 +169,15 @@ public class ReplicationSink {
            put.add(kv);
          }
          lastKV = kv;
+         totalReplicated++;
        }
-        totalReplicated++;
-      }
-      for(byte [] table : rows.keySet()) {
+        // got all the actions for a given table
+        long s = System.currentTimeMillis();
         batch(table, rows.get(table));
       }
+
      this.metrics.setAgeOfLastAppliedOp(
-         entries[entries.length-1].getKey().getWriteTime());
+         entries[entries.length - 1].getKey().getWriteTime());
      this.metrics.appliedBatchesRate.inc(1);
      LOG.info("Total replicated: " + totalReplicated);
    } catch (IOException ex) {
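
The core of the change is a two-step group-and-coalesce pass: entries are first bucketed by table, then each table's KeyValues are merged, sorted, and folded into one Put or Delete per run of same-row, same-type cells, so batch() runs once per table. Below is a minimal, self-contained sketch of that pattern using only java.util types; the Edit class and every name in it are hypothetical stand-ins for the HBase types (HLog.Entry, KeyValue, Put/Delete), not actual HBase API.

import java.util.*;

public class GroupAndCoalesceSketch {
  // Hypothetical stand-in for one WAL cell: (table, row, isDelete).
  static final class Edit {
    final String table; final String row; final boolean isDelete;
    Edit(String table, String row, boolean isDelete) {
      this.table = table; this.row = row; this.isDelete = isDelete;
    }
  }

  public static void main(String[] args) {
    List<Edit> edits = Arrays.asList(
        new Edit("t1", "r2", false), new Edit("t2", "r1", false),
        new Edit("t1", "r2", false), new Edit("t1", "r1", true));

    // STEP 1: split edits by table, as the patch does with entriesByTable.
    Map<String, List<Edit>> byTable = new TreeMap<String, List<Edit>>();
    for (Edit e : edits) {
      List<Edit> l = byTable.get(e.table);
      if (l == null) { l = new ArrayList<Edit>(); byTable.put(e.table, l); }
      l.add(e);
    }

    // STEP 2: per table, sort so same-row edits become adjacent, then start a
    // new mutation only when the row or type changes (the lastKV check).
    for (Map.Entry<String, List<Edit>> entry : byTable.entrySet()) {
      List<Edit> perTable = entry.getValue();
      Collections.sort(perTable, new Comparator<Edit>() {
        public int compare(Edit a, Edit b) { return a.row.compareTo(b.row); }
      });
      Edit last = null;
      int mutations = 0;
      for (Edit e : perTable) {
        if (last == null || last.isDelete != e.isDelete || !last.row.equals(e.row)) {
          mutations++;  // would create a new Put/Delete here
        }
        last = e;       // subsequent cells for the same row fold into it
      }
      System.out.println(entry.getKey() + ": edits=" + perTable.size()
          + ", mutations=" + mutations);
    }
  }
}

Run as-is, this prints "t1: edits=3, mutations=2" and "t2: edits=1, mutations=1", mirroring how the patch folds several WAL cells for the same row into a single mutation before handing each per-table list to batch().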