Index: VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java =================================================================== --- VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -31,7 +31,10 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -43,6 +46,9 @@ import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.ipc.HBaseRPCOptions; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables. 
@@ -60,8 +66,6 @@ */ public class HTableMultiplexer { private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName()); - private static int poolID = 0; - private Map tableNameToHTableMap; /** The map between each region server to its corresponding buffer queue */ @@ -75,6 +79,10 @@ private HConnection connection; private int retryNum; private int perRegionServerBufferQueueSize; + private ScheduledExecutorService executor; + private long frequency = 100; + //initial number of threads in the pool + public static final int INITIAL_NUM_THREADS = 10; /** * @@ -92,6 +100,12 @@ Bytes.BYTES_COMPARATOR); this.retryNum = conf.getInt("hbase.client.retries.number", 10); this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize; + this.frequency = conf.getLong("hbase.htablemultiplexer.flush.frequency.ms", + 100); + this.executor = Executors.newScheduledThreadPool( + INITIAL_NUM_THREADS, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("HTableFlushWorker-%d").build()); } /** @@ -219,14 +233,7 @@ HTableFlushWorker worker = new HTableFlushWorker(conf, addr, this.connection, this, queue); this.serverToFlushWorkerMap.put(addr, worker); - - // Launch a daemon thread to flush the puts - // from the queue to its corresponding region server. - String name = "HTableFlushWorker-" + addr.getHostNameWithPort() + "-" - + (poolID++); - Thread t = new Thread(worker, name); - t.setDaemon(true); - t.start(); + executor.scheduleAtFixedRate(worker, frequency, frequency, TimeUnit.MILLISECONDS); } return queue; } @@ -652,129 +659,101 @@ @Override public void run() { + long start = EnvironmentEdgeManager.currentTimeMillis(); + long elapsed = 0; List processingList = new ArrayList(); - /** - * The frequency in milliseconds for the current thread to process the corresponding - * buffer queue. 
- **/ - long frequency = conf.getLong("hbase.htablemultiplexer.flush.frequency.ms", 100); - - // initial delay + int completelyFailed = 0; try { - Thread.sleep(frequency); - } catch (InterruptedException e) { - } // Ignore - - long start, elapsed; - int failedCount = 0; - while (true) { - try { - start = elapsed = System.currentTimeMillis(); - - // Clear the processingList, putToStatusMap and failedCount - processingList.clear(); - failedCount = 0; - - // drain all the queued puts into the tmp list - queue.drainTo(processingList); - currentProcessingPutCount.set(processingList.size()); - if (minProcessingPutCount.get() > currentProcessingPutCount.get()) { - minProcessingPutCount.set(currentProcessingPutCount.get()); - } else if (maxProcessingPutCount.get() < currentProcessingPutCount.get()) { - maxProcessingPutCount.set(currentProcessingPutCount.get()); + // drain all the queued puts into the tmp list + queue.drainTo(processingList); + currentProcessingPutCount.set(processingList.size()); + if (minProcessingPutCount.get() > currentProcessingPutCount.get()) { + minProcessingPutCount.set(currentProcessingPutCount.get()); + } else if (maxProcessingPutCount.get() < currentProcessingPutCount + .get()) { + maxProcessingPutCount.set(currentProcessingPutCount.get()); + } + avgProcessingPutCount.add(currentProcessingPutCount.get()); + if (processingList.size() > 0) { + MultiPut mput = new MultiPut(this.addr); + HBaseRPCOptions options = null; + for (PutStatus putStatus : processingList) { + // Update the MultiPut + mput.add(putStatus.getRegionInfo().getRegionName(), + putStatus.getPut()); + if (putStatus.getOptions() != null) { + options = putStatus.getOptions(); + } } - avgProcessingPutCount.add(currentProcessingPutCount.get()); - if (processingList.size() > 0) { - // Create the MultiPut object - // Amit: Need to change this to use multi, at some point in future. 
- MultiPut mput = new MultiPut(this.addr); - HBaseRPCOptions options = null; - for (PutStatus putStatus: processingList) { - // Update the MultiPut - mput.add(putStatus.getRegionInfo().getRegionName(), - putStatus.getPut()); - if (putStatus.getOptions () != null) { - options = putStatus.getOptions (); + + // Process this multiput request + List failed = null; + Map failureInfo = new HashMap(); + try { + failed = connection.processListOfMultiPut(Arrays.asList(mput), + null, options, failureInfo); + } catch (PreemptiveFastFailException e) { + // Client is not blocking on us. So, let us treat this + // as a normal failure, and retry. + for (PutStatus putStatus : processingList) { + if (!resubmitFailedPut(putStatus, this.addr)) { + completelyFailed++; } } - - // Process this multiput request - List failed = null; - Map failureInfo = - new HashMap(); - try { - failed = connection.processListOfMultiPut(Arrays.asList(mput), null, options, - failureInfo); - } catch(PreemptiveFastFailException e) { - // Client is not blocking on us. So, let us treat this - // as a normal failure, and retry. - for (PutStatus putStatus: processingList) { + } + + long putsToRetry = 0; + if (failed != null) { + if (failed.size() == processingList.size()) { + // All the puts for this region server are failed. Going to retry + // it later + for (PutStatus putStatus : processingList) { if (!resubmitFailedPut(putStatus, this.addr)) { - failedCount++; + completelyFailed++; } } - } - - long putsToRetry = 0; - if (failed != null) { - if (failed.size() == processingList.size()) { - // All the puts for this region server are failed. 
Going to retry it later - for (PutStatus putStatus: processingList) { - if (!resubmitFailedPut(putStatus, this.addr)) { - failedCount++; - } - } - } else { - Set failedPutSet = new HashSet(failed); - for (PutStatus putStatus: processingList) { - if (failedPutSet.contains(putStatus.getPut()) - && !resubmitFailedPut(putStatus, this.addr)) { - failedCount++; - } + } else { + Set failedPutSet = new HashSet(failed); + for (PutStatus putStatus : processingList) { + if (failedPutSet.contains(putStatus.getPut()) + && !resubmitFailedPut(putStatus, this.addr)) { + completelyFailed++; } } - putsToRetry = failed.size() - failedCount; - } - // Update the totalFailedCount - this.totalFailedPutCount.addAndGet(failedCount); - // Update the totalSucceededPutCount - this.totalSucceededPutCount.addAndGet(processingList.size() - - failedCount - putsToRetry); - // Updated the total retried put counts. - this.totalRetriedPutCount.addAndGet(putsToRetry); - - elapsed = System.currentTimeMillis() - start; - // Update latency counters - averageLatency.add(elapsed); - if (elapsed > maxLatency.get()) { - maxLatency.set(elapsed); - } - - // Log some basic info - if (LOG.isDebugEnabled()) { - LOG.debug("Processed " + currentProcessingPutCount - + " put requests for " + addr.getHostNameWithPort() - + " and " + failedCount + " failed" - + ", latency for this send: " + elapsed); } - - // Reset the current processing put count - currentProcessingPutCount.set(0); + putsToRetry = failed.size() - completelyFailed; } - - // Sleep for a while - if (elapsed == start) { - elapsed = System.currentTimeMillis() - start; + // Update the totalFailedCount + this.totalFailedPutCount.addAndGet(completelyFailed); + // Update the totalSucceededPutCount + this.totalSucceededPutCount.addAndGet(processingList.size() + - completelyFailed - putsToRetry); + // Updated the total retried put counts. 
+ this.totalRetriedPutCount.addAndGet(putsToRetry); + + elapsed = EnvironmentEdgeManager.currentTimeMillis() - start; + // Update latency counters + averageLatency.add(elapsed); + if (elapsed > maxLatency.get()) { + maxLatency.set(elapsed); } - if (elapsed < frequency) { - Thread.sleep(frequency - elapsed); + + // Log some basic info + if (LOG.isDebugEnabled()) { + LOG.debug("Processed " + currentProcessingPutCount + + " put requests for " + addr.getHostNameWithPort() + " and " + + completelyFailed + " failed" + ", latency for this send: " + + elapsed); } - } catch (Exception e) { - // Log all the exceptions and move on - LOG.debug("Caught some exceptions " + e - + " when flushing puts to region server " - + addr.getHostNameWithPort()); + + // Reset the current processing put count + currentProcessingPutCount.set(0); } + } catch (Exception e) { + // Log all the exceptions and move on + LOG.debug("Caught some exceptions " + e + + " when flushing puts to region server " + + addr.getHostNameWithPort()); } } } Index: VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java =================================================================== --- VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java +++ VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java @@ -19,6 +19,14 @@ */ package org.apache.hadoop.hbase.client; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + import junit.framework.Assert; import org.apache.commons.logging.Log; @@ -32,9 +40,6 @@ import org.junit.BeforeClass; import org.junit.Test; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; public class TestHTableMultiplexer { final Log LOG = LogFactory.getLog(getClass()); @@ 
-233,5 +238,116 @@ multiplexer.put(TABLE1, put, HBaseRPCOptions.DEFAULT); Assert.assertEquals("storedHTableCount", 1, status.getStoredHTableCount()); } + + /** + * Test when multiple client threads are using HTableMultiplexer. Spawn 10 + * threads that do 10k multiputs each, and check in the end that we got + * expected number of results back when we do Gets. + * + * @throws Exception + */ + @Test + public void testMultipleThreads() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setLong("hbase.htablemultiplexer.flush.frequency.ms", 10); + byte[] TABLE = Bytes.toBytes("testMultipleThreads"); + HTable ht = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }); + HTableMultiplexer multiplexer = new HTableMultiplexer( + TEST_UTIL.getConfiguration(), 1000); + ExecutorService executor = Executors.newFixedThreadPool(10); + List> futures = new ArrayList<>(10); + byte[] rowPrefix = Bytes.toBytes("row"); + for (int i = 0; i < 10; i++) { + byte[] suffix = Bytes.toBytes(i); + byte[] row = Bytes.add(rowPrefix, suffix); + Runnable runnable = new Client(multiplexer, TABLE, row); + Future future = executor.submit(runnable); + futures.add(future); + } + for (Future f : futures) { + f.get(); + } + // Wait for multiplexer flush + Thread.sleep(2000); + for (int i = 0; i < 10; i++) { + byte[] suffix = Bytes.toBytes(i); + byte[] row = Bytes.add(rowPrefix, suffix); + checkForGets(ht, row); + } + // check the latencies + HTableMultiplexerStatus status = multiplexer.getHTableMultiplexerStatus(); + System.out.println("max latency: " + status.getMaxLatency()); + } + + /** + * Utility method to check if we got all the data back after putting with + * multiplexer + */ + public void checkForGets(HTable ht, byte[] row) throws IOException { + for (int i = 0; i < 10000; i++) { + + byte[] suffix = Bytes.toBytes(i); + byte[] exactRow = Bytes.add(row, suffix); + + Get get = new Get(exactRow); + Result r = ht.get(get); + Assert.assertEquals(1, r.getKvs().size()); + 
Assert.assertEquals(Bytes.toString(exactRow), + Bytes.toString(r.getKvs().get(0).getValue())); + } + } + + /** + * A client which is doing 10k puts via multiplexer + * + */ + public static class Client implements Runnable { + private HTableMultiplexer multiPlex; + private byte[] ht; + private byte[] row; + private byte[] dummy = Bytes.toBytes("dummy"); + + public Client(HTableMultiplexer multiPlex, byte[] ht, byte[] row) { + this.multiPlex = multiPlex; + this.ht = ht; + this.row = row; + } + + @Override + public void run() { + int maxTry = 0; + for (int i = 0; i < 10000; i++) { + try { + // sleeping so that we don't put a whole bunch of data at once + Thread.sleep(1); + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + } + byte[] suffix = Bytes.toBytes(i); + byte[] exactRow = Bytes.add(row, suffix); + Put put = new Put(exactRow); + put.add(FAMILY, dummy, exactRow); + try { + boolean success = true; + int numTry = 0; + while (true) { + success = multiPlex.put(ht, put, HBaseRPCOptions.DEFAULT); + numTry++; + if (success) + break; + else + Thread.sleep(1000); + } + if (numTry > maxTry) + maxTry = numTry; + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + System.out.println("Max number of times this thread retried: " + maxTry); + } + } }