diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 3bed8bb..89e9e0c 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Delete; @@ -36,6 +35,8 @@ import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -87,7 +88,7 @@ public class ReplicationSink { * @param entries * @throws IOException */ - public synchronized void replicateEntries(HLog.Entry[] entries) + public void replicateEntries(HLog.Entry[] entries) throws IOException { if (entries.length == 0) { return; @@ -96,8 +97,9 @@ public class ReplicationSink { // to the same table. try { long totalReplicated = 0; - byte[] lastTable = HConstants.EMPTY_BYTE_ARRAY; - List puts = new ArrayList(); + // Map of table => list of puts, we only want to flushCommits once per + // invocation of this method per table. 
+ Map<byte[], List<Put>> puts = new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR); for (HLog.Entry entry : entries) { WALEdit edit = entry.getEdit(); List<KeyValue> kvs = edit.getKeyValues(); @@ -114,9 +116,11 @@ public class ReplicationSink { } delete(entry.getKey().getTablename(), delete); } else { - // Switching table, flush - if (!Bytes.equals(lastTable, entry.getKey().getTablename())) { - put(lastTable, puts); + byte[] table = entry.getKey().getTablename(); + List<Put> tableList = puts.get(table); + if (tableList == null) { + tableList = new ArrayList<Put>(); + puts.put(table, tableList); } // With mini-batching, we need to expect multiple rows per edit byte[] lastKey = kvs.get(0).getRow(); @@ -124,18 +128,19 @@ public class ReplicationSink { kvs.get(0).getTimestamp()); for (KeyValue kv : kvs) { if (!Bytes.equals(lastKey, kv.getRow())) { - puts.add(put); + tableList.add(put); put = new Put(kv.getRow(), kv.getTimestamp()); } put.add(kv.getFamily(), kv.getQualifier(), kv.getValue()); lastKey = kv.getRow(); } - puts.add(put); - lastTable = entry.getKey().getTablename(); + tableList.add(put); } totalReplicated++; } - put(lastTable, puts); + for(byte [] table : puts.keySet()) { + put(table, puts.get(table)); + } this.metrics.setAgeOfLastAppliedOp( entries[entries.length-1].getKey().getWriteTime()); this.metrics.appliedBatchesRate.inc(1); @@ -174,8 +179,6 @@ public class ReplicationSink { table = this.pool.getTable(tableName); table.put(puts); this.metrics.appliedOpsRate.inc(puts.size()); - this.pool.putTable(table); - puts.clear(); } finally { if (table != null) { this.pool.putTable(table); @@ -195,7 +198,6 @@ public class ReplicationSink { table = this.pool.getTable(tableName); table.delete(delete); this.metrics.appliedOpsRate.inc(1); - this.pool.putTable(table); } finally { if (table != null) { this.pool.putTable(table);