Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (revision 1511867) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (working copy) @@ -20,10 +20,13 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -125,10 +128,9 @@ // to the same table. try { long totalReplicated = 0; - // Map of table => list of Rows, we only want to flushCommits once per - // invocation of this method per table. - Map> rows = - new TreeMap>(); + // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per + // invocation of this method per table and cluster id. + Map>> rowMap = new TreeMap>>(); for (WALEntry entry : entries) { TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray()); @@ -148,7 +150,7 @@ new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()): new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); m.setClusterId(uuid); - addToMultiMap(rows, table, m); + addToHashMultiMap(rowMap, table, uuid, m); } if (CellUtil.isDelete(cell)) { ((Delete)m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell)); @@ -159,8 +161,8 @@ } totalReplicated++; } - for (Entry> entry : rows.entrySet()) { - batch(entry.getKey(), entry.getValue()); + for (Entry>> entry : rowMap.entrySet()) { + batch(entry.getKey(), entry.getValue().values()); } int size = entries.size(); this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime()); @@ -190,15 +192,21 @@ * Simple helper to a map from key to (a list of) values * TODO: Make a general utility method * @param map - * @param key + * @param key1 + * @param key2 * @param value * @return */ - private List addToMultiMap(Map> map, K key, V value) { - List values = map.get(key); + private List addToHashMultiMap(Map>> map, K1 key1, K2 key2, V value) { + Map> innerMap = map.get(key1); + if (innerMap == null) { + innerMap = new HashMap>(); + map.put(key1, innerMap); + } + List values = innerMap.get(key1); if (values == null) { values = new ArrayList(); - map.put(key, values); + innerMap.put(key2, values); } values.add(value); return values; @@ -231,14 +239,16 @@ * @param rows list of actions * @throws IOException */ - private void batch(TableName tableName, List rows) throws IOException { - if (rows.isEmpty()) { + private void batch(TableName tableName, Collection> allRows) throws IOException { + if (allRows.isEmpty()) { return; } HTableInterface table = null; try { table = new HTable(tableName, this.sharedHtableCon, this.sharedThreadPool); - table.batch(rows); + for (List rows : allRows) { + table.batch(rows); + } } catch (InterruptedException ix) { throw new IOException(ix); } finally {