Index: src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java (revision 1241774) +++ src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java (working copy) @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.thrift.CallQueue.Call; +import org.apache.hadoop.hbase.thrift.generated.Hbase; import org.apache.hadoop.metrics.ContextFactory; import org.apache.hadoop.metrics.MetricsContext; import org.apache.hadoop.metrics.MetricsUtil; @@ -106,7 +107,8 @@ private static ThriftMetrics createMetrics() throws Exception { setupMetricsContext(); Configuration conf = UTIL.getConfiguration(); - return new ThriftMetrics(ThriftServerRunner.DEFAULT_LISTEN_PORT, conf); + return new ThriftMetrics(ThriftServerRunner.DEFAULT_LISTEN_PORT, conf, + Hbase.Iface.class); } private static void setupMetricsContext() throws Exception { Index: src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java (revision 1241774) +++ src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java (working copy) @@ -141,7 +141,7 @@ private static ThriftMetrics getMetrics(Configuration conf) throws Exception { setupMetricsContext(); - return new ThriftMetrics(ThriftServerRunner.DEFAULT_LISTEN_PORT, conf); + return new ThriftMetrics(ThriftServerRunner.DEFAULT_LISTEN_PORT, conf, Hbase.Iface.class); } private static void setupMetricsContext() throws IOException { Index: src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java (revision 1241774) +++ src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java (working copy) @@ -19,6 +19,13 @@ */ package org.apache.hadoop.hbase.thrift2; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -26,15 +33,22 @@ import java.util.Comparator; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.thrift.ThriftMetrics; import org.apache.hadoop.hbase.thrift2.generated.TColumn; import org.apache.hadoop.hbase.thrift2.generated.TColumnIncrement; import org.apache.hadoop.hbase.thrift2.generated.TColumnValue; import org.apache.hadoop.hbase.thrift2.generated.TDelete; import org.apache.hadoop.hbase.thrift2.generated.TDeleteType; import org.apache.hadoop.hbase.thrift2.generated.TGet; +import org.apache.hadoop.hbase.thrift2.generated.THBaseService; import org.apache.hadoop.hbase.thrift2.generated.TIOError; import org.apache.hadoop.hbase.thrift2.generated.TIllegalArgument; import org.apache.hadoop.hbase.thrift2.generated.TIncrement; @@ -42,6 +56,11 @@ import org.apache.hadoop.hbase.thrift2.generated.TResult; import org.apache.hadoop.hbase.thrift2.generated.TScan; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.metrics.ContextFactory; +import org.apache.hadoop.metrics.MetricsContext; +import org.apache.hadoop.metrics.MetricsUtil; +import org.apache.hadoop.metrics.spi.NoEmitMetricsContext; +import org.apache.hadoop.metrics.spi.OutputRecord; import org.apache.thrift.TException; import org.junit.AfterClass; import org.junit.Before; @@ -49,13 +68,13 @@ import org.junit.Test; import org.junit.experimental.categories.Category; -import static org.junit.Assert.*; - /** * Unit testing for ThriftServer.HBaseHandler, a part of the org.apache.hadoop.hbase.thrift2 package. */ @Category(MediumTests.class) public class TestThriftHBaseServiceHandler { + + public static final Log LOG = LogFactory.getLog(TestThriftHBaseServiceHandler.class); private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); // Static names for tables, columns, rows, and values @@ -513,6 +532,75 @@ } } + @Test + public void testMetrics() throws Exception { + Configuration conf = UTIL.getConfiguration(); + ThriftMetrics metrics = getMetrics(conf); + THBaseService.Iface handler = + ThriftHBaseServiceHandler.newInstance(conf, metrics); + byte[] rowName = "testMetrics".getBytes(); + ByteBuffer table = ByteBuffer.wrap(tableAname); + + TGet get = new TGet(ByteBuffer.wrap(rowName)); + assertFalse(handler.exists(table, get)); + + List columnValues = new ArrayList(); + columnValues.add(new TColumnValue(ByteBuffer.wrap(familyAname), + ByteBuffer.wrap(qualifierAname), + ByteBuffer.wrap(valueAname))); + columnValues.add(new TColumnValue(ByteBuffer.wrap(familyBname), + ByteBuffer.wrap(qualifierBname), + ByteBuffer.wrap(valueBname))); + TPut put = new TPut(ByteBuffer.wrap(rowName), columnValues); + put.setColumnValues(columnValues); + + handler.put(table, put); + + assertTrue(handler.exists(table, get)); + logMetrics(metrics); + verifyMetrics(metrics, "put_num_ops", 1); + verifyMetrics(metrics, "exists_num_ops", 2); + } + + private static ThriftMetrics getMetrics(Configuration conf) throws Exception { + setupMetricsContext(); + return new ThriftMetrics(Integer.parseInt(ThriftServer.DEFAULT_LISTEN_PORT), + conf, THBaseService.Iface.class); + } + + private static void setupMetricsContext() throws IOException { + ContextFactory factory = ContextFactory.getFactory(); + factory.setAttribute(ThriftMetrics.CONTEXT_NAME + ".class", + NoEmitMetricsContext.class.getName()); + MetricsUtil.getContext(ThriftMetrics.CONTEXT_NAME) + .createRecord(ThriftMetrics.CONTEXT_NAME).remove(); + } + + private static void logMetrics(ThriftMetrics metrics) throws Exception { + MetricsContext context = MetricsUtil.getContext( + ThriftMetrics.CONTEXT_NAME); + metrics.doUpdates(context); + if (!LOG.isDebugEnabled()) return; + for (String key : context.getAllRecords().keySet()) { + for (OutputRecord record : context.getAllRecords().get(key)) { + for (String name : record.getMetricNames()) { + LOG.debug("metrics:" + name + " value:" + + record.getMetric(name).intValue()); + } + } + } + } + + private static void verifyMetrics(ThriftMetrics metrics, String name, int expectValue) + throws Exception { + MetricsContext context = MetricsUtil.getContext( + ThriftMetrics.CONTEXT_NAME); + metrics.doUpdates(context); + OutputRecord record = context.getAllRecords().get( + ThriftMetrics.CONTEXT_NAME).iterator().next(); + assertEquals(expectValue, record.getMetric(name).intValue()); + } + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); Index: src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java (revision 1241774) +++ src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java (working copy) @@ -19,8 +19,29 @@ */ package org.apache.hadoop.hbase.thrift2; -import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.*; +import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift; +import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deletesFromHBase; +import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deletesFromThrift; +import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getFromThrift; +import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getsFromThrift; +import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.incrementFromThrift; +import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putFromThrift; +import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putsFromThrift; +import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultFromHBase; +import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultsFromHBase; +import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.scanFromThrift; +import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -28,6 +49,7 @@ import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.thrift.ThriftMetrics; import org.apache.hadoop.hbase.thrift2.generated.TDelete; import org.apache.hadoop.hbase.thrift2.generated.TGet; import org.apache.hadoop.hbase.thrift2.generated.THBaseService; @@ -39,13 +61,6 @@ import org.apache.hadoop.hbase.thrift2.generated.TScan; import org.apache.thrift.TException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - /** * This class is a glue object that connects Thrift RPC calls to the HBase client API primarily defined in the * HTableInterface. @@ -61,7 +76,49 @@ private final AtomicInteger nextScannerId = new AtomicInteger(0); private final Map scannerMap = new ConcurrentHashMap(); - public ThriftHBaseServiceHandler(Configuration conf) { + public static THBaseService.Iface newInstance( + Configuration conf, ThriftMetrics metrics) { + THBaseService.Iface handler = new ThriftHBaseServiceHandler(conf); + return (THBaseService.Iface) Proxy.newProxyInstance( + handler.getClass().getClassLoader(), + handler.getClass().getInterfaces(), + new THBaseServiceMetricsProxy(handler, metrics)); + } + + private static class THBaseServiceMetricsProxy implements InvocationHandler { + private final THBaseService.Iface handler; + private final ThriftMetrics metrics; + + private THBaseServiceMetricsProxy( + THBaseService.Iface handler, ThriftMetrics metrics) { + this.handler = handler; + this.metrics = metrics; + } + + @Override + public Object invoke(Object proxy, Method m, Object[] args) + throws Throwable { + Object result; + try { + long start = now(); + result = m.invoke(handler, args); + int processTime = (int)(now() - start); + metrics.incMethodTime(m.getName(), processTime); + } catch (InvocationTargetException e) { + throw e.getTargetException(); + } catch (Exception e) { + throw new RuntimeException( + "unexpected invocation exception: " + e.getMessage()); + } + return result; + } + } + + private static long now() { + return System.nanoTime(); + } + + ThriftHBaseServiceHandler(Configuration conf) { htablePool = new HTablePool(conf, Integer.MAX_VALUE); } Index: src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java (revision 1241774) +++ src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java (working copy) @@ -19,6 +19,15 @@ */ package org.apache.hadoop.hbase.thrift2; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; @@ -29,7 +38,11 @@ import org.apache.commons.cli.PosixParser; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.thrift.CallQueue; +import org.apache.hadoop.hbase.thrift.CallQueue.Call; +import org.apache.hadoop.hbase.thrift.ThriftMetrics; import org.apache.hadoop.hbase.thrift2.generated.THBaseService; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; @@ -46,19 +59,16 @@ import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportFactory; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.util.List; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * ThriftServer - this class starts up a Thrift server which implements the HBase API specified in the * HbaseClient.thrift IDL file. */ public class ThriftServer { - private static final Log log = LogFactory.getLog("ThriftServer"); + private static final Log log = LogFactory.getLog(ThriftServer.class); - private static final String DEFAULT_LISTEN_PORT = "9090"; + public static final String DEFAULT_LISTEN_PORT = "9090"; public ThriftServer() { } @@ -141,17 +151,33 @@ return new TNonblockingServer(serverArgs); } - private static TServer getTHsHaServer(TProtocolFactory protocolFactory, THBaseService.Processor processor, - TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException { + private static TServer getTHsHaServer(TProtocolFactory protocolFactory, + THBaseService.Processor processor, TTransportFactory transportFactory, + InetSocketAddress inetSocketAddress, ThriftMetrics metrics) + throws TTransportException { TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress); log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString()); THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport); + ExecutorService executorService = createExecutor( + serverArgs.getWorkerThreads(), metrics); + serverArgs.executorService(executorService); serverArgs.processor(processor); serverArgs.transportFactory(transportFactory); serverArgs.protocolFactory(protocolFactory); return new THsHaServer(serverArgs); } + private static ExecutorService createExecutor( + int workerThreads, ThriftMetrics metrics) { + CallQueue callQueue = new CallQueue( + new LinkedBlockingQueue(), metrics); + ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); + tfb.setDaemon(true); + tfb.setNameFormat("thrift2-worker-%d"); + return new ThreadPoolExecutor(workerThreads, workerThreads, + Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build()); + } + private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, THBaseService.Processor processor, TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException { TServerTransport serverTransport = new TServerSocket(inetSocketAddress); @@ -195,10 +221,14 @@ boolean nonblocking = cmd.hasOption("nonblocking"); boolean hsha = cmd.hasOption("hsha"); + Configuration conf = HBaseConfiguration.create(); + ThriftMetrics metrics = new ThriftMetrics( + listenPort, conf, THBaseService.Iface.class); + // Construct correct ProtocolFactory TProtocolFactory protocolFactory = getTProtocolFactory(cmd.hasOption("compact")); - THBaseService.Iface handler = new ThriftHBaseServiceHandler( - HBaseConfiguration.create()); + THBaseService.Iface handler = + ThriftHBaseServiceHandler.newInstance(conf, metrics); THBaseService.Processor processor = new THBaseService.Processor(handler); boolean framed = cmd.hasOption("framed") || nonblocking || hsha; @@ -217,7 +247,7 @@ if (nonblocking) { server = getTNonBlockingServer(protocolFactory, processor, transportFactory, inetSocketAddress); } else if (hsha) { - server = getTHsHaServer(protocolFactory, processor, transportFactory, inetSocketAddress); + server = getTHsHaServer(protocolFactory, processor, transportFactory, inetSocketAddress, metrics); } else { server = getTThreadPoolServer(protocolFactory, processor, transportFactory, inetSocketAddress); } Index: src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java (revision 1241774) +++ src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java (working copy) @@ -227,7 +227,7 @@ public ThriftServerRunner(Configuration conf, HBaseHandler handler) { this.conf = HBaseConfiguration.create(conf); this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT); - this.metrics = new ThriftMetrics(listenPort, conf); + this.metrics = new ThriftMetrics(listenPort, conf, Hbase.Iface.class); this.handler = HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf); } Index: src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java (revision 1241774) +++ src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java (working copy) @@ -61,7 +61,7 @@ private MetricsTimeVaryingRate slowThriftCall = new MetricsTimeVaryingRate("slowThriftCall", registry); - public ThriftMetrics(int port, Configuration conf) { + public ThriftMetrics(int port, Configuration conf, Class iface) { slowResponseTime = conf.getLong( SLOW_RESPONSE_NANO_SEC, DEFAULT_SLOW_RESPONSE_NANO_SEC); context = MetricsUtil.getContext(CONTEXT_NAME); @@ -73,7 +73,7 @@ context.registerUpdater(this); - createMetricsForMethods(Hbase.Iface.class); + createMetricsForMethods(iface); } public void incTimeInQueue(long time) {