Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java	(revision 1371513)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java	(working copy)
@@ -23,11 +23,14 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -40,6 +43,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.TreeMap;
 
 /**
@@ -64,9 +73,11 @@
   public static final String REPLICATION_LOG_DIR = ".replogs";
   private final Configuration conf;
   // Pool used to replicated
-  private final HTablePool pool;
+  // Connection to the local cluster, shared by the HTables this sink creates
+  private final HConnection con;
   private final ReplicationSinkMetrics metrics;
-
+  private final ExecutorService exec;
+
   /**
    * Create a sink for replication
    *
@@ -76,13 +87,32 @@
    */
   public ReplicationSink(Configuration conf, Stoppable stopper)
       throws IOException {
-    this.conf = conf;
-    this.pool = new HTablePool(this.conf,
-        conf.getInt("replication.sink.htablepool.capacity", 10));
+    this.conf = new Configuration(conf);
+    decorateConf();
+    // must be the decorated copy: the caller's conf lacks the sink overrides
+    this.con = HConnectionManager.createConnection(this.conf);
     this.metrics = new ReplicationSinkMetrics();
+    // mostly copied from HTable
+    this.exec = new ThreadPoolExecutor(1,
+        this.conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE),
+        this.conf.getLong("hbase.htable.threads.keepalivetime", 60), TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(),
+        new DaemonThreadFactory());
+    ((ThreadPoolExecutor)this.exec).allowCoreThreadTimeOut(true);
   }
 
   /**
+   * Override the client retry count and operation timeout on this sink's own
+   * copy of the configuration, using the replication.sink.client.* settings.
+   */
+  private void decorateConf() {
+    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+        this.conf.getInt("replication.sink.client.retries.number", 1));
+    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+        this.conf.getInt("replication.sink.client.ops.timeout", 20));
+  }
+
+  /**
    * Replicate this array of entries directly into the local cluster
    * using the native client.
    *
@@ -160,6 +190,20 @@
     return values;
   }
 
+  /**
+   * Stop the executor and close the connection used to apply replicated edits.
+   */
+  public void stopReplicationSinkServices() {
+    // shutdownNow() subsumes shutdown(), so a single call is enough
+    this.exec.shutdownNow();
+    try {
+      this.con.close();
+    } catch (IOException e) {
+      // invoked on the shutdown path: log with the stack trace and carry on
+      LOG.warn("IOException while closing the connection", e);
+    }
+  }
+
   /**
    * Do the changes and handle the pool
    * @param tableName table to insert into
@@ -172,7 +216,7 @@
     }
     HTableInterface table = null;
     try {
-      table = this.pool.getTable(tableName);
+      table = new HTable(tableName, con, exec);
       table.batch(rows);
     } catch (InterruptedException ix) {
       throw new IOException(ix);
@@ -182,4 +226,34 @@
       }
     }
   }
+
+  // copied from HTable
+  static class DaemonThreadFactory implements ThreadFactory {
+    static final AtomicInteger poolNumber = new AtomicInteger(1);
+    final ThreadGroup group;
+    final AtomicInteger threadNumber = new AtomicInteger(1);
+    final String namePrefix;
+
+    DaemonThreadFactory() {
+      SecurityManager s = System.getSecurityManager();
+      group = (s != null)? s.getThreadGroup() :
+                           Thread.currentThread().getThreadGroup();
+      namePrefix = "hbase-repl-pool" +
+                   poolNumber.getAndIncrement() +
+                   "-thread-";
+    }
+
+    public Thread newThread(Runnable r) {
+      Thread t = new Thread(group, r,
+                            namePrefix + threadNumber.getAndIncrement(),
+                            0);
+      if (!t.isDaemon()) {
+        t.setDaemon(true);
+      }
+      if (t.getPriority() != Thread.NORM_PRIORITY) {
+        t.setPriority(Thread.NORM_PRIORITY);
+      }
+      return t;
+    }
+  }
 }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java	(revision 1371513)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java	(working copy)
@@ -128,6 +128,7 @@
   public void join() {
     if (this.replication) {
       this.replicationManager.join();
+      this.replicationSink.stopReplicationSinkServices();
     }
   }
 