Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java	(revision 1511851)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java	(working copy)
@@ -28,25 +28,22 @@
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.Stoppable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.UUID;
 
 /**
  * This class is responsible for replicating the edits coming
@@ -68,7 +65,6 @@
   // Name of the HDFS directory that contains the temporary rep logs
   public static final String REPLICATION_LOG_DIR = ".replogs";
   private final Configuration conf;
-  private final ExecutorService sharedThreadPool;
   private final HConnection sharedHtableCon;
   private final ReplicationSinkMetrics metrics;
 
@@ -84,11 +80,6 @@
     this.conf = HBaseConfiguration.create(conf);
     decorateConf();
     this.sharedHtableCon = HConnectionManager.createConnection(this.conf);
-    this.sharedThreadPool = new ThreadPoolExecutor(1,
-        conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE),
-        conf.getLong("hbase.htable.threads.keepalivetime", 60), TimeUnit.SECONDS,
-        new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("hbase-repl"));
-    ((ThreadPoolExecutor)this.sharedThreadPool).allowCoreThreadTimeOut(true);
     this.metrics = new ReplicationSinkMetrics();
   }
 
@@ -119,9 +110,9 @@
     // to the same table.
     try {
       long totalReplicated = 0;
-      // Map of table => list of Rows, we only want to flushCommits once per
-      // invocation of this method per table.
-      Map<byte[], List<Row>> rows = new TreeMap<byte[], List<Row>>(Bytes.BYTES_COMPARATOR);
+      // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per
+      // invocation of this method per table and cluster id.
+      Map<byte[], Map<UUID, List<Row>>> rowMap = new TreeMap<byte[], Map<UUID, List<Row>>>(Bytes.BYTES_COMPARATOR);
       for (HLog.Entry entry : entries) {
         WALEdit edit = entry.getEdit();
         byte[] table = entry.getKey().getTablename();
@@ -131,14 +122,15 @@
         List<KeyValue> kvs = edit.getKeyValues();
         for (KeyValue kv : kvs) {
           if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
+            UUID clusterId = entry.getKey().getClusterId();
             if (kv.isDelete()) {
               del = new Delete(kv.getRow());
-              del.setClusterId(entry.getKey().getClusterId());
-              addToMultiMap(rows, table, del);
+              del.setClusterId(clusterId);
+              addToHashMultiMap(rowMap, table, clusterId, del);
             } else {
               put = new Put(kv.getRow());
-              put.setClusterId(entry.getKey().getClusterId());
-              addToMultiMap(rows, table, put);
+              put.setClusterId(clusterId);
+              addToHashMultiMap(rowMap, table, clusterId, put);
             }
           }
           if (kv.isDelete()) {
@@ -150,8 +142,8 @@
         }
         totalReplicated++;
       }
-      for(byte [] table : rows.keySet()) {
-        batch(table, rows.get(table));
+      for (Map.Entry<byte[], Map<UUID, List<Row>>> entry : rowMap.entrySet()) {
+        batch(entry.getKey(), entry.getValue().values());
       }
       this.metrics.setAgeOfLastAppliedOp(
           entries[entries.length-1].getKey().getWriteTime());
@@ -167,15 +159,21 @@
    * Simple helper to a map from key to (a list of) values
    * TODO: Make a general utility method
    * @param map
-   * @param key
+   * @param key1
+   * @param key2
    * @param value
    * @return
    */
-  private <K, V> List<V> addToMultiMap(Map<K, List<V>> map, K key, V value) {
-    List<V> values = map.get(key);
+  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2, V value) {
+    Map<K2, List<V>> innerMap = map.get(key1);
+    if (innerMap == null) {
+      innerMap = new HashMap<K2, List<V>>();
+      map.put(key1, innerMap);
+    }
+    List<V> values = innerMap.get(key2);
     if (values == null) {
       values = new ArrayList<V>();
-      map.put(key, values);
+      innerMap.put(key2, values);
     }
     values.add(value);
     return values;
@@ -186,15 +184,6 @@
    */
   public void stopReplicationSinkServices() {
     try {
-      this.sharedThreadPool.shutdown();
-      if (!this.sharedThreadPool.awaitTermination(60000, TimeUnit.MILLISECONDS)) {
-        this.sharedThreadPool.shutdownNow();
-      }
-    } catch (InterruptedException e) {
-      LOG.warn("Interrupted while closing the table pool", e); // ignoring it as we are closing.
-      Thread.currentThread().interrupt();
-    }
-    try {
       this.sharedHtableCon.close();
     } catch (IOException e) {
       LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
@@ -207,15 +196,17 @@
    * @param rows list of actions
    * @throws IOException
    */
-  private void batch(byte[] tableName, List<Row> rows) throws IOException {
-    if (rows.isEmpty()) {
+  private void batch(byte[] tableName, Collection<List<Row>> allRows) throws IOException {
+    if (allRows.isEmpty()) {
       return;
     }
     HTableInterface table = null;
     try {
-      table = new HTable(tableName, this.sharedHtableCon, this.sharedThreadPool);
-      table.batch(rows);
-      this.metrics.appliedOpsRate.inc(rows.size());
+      table = this.sharedHtableCon.getTable(tableName);
+      for (List<Row> rows : allRows) {
+        table.batch(rows);
+        this.metrics.appliedOpsRate.inc(rows.size());
+      }
     } catch (InterruptedException ix) {
       throw new IOException(ix);
    } finally {
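
For orientation, below is a minimal, self-contained sketch of the two-level grouping pattern the patch introduces with addToHashMultiMap. It is not the HBase code itself: the class name HashMultiMapSketch and the String stand-ins for the byte[] table name and the Put/Delete mutations are illustrative only.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;

// Illustrative stand-alone version of the patch's two-level "hash multimap".
// Names are hypothetical; String edits stand in for HBase Put/Delete objects.
public class HashMultiMapSketch {

  // Same shape as the patched helper: get-or-create the inner map for key1,
  // then get-or-create the value list for key2, and append the value.
  static <K1, K2, V> List<V> addToHashMultiMap(
      Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2, V value) {
    Map<K2, List<V>> innerMap = map.get(key1);
    if (innerMap == null) {
      innerMap = new HashMap<K2, List<V>>();
      map.put(key1, innerMap);
    }
    List<V> values = innerMap.get(key2);
    if (values == null) {
      values = new ArrayList<V>();
      innerMap.put(key2, values);
    }
    values.add(value);
    return values;
  }

  public static void main(String[] args) {
    // table name => (source cluster id => edits), mirroring rowMap in the patch
    Map<String, Map<UUID, List<String>>> rowMap =
        new TreeMap<String, Map<UUID, List<String>>>();
    UUID clusterA = UUID.randomUUID();
    UUID clusterB = UUID.randomUUID();
    addToHashMultiMap(rowMap, "t1", clusterA, "put-row1");
    addToHashMultiMap(rowMap, "t1", clusterB, "put-row2");
    addToHashMultiMap(rowMap, "t1", clusterA, "delete-row3");

    // Each inner List would be handed to batch() on its own, so a single
    // batch never mixes edits that originated on different clusters.
    for (Map.Entry<String, Map<UUID, List<String>>> e : rowMap.entrySet()) {
      System.out.println(e.getKey() + " => " + e.getValue().values());
    }
  }
}

The extra level of nesting is the point of the change: batch() now receives a Collection of per-cluster lists and submits each one separately, so mutations from different source clusters are never combined into a single batch, and each mutation keeps the cluster id set via setClusterId(), which master-master (cyclic) replication relies on to detect and break replication loops.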