From 4f2b184f3406cd60c870ce01d32383595fbc8d48 Mon Sep 17 00:00:00 2001 From: Yang Guang Date: Thu, 22 Sep 2016 11:52:59 -0700 Subject: [PATCH] Add latency metrics for REST. --- .../hadoop/hbase/rest/MetricsRESTSource.java | 35 ++++++++++++++++++++++ .../hadoop/hbase/rest/MetricsRESTSourceImpl.java | 27 +++++++++++++++++ .../org/apache/hadoop/hbase/rest/MetricsREST.java | 16 ++++++++++ .../apache/hadoop/hbase/rest/MultiRowResource.java | 4 +++ .../hadoop/hbase/rest/ProtobufStreamingUtil.java | 14 ++++++++- .../org/apache/hadoop/hbase/rest/ResourceBase.java | 1 + .../org/apache/hadoop/hbase/rest/RowResource.java | 16 ++++++++++ .../hadoop/hbase/rest/TableScanResource.java | 22 ++++++++++---- 8 files changed, 129 insertions(+), 6 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java index ceec41d..5bdee98 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java @@ -26,6 +26,17 @@ import org.apache.hadoop.hbase.metrics.JvmPauseMonitorSource; */ public interface MetricsRESTSource extends BaseSource, JvmPauseMonitorSource { + /** + * HBase operations. Note this does not exactly map to the HTTP method, + * for example, user might use POST to put or delete records. + */ + enum Operation { + GET, + PUT, + DELETE, + SCAN + } + String METRICS_NAME = "REST"; String CONTEXT = "rest"; @@ -52,6 +63,16 @@ public interface MetricsRESTSource extends BaseSource, JvmPauseMonitorSource { String FAILED_SCAN_KEY = "failedScanCount"; + String PROCESS_EXCEPTIONS_KEY = "processExceptionsCount"; + + /** + * Processing time for different operations. 
+ */ + String PROCESSING_TIME_GET_KEY = "processingTimeGet"; + String PROCESSING_TIME_PUT_KEY = "processingTimePut"; + String PROCESSING_TIME_DELETE_KEY = "processingTimeDelete"; + String PROCESSING_TIME_SCAN_KEY = "processingTimeScan"; + /** * Increment the number of requests * @@ -114,4 +135,18 @@ public interface MetricsRESTSource extends BaseSource, JvmPauseMonitorSource { * @param inc the inc */ void incrementFailedScanRequests(final int inc); + + /** + * Increment the number of exceptions. + * + * @param inc the number of exceptions. + */ + void incrementProcessException(final int inc); + + /** + * Track the time spending on the given operation. + * @param op operation + * @param duration duration performing the operation in milli-seconds. + */ + void processedOperation(final Operation op, final long duration); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java index 30625f8..d916661 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java @@ -18,10 +18,14 @@ package org.apache.hadoop.hbase.rest; +import java.util.EnumMap; +import java.util.Map; + import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.metrics.BaseSourceImpl; import org.apache.hadoop.metrics2.MetricHistogram; import org.apache.hadoop.metrics2.lib.MutableFastCounter; +import org.apache.hadoop.metrics2.lib.MutableHistogram; /** * Hadoop Two implementation of a metrics2 source that will export metrics from the Rest server to @@ -41,6 +45,9 @@ public class MetricsRESTSourceImpl extends BaseSourceImpl implements MetricsREST private MutableFastCounter fPut; private MutableFastCounter fDel; private MutableFastCounter fScan; + private MutableFastCounter processExceptions; + 
+ private Map<Operation, MutableHistogram> processingTimeByOp; // pause monitor metrics private final MutableFastCounter infoPauseThresholdExceeded; @@ -81,6 +88,13 @@ public class MetricsRESTSourceImpl extends BaseSourceImpl implements MetricsREST fPut = getMetricsRegistry().getCounter(FAILED_PUT_KEY, 0l); fDel = getMetricsRegistry().getCounter(FAILED_DELETE_KEY, 0l); fScan = getMetricsRegistry().getCounter(FAILED_SCAN_KEY, 0l); + processExceptions = getMetricsRegistry().getCounter(PROCESS_EXCEPTIONS_KEY, 0L); + + processingTimeByOp = new EnumMap<Operation, MutableHistogram>(Operation.class); + processingTimeByOp.put(Operation.GET, getMetricsRegistry().getHistogram(PROCESSING_TIME_GET_KEY)); + processingTimeByOp.put(Operation.PUT, getMetricsRegistry().getHistogram(PROCESSING_TIME_PUT_KEY)); + processingTimeByOp.put(Operation.DELETE, getMetricsRegistry().getHistogram(PROCESSING_TIME_DELETE_KEY)); + processingTimeByOp.put(Operation.SCAN, getMetricsRegistry().getHistogram(PROCESSING_TIME_SCAN_KEY)); } @Override @@ -147,4 +161,17 @@ public class MetricsRESTSourceImpl extends BaseSourceImpl implements MetricsREST public void updatePauseTimeWithoutGc(long t) { pausesWithoutGc.add(t); } + + @Override + public void incrementProcessException(final int inc) { + processExceptions.incr(inc); + } + + @Override + public void processedOperation(final Operation op, final long duration) { + MutableHistogram mh = processingTimeByOp.get(op); + if (mh != null) { + mh.add(duration); + } + } } diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java index e31037a..d96a5b0 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.rest.MetricsRESTSource; +import 
org.apache.hadoop.hbase.rest.MetricsRESTSource.Operation; @InterfaceAudience.Private public class MetricsREST { @@ -100,4 +101,19 @@ public class MetricsREST { source.incrementFailedScanRequests(inc); } + /** + * @param inc How much to add to processException. + */ + public void incrementProcessException(final int inc) { + source.incrementProcessException(inc); + } + + /** + * Track the time spending on the given operation. + * @param op the operation + * @param duration duration performing the operation in milli-seconds. + */ + public void processedOperation(final Operation op, final long duration) { + source.processedOperation(op, duration); + } } diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/MultiRowResource.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/MultiRowResource.java index b952c00..eaebcee 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/MultiRowResource.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/MultiRowResource.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.rest.model.CellModel; import org.apache.hadoop.hbase.rest.model.CellSetModel; import org.apache.hadoop.hbase.rest.model.RowModel; +import org.apache.hadoop.hbase.rest.MetricsRESTSource.Operation; @InterfaceAudience.Private public class MultiRowResource extends ResourceBase implements Constants { @@ -72,6 +73,7 @@ public class MultiRowResource extends ResourceBase implements Constants { MultivaluedMap params = uriInfo.getQueryParameters(); servlet.getMetrics().incrementRequests(1); + final long startTime = System.currentTimeMillis(); try { CellSetModel model = new CellSetModel(); for (String rk : params.get(ROW_KEYS_PARAM_NAME)) { @@ -105,6 +107,8 @@ public class MultiRowResource extends ResourceBase implements Constants { } } + servlet.getMetrics().processedOperation(Operation.GET, + System.currentTimeMillis() - startTime); if (model.getRows().size() == 0) { //If no rows found. 
servlet.getMetrics().incrementFailedGetRequests(1); diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java index cb0f4c8..a596b0d 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.rest; import java.io.IOException; import java.io.OutputStream; import java.util.List; +import java.util.concurrent.Callable; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.StreamingOutput; @@ -44,11 +45,17 @@ public class ProtobufStreamingUtil implements StreamingOutput { private int limit; private int fetchSize; - protected ProtobufStreamingUtil(ResultScanner scanner, String type, int limit, int fetchSize) { + /** + * Callback invoked after the streaming is done. + */ + private final Callable<Void> doneStreaming; + + protected ProtobufStreamingUtil(ResultScanner scanner, String type, int limit, int fetchSize, Callable<Void> doneStreaming) { this.resultScanner = scanner; this.contentType = type; this.limit = limit; this.fetchSize = fetchSize; + this.doneStreaming = doneStreaming; if (LOG.isTraceEnabled()) { LOG.trace("Created ScanStreamingUtil with content type = " + this.contentType + " user limit : " + this.limit + " scan fetch size : " + this.fetchSize); @@ -76,6 +83,11 @@ public class ProtobufStreamingUtil implements StreamingOutput { writeToStream(createModelFromResults(rowsToSend), this.contentType, outStream); } } + try { + doneStreaming.call(); + } catch (Exception ex) { + // metrics callback is best-effort; ignore failures + } } private void writeToStream(CellSetModel model, String contentType, OutputStream outStream) diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ResourceBase.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ResourceBase.java index f71d848..f2a6c46 100644 --- 
a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ResourceBase.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ResourceBase.java @@ -45,6 +45,7 @@ public class ResourceBase implements Constants { } protected Response processException(Throwable exp) { + servlet.getMetrics().incrementProcessException(1); Throwable curr = exp; if(accessDeniedClazz != null) { //some access denied exceptions are buried diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java index de84625..6fd221d 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.rest.model.CellModel; import org.apache.hadoop.hbase.rest.model.CellSetModel; import org.apache.hadoop.hbase.rest.model.RowModel; +import org.apache.hadoop.hbase.rest.MetricsRESTSource.Operation; import org.apache.hadoop.hbase.util.Bytes; @InterfaceAudience.Private @@ -89,6 +90,7 @@ public class RowResource extends ResourceBase { LOG.trace("GET " + uriInfo.getAbsolutePath()); } servlet.getMetrics().incrementRequests(1); + final long startTime = System.currentTimeMillis(); MultivaluedMap params = uriInfo.getQueryParameters(); try { ResultGenerator generator = @@ -119,6 +121,7 @@ public class RowResource extends ResourceBase { value = generator.next(); } while (value != null); model.addRow(rowModel); + servlet.getMetrics().processedOperation(Operation.GET, System.currentTimeMillis() - startTime); servlet.getMetrics().incrementSucessfulGetRequests(1); return Response.ok(model).build(); } catch (Exception e) { @@ -134,6 +137,7 @@ public class RowResource extends ResourceBase { LOG.trace("GET " + uriInfo.getAbsolutePath() + " as "+ MIMETYPE_BINARY); } servlet.getMetrics().incrementRequests(1); + final long startTime = 
System.currentTimeMillis(); // doesn't make sense to use a non specific coordinate as this can only // return a single cell if (!rowspec.hasColumns() || rowspec.getColumns().length > 1) { @@ -155,6 +159,7 @@ public class RowResource extends ResourceBase { Cell value = generator.next(); ResponseBuilder response = Response.ok(CellUtil.cloneValue(value)); response.header("X-Timestamp", value.getTimestamp()); + servlet.getMetrics().processedOperation(Operation.GET, System.currentTimeMillis() - startTime); servlet.getMetrics().incrementSucessfulGetRequests(1); return response.build(); } catch (Exception e) { @@ -182,6 +187,7 @@ public class RowResource extends ResourceBase { .build(); } + final long startTime = System.currentTimeMillis(); Table table = null; try { List rows = model.getRows(); @@ -228,6 +234,7 @@ public class RowResource extends ResourceBase { table = servlet.getTable(tableResource.getName()); table.put(puts); ResponseBuilder response = Response.ok(); + servlet.getMetrics().processedOperation(Operation.PUT, System.currentTimeMillis() - startTime); servlet.getMetrics().incrementSucessfulPutRequests(1); return response.build(); } catch (Exception e) { @@ -246,6 +253,7 @@ public class RowResource extends ResourceBase { Response updateBinary(final byte[] message, final HttpHeaders headers, final boolean replace) { servlet.getMetrics().incrementRequests(1); + final long startTime = System.currentTimeMillis(); if (servlet.isReadOnly()) { servlet.getMetrics().incrementFailedPutRequests(1); return Response.status(Response.Status.FORBIDDEN) @@ -292,6 +300,7 @@ public class RowResource extends ResourceBase { if (LOG.isTraceEnabled()) { LOG.trace("PUT " + put.toString()); } + servlet.getMetrics().processedOperation(Operation.PUT, System.currentTimeMillis() - startTime); servlet.getMetrics().incrementSucessfulPutRequests(1); return Response.ok().build(); } catch (Exception e) { @@ -356,6 +365,8 @@ public class RowResource extends ResourceBase { LOG.trace("DELETE " + 
uriInfo.getAbsolutePath()); } servlet.getMetrics().incrementRequests(1); + + final long startTime = System.currentTimeMillis(); if (servlet.isReadOnly()) { servlet.getMetrics().incrementFailedDeleteRequests(1); return Response.status(Response.Status.FORBIDDEN) @@ -397,6 +408,7 @@ public class RowResource extends ResourceBase { table = servlet.getTable(tableResource.getName()); table.delete(delete); servlet.getMetrics().incrementSucessfulDeleteRequests(1); + servlet.getMetrics().processedOperation(Operation.DELETE, System.currentTimeMillis() - startTime); if (LOG.isTraceEnabled()) { LOG.trace("DELETE " + delete.toString()); } @@ -421,6 +433,7 @@ public class RowResource extends ResourceBase { * @return Response 200 OK, 304 Not modified, 400 Bad request */ Response checkAndPut(final CellSetModel model) { + final long startTime = System.currentTimeMillis(); Table table = null; try { table = servlet.getTable(tableResource.getName()); @@ -509,6 +522,7 @@ public class RowResource extends ResourceBase { .build(); } ResponseBuilder response = Response.ok(); + servlet.getMetrics().processedOperation(Operation.PUT, System.currentTimeMillis() - startTime); servlet.getMetrics().incrementSucessfulPutRequests(1); return response.build(); } catch (Exception e) { @@ -531,6 +545,7 @@ public class RowResource extends ResourceBase { * @return Response 200 OK, 304 Not modified, 400 Bad request */ Response checkAndDelete(final CellSetModel model) { + final long startTime = System.currentTimeMillis(); Table table = null; Delete delete = null; try { @@ -639,6 +654,7 @@ public class RowResource extends ResourceBase { .build(); } ResponseBuilder response = Response.ok(); + servlet.getMetrics().processedOperation(Operation.DELETE, System.currentTimeMillis() - startTime); servlet.getMetrics().incrementSucessfulDeleteRequests(1); return response.build(); } catch (Exception e) { diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java 
b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java index 5cc2c7b..e6ae522 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.Callable; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; @@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.rest.model.CellModel; import org.apache.hadoop.hbase.rest.model.RowModel; +import org.apache.hadoop.hbase.rest.MetricsRESTSource.Operation; import org.codehaus.jackson.annotate.JsonIgnore; import org.codehaus.jackson.annotate.JsonProperty; @@ -75,14 +77,17 @@ public class TableScanResource extends ResourceBase { public Iterator iterator() { return new Iterator() { int count = rowsToSend; + final long startTime = System.currentTimeMillis(); @Override public boolean hasNext() { - if (count > 0) { - return itr.hasNext(); - } else { - return false; + boolean more = count > 0 ? 
itr.hasNext() : false; + if (!more) { + // Track to the last record to conclude this scan session for latency metric, + // this might not work if the caller terminates the session before getting 'count' items + servlet.getMetrics().processedOperation(Operation.SCAN, System.currentTimeMillis() - startTime); } + return more; } @Override @@ -128,10 +133,17 @@ public class TableScanResource extends ResourceBase { @DefaultValue(Long.MAX_VALUE + "") @QueryParam(Constants.SCAN_END_TIME) long endTime, @DefaultValue("true") @QueryParam(Constants.SCAN_BATCH_SIZE) boolean cacheBlocks) { servlet.getMetrics().incrementRequests(1); + final long start = System.currentTimeMillis(); try { int fetchSize = this.servlet.getConfiguration().getInt(Constants.SCAN_FETCH_SIZE, 10); ProtobufStreamingUtil stream = new ProtobufStreamingUtil(this.results, contentType, - userRequestedLimit, fetchSize); + userRequestedLimit, fetchSize, new Callable<Void>() { + @Override + public Void call() throws Exception { + servlet.getMetrics().processedOperation(Operation.SCAN, System.currentTimeMillis() - start); + return null; + } + }); servlet.getMetrics().incrementSucessfulScanRequests(1); ResponseBuilder response = Response.ok(stream); response.header("content-type", contentType); -- 2.3.8 (Apple Git-58)