diff --git hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/metrics/MasterMetricsSource.java hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/metrics/MasterMetricsSource.java index 46db75c..a9989b5 100644 --- hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/metrics/MasterMetricsSource.java +++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/metrics/MasterMetricsSource.java @@ -55,6 +55,12 @@ public interface MasterMetricsSource extends BaseMetricsSource { public static final String SERVER_NAME_NAME = "serverName"; public static final String CLUSTER_ID_NAME = "clusterId"; public static final String IS_ACTIVE_MASTER_NAME = "isActiveMaster"; + public static final String SPLIT_TIME_NAME = "hlogSplitTime"; + public static final String SPLIT_SIZE_NAME = "hlogSplitSize"; + public static final String CLUSTER_REQUESTS_NAME = "clusterRequests"; + public static final String RIT_COUNT_NAME = "ritCount"; + public static final String RIT_COUNT_OVER_THRESHOLD_NAME = "ritCountOverThreshold"; + public static final String RIT_OLDEST_AGE_NAME = "ritOldestAge"; public static final String MASTER_ACTIVE_TIME_DESC = "Master Active Time"; public static final String MASTER_START_TIME_DESC = "Master Start Time"; public static final String AVERAGE_LOAD_DESC = "AverageLoad"; @@ -64,6 +70,8 @@ public interface MasterMetricsSource extends BaseMetricsSource { public static final String SERVER_NAME_DESC = "Server Name"; public static final String CLUSTER_ID_DESC = "Cluster Id"; public static final String IS_ACTIVE_MASTER_DESC = "Is Active Master"; + public static final String SPLIT_TIME_DESC = "Time it takes to finish HLog.splitLog()"; + public static final String SPLIT_SIZE_DESC = "Size of HLog files being split"; /** @@ -90,4 +98,8 @@ public interface MasterMetricsSource extends BaseMetricsSource { */ public void setRITOldestAge(long age); + public void updateSplitTime(long time); + + public void updateSplitSize(long size); + } diff --git hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseMetricsSource.java hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseMetricsSource.java index 23130ea..84f4943 100644 --- hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseMetricsSource.java +++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseMetricsSource.java @@ -63,6 +63,24 @@ public interface BaseMetricsSource { public void incCounters(String counterName, long delta); /** + * Add some value to a histogram. + * + * @param name the name of the histogram + * @param value the value to add to the histogram + */ + public void updateHistogram(String name, long value); + + + /** + * Add some value to a Quantile (An accurate histogram). + * + * @param name the name of the quantile + * @param value the value to add to the quantile + */ + public void updateQuantile(String name, long value); + + + /** * Remove a counter and stop announcing it to metrics2. * * @param key diff --git hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics/MetricHistogram.java hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics/MetricHistogram.java new file mode 100644 index 0000000..8ebe8dc --- /dev/null +++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics/MetricHistogram.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics; + +/** + * + */ +public interface MetricHistogram { + + public static final String NUM_OPS_METRIC_NAME = "_num_ops"; + public static final String MIN_METRIC_NAME = "_min"; + public static final String MAX_METRIC_NAME = "_max"; + public static final String MEAN_METRIC_NAME = "_mean"; + public static final String STD_DEV_METRIC_NAME = "_std_dev"; + public static final String MEDIAN_METRIC_NAME = "_median"; + public static final String SEVENTY_FIFTH_PERCENTILE_METRIC_NAME = "_75th_percentile"; + public static final String NINETY_FIFTH_PERCENTILE_METRIC_NAME = "_95th_percentile"; + public static final String NINETY_NINETH_PERCENTILE_METRIC_NAME = "_99th_percentile"; + + public void add(long value); + +} diff --git hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics/MetricsExecutor.java hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics/MetricsExecutor.java new file mode 100644 index 0000000..605b07d --- /dev/null +++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics/MetricsExecutor.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics; + +import java.util.concurrent.ScheduledExecutorService; + +/** + * + */ +public interface MetricsExecutor { + + public ScheduledExecutorService getExecutor(); + + public void stop(); + +} diff --git hbase-hadoop1-compat/pom.xml hbase-hadoop1-compat/pom.xml index 3fed079..8e79b5d 100644 --- hbase-hadoop1-compat/pom.xml +++ hbase-hadoop1-compat/pom.xml @@ -93,6 +93,10 @@ limitations under the License. + com.yammer.metrics + metrics-core + + org.apache.hadoop hadoop-test ${hadoop-one.version} diff --git hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/master/metrics/MasterMetricsSourceImpl.java hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/master/metrics/MasterMetricsSourceImpl.java index 200e621..51dde66 100644 --- hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/master/metrics/MasterMetricsSourceImpl.java +++ hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/master/metrics/MasterMetricsSourceImpl.java @@ -25,6 +25,7 @@ import org.apache.hadoop.metrics2.MetricsBuilder; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong; import org.apache.hadoop.metrics2.lib.MetricMutableGaugeLong; +import org.apache.hadoop.metrics2.lib.MetricMutableHistogram; /** * Hadoop1 implementation of MasterMetricsSource. @@ -40,6 +41,8 @@ public class MasterMetricsSourceImpl final MetricMutableGaugeLong ritOldestAgeGauge; private final MasterMetricsWrapper masterWrapper; + private MetricMutableHistogram splitTimeHisto; + private MetricMutableHistogram splitSizeHisto; public MasterMetricsSourceImpl(MasterMetricsWrapper masterWrapper) { this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT, masterWrapper); @@ -53,10 +56,13 @@ public class MasterMetricsSourceImpl super(metricsName, metricsDescription, metricsContext, metricsJmxContext); this.masterWrapper = masterWrapper; - clusterRequestsCounter = metricsRegistry.newCounter("cluster_requests", "", 0l); - ritGauge = metricsRegistry.newGauge("ritCount", "", 0l); - ritCountOverThresholdGauge = metricsRegistry.newGauge("ritCountOverThreshold","", 0l); - ritOldestAgeGauge = metricsRegistry.newGauge("ritOldestAge", "", 0l); + clusterRequestsCounter = metricsRegistry.newCounter(CLUSTER_REQUESTS_NAME, "", 0l); + ritGauge = metricsRegistry.newGauge(RIT_COUNT_NAME, "", 0l); + ritCountOverThresholdGauge = metricsRegistry.newGauge(RIT_COUNT_OVER_THRESHOLD_NAME, "", 0l); + ritOldestAgeGauge = metricsRegistry.newGauge(RIT_OLDEST_AGE_NAME, "", 0l); + splitTimeHisto = metricsRegistry.newHistogram(SPLIT_SIZE_NAME, SPLIT_SIZE_DESC); + splitSizeHisto = metricsRegistry.newHistogram(SPLIT_TIME_NAME, SPLIT_TIME_DESC); + } public void incRequests(final int inc) { @@ -75,6 +81,16 @@ public class MasterMetricsSourceImpl ritCountOverThresholdGauge.set(ritCount); } + @Override + public void updateSplitTime(long time) { + splitTimeHisto.add(time); + } + + @Override + public void updateSplitSize(long size) { + splitSizeHisto.add(size); + } + /** * Method to export all the metrics. * diff --git hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseMetricsSourceImpl.java hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseMetricsSourceImpl.java index 15f00a3..b78e196 100644 --- hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseMetricsSourceImpl.java +++ hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseMetricsSourceImpl.java @@ -27,6 +27,8 @@ import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; import org.apache.hadoop.metrics2.lib.MetricMutable; import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong; import org.apache.hadoop.metrics2.lib.MetricMutableGaugeLong; +import org.apache.hadoop.metrics2.lib.MetricMutableHistogram; +import org.apache.hadoop.metrics2.lib.MetricMutableQuantiles; import org.apache.hadoop.metrics2.source.JvmMetricsSource; import java.util.Map; @@ -123,6 +125,18 @@ public class BaseMetricsSourceImpl implements BaseMetricsSource, MetricsSource { } + @Override + public void updateHistogram(String name, long value) { + MetricMutableHistogram histo = metricsRegistry.getHistogram(name); + histo.add(value); + } + + @Override + public void updateQuantile(String name, long value) { + MetricMutableQuantiles histo = metricsRegistry.getQuantile(name); + histo.add(value); + } + /** * Remove a named gauge. * diff --git hbase-hadoop1-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java hbase-hadoop1-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java index b980454..f887178 100644 --- hbase-hadoop1-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java +++ hbase-hadoop1-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java @@ -178,6 +178,46 @@ public class DynamicMetricsRegistry { } /** + * Create a new histogram. + * @param name Name of the histogram. + * @return A new MutableHistogram + */ + public MetricMutableHistogram newHistogram(String name) { + return newHistogram(name, ""); + } + + /** + * Create a new histogram. + * @param name The name of the histogram + * @param desc The description of the data in the histogram. + * @return A new MutableHistogram + */ + public MetricMutableHistogram newHistogram(String name, String desc) { + MetricMutableHistogram histo = new MetricMutableHistogram(name, desc); + return addNewMetricIfAbsent(name, histo, MetricMutableHistogram.class); + } + + /** + * Create a new MutableQuantile(A more accurate histogram). + * @param name The name of the histogram + * @return a new MutableQuantile + */ + public MetricMutableQuantiles newQuantile(String name) { + return newQuantile(name, ""); + } + + /** + * Create a new MutableQuantile(A more accurate histogram). + * @param name The name of the histogram + * @param desc Description of the data. + * @return a new MutableQuantile + */ + public MetricMutableQuantiles newQuantile(String name, String desc) { + MetricMutableQuantiles histo = new MetricMutableQuantiles(name, desc); + return addNewMetricIfAbsent(name, histo, MetricMutableQuantiles.class); + } + + /** * Set the metrics context tag * @param name of the context * @return the registry itself as a convenience @@ -277,7 +317,7 @@ public class DynamicMetricsRegistry { if (metric == null) { //Create the potential new gauge. - MetricMutableGaugeLong newGauge = new MetricMutableGaugeLong(gaugeName, "", + MetricMutableGaugeLong newGauge = mf.newGauge(gaugeName, "", potentialStartingValue); // Try and put the gauge in. This is atomic. @@ -313,7 +353,7 @@ public class DynamicMetricsRegistry { MetricMutable counter = metricsMap.get(counterName); if (counter == null) { MetricMutableCounterLong newCounter = - new MetricMutableCounterLong(counterName, "", potentialStartingValue); + mf.newCounter(counterName, "", potentialStartingValue); counter = metricsMap.putIfAbsent(counterName, newCounter); if (counter == null) { return newCounter; @@ -328,6 +368,46 @@ public class DynamicMetricsRegistry { return (MetricMutableCounterLong) counter; } + public MetricMutableHistogram getHistogram(String histoName) { + //See getLongGauge for description on how this works. + MetricMutable histo = metricsMap.get(histoName); + if (histo == null) { + MetricMutableHistogram newHisto = + new MetricMutableHistogram(histoName, ""); + histo = metricsMap.putIfAbsent(histoName, newHisto); + if (histo == null) { + return newHisto; + } + } + + if (!(histo instanceof MetricMutableHistogram)) { + throw new MetricsException("Metric already exists in registry for metric name: " + + name + "and not of type MetricMutableHistogram"); + } + + return (MetricMutableHistogram) histo; + } + + public MetricMutableQuantiles getQuantile(String histoName) { + //See getLongGauge for description on how this works. + MetricMutable histo = metricsMap.get(histoName); + if (histo == null) { + MetricMutableQuantiles newHisto = + new MetricMutableQuantiles(histoName, ""); + histo = metricsMap.putIfAbsent(histoName, newHisto); + if (histo == null) { + return newHisto; + } + } + + if (!(histo instanceof MetricMutableQuantiles)) { + throw new MetricsException("Metric already exists in registry for metric name: " + + name + "and not of type MetricMutableQuantiles"); + } + + return (MetricMutableQuantiles) histo; + } + private T addNewMetricIfAbsent(String name, T ret, diff --git hbase-hadoop1-compat/src/main/java/org/apache/hadoop/metrics2/lib/MetricMutableHistogram.java hbase-hadoop1-compat/src/main/java/org/apache/hadoop/metrics2/lib/MetricMutableHistogram.java new file mode 100644 index 0000000..166af08 --- /dev/null +++ hbase-hadoop1-compat/src/main/java/org/apache/hadoop/metrics2/lib/MetricMutableHistogram.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.lib; + +import com.yammer.metrics.stats.ExponentiallyDecayingSample; +import com.yammer.metrics.stats.Sample; +import com.yammer.metrics.stats.Snapshot; +import org.apache.hadoop.metrics.MetricHistogram; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.lib.MetricMutable; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * A histogram implementation that runs in constant space, and exports to hadoop's metrics2 system. + */ +public class MetricMutableHistogram extends MetricMutable implements MetricHistogram { + + private static final int DEFAULT_SAMPLE_SIZE = 2046; + // the bias towards sampling from more recent data. + // Per Cormode et al. an alpha of 0.015 strongly biases to the last 5 minutes + private static final double DEFAULT_ALPHA = 0.015; + + private final Sample sample; + private final AtomicLong min; + private final AtomicLong max; + private final AtomicLong sum; + private final AtomicLong count; + + + public MetricMutableHistogram(String name, String description) { + super(name, description); + sample = new ExponentiallyDecayingSample(DEFAULT_SAMPLE_SIZE, DEFAULT_ALPHA); + count = new AtomicLong(); + min = new AtomicLong(Long.MAX_VALUE); + max = new AtomicLong(Long.MIN_VALUE); + sum = new AtomicLong(); + } + + public void add(final long val) { + setChanged(); + count.incrementAndGet(); + sample.update(val); + setMax(val); + setMin(val); + sum.getAndAdd(val); + } + + private void setMax(final long potentialMax) { + boolean done = false; + while (!done) { + final long currentMax = max.get(); + done = currentMax >= potentialMax + || max.compareAndSet(currentMax, potentialMax); + } + } + + private void setMin(long potentialMin) { + boolean done = false; + while (!done) { + final long currentMin = min.get(); + done = currentMin <= potentialMin + || min.compareAndSet(currentMin, potentialMin); + } + } + + public long getMax() { + if (count.get() > 0) { + return max.get(); + } + return 0L; + } + + public long getMin() { + if (count.get() > 0) { + return min.get(); + } + return 0L; + } + + public double getMean() { + long cCount = count.get(); + if (cCount > 0) { + return sum.get() / (double) cCount; + } + return 0.0; + } + + + @Override + public void snapshot(MetricsRecordBuilder metricsRecordBuilder, boolean all) { + if (all || changed()) { + clearChanged(); + final Snapshot s = sample.getSnapshot(); + metricsRecordBuilder.addCounter(name + NUM_OPS_METRIC_NAME, "", count.get()); + metricsRecordBuilder.addGauge(name + MIN_METRIC_NAME, "", getMin()); + metricsRecordBuilder.addGauge(name + MAX_METRIC_NAME, "", getMax()); + metricsRecordBuilder.addGauge(name + MEAN_METRIC_NAME, "", getMean()); + + metricsRecordBuilder.addGauge(name + MEDIAN_METRIC_NAME, "", s.getMedian()); + metricsRecordBuilder.addGauge(name + SEVENTY_FIFTH_PERCENTILE_METRIC_NAME, "", s.get75thPercentile()); + metricsRecordBuilder.addGauge(name + NINETY_FIFTH_PERCENTILE_METRIC_NAME, "", s.get95thPercentile()); + metricsRecordBuilder.addGauge(name + NINETY_NINETH_PERCENTILE_METRIC_NAME, "", s.get99thPercentile()); + } + } +} diff --git hbase-hadoop1-compat/src/main/java/org/apache/hadoop/metrics2/lib/MetricMutableQuantiles.java hbase-hadoop1-compat/src/main/java/org/apache/hadoop/metrics2/lib/MetricMutableQuantiles.java new file mode 100644 index 0000000..7f4b71b --- /dev/null +++ hbase-hadoop1-compat/src/main/java/org/apache/hadoop/metrics2/lib/MetricMutableQuantiles.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.lib; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.metrics.MetricHistogram; +import org.apache.hadoop.metrics.MetricsExecutor; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.util.MetricQuantile; +import org.apache.hadoop.metrics2.util.MetricSampleQuantiles; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Watches a stream of long values, maintaining online estimates of specific quantiles with provably + * low error bounds. This is particularly useful for accurate high-percentile (e.g. 95th, 99th) + * latency metrics. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class MetricMutableQuantiles extends MetricMutable implements MetricHistogram { + + static final MetricQuantile[] quantiles = {new MetricQuantile(0.50, 0.050), + new MetricQuantile(0.75, 0.025), new MetricQuantile(0.90, 0.010), + new MetricQuantile(0.95, 0.005), new MetricQuantile(0.99, 0.001)}; + + static final String[] quantilesSuffix = {"_Median", + "_75th_percentile", "_90th_percentile", + "_95th_percentile", "_99th_percentile"}; + + private final int interval; + + private MetricSampleQuantiles estimator; + private long previousCount = 0; + private MetricsExecutor executor; + + protected Map previousSnapshot = null; + + /** + * Instantiates a new {@link MetricMutableQuantiles} for a metric that rolls itself over on the + * specified time interval. + * + * @param name of the metric + * @param description long-form textual description of the metric + * @param sampleName type of items in the stream (e.g., "Ops") + * @param valueName type of the values + * @param interval rollover interval (in seconds) of the estimator + */ + public MetricMutableQuantiles(String name, String description, String sampleName, + String valueName, int interval) { + super(name, description); + + estimator = new MetricSampleQuantiles(quantiles); + + executor = new MetricsExecutorImpl(); + + this.interval = interval; + executor.getExecutor().scheduleAtFixedRate(new RolloverSample(this), + interval, + interval, + TimeUnit.SECONDS); + } + + public MetricMutableQuantiles(String name, String description) { + this(name, description, "Ops", "", 60); + } + + @Override + public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) { + if (all || changed()) { + builder.addCounter(name + "NumOps", description, previousCount); + for (int i = 0; i < quantiles.length; i++) { + long newValue = 0; + // If snapshot is null, we failed to update since the window was empty + if (previousSnapshot != null) { + newValue = previousSnapshot.get(quantiles[i]); + } + builder.addGauge(name + quantilesSuffix[i], description, newValue); + } + if (changed()) { + clearChanged(); + } + } + } + + public synchronized void add(long value) { + estimator.insert(value); + } + + public int getInterval() { + return interval; + } + + /** Runnable used to periodically roll over the internal {@link org.apache.hadoop.metrics2.util.MetricSampleQuantiles} every interval. */ + private static class RolloverSample implements Runnable { + + MetricMutableQuantiles parent; + + public RolloverSample(MetricMutableQuantiles parent) { + this.parent = parent; + } + + @Override + public void run() { + synchronized (parent) { + try { + parent.previousCount = parent.estimator.getCount(); + parent.previousSnapshot = parent.estimator.snapshot(); + } catch (IOException e) { + // Couldn't get a new snapshot because the window was empty + parent.previousCount = 0; + parent.previousSnapshot = null; + } + parent.estimator.clear(); + } + parent.setChanged(); + } + + } +} \ No newline at end of file diff --git hbase-hadoop1-compat/src/main/java/org/apache/hadoop/metrics2/lib/MetricsExecutorImpl.java hbase-hadoop1-compat/src/main/java/org/apache/hadoop/metrics2/lib/MetricsExecutorImpl.java new file mode 100644 index 0000000..3135758 --- /dev/null +++ hbase-hadoop1-compat/src/main/java/org/apache/hadoop/metrics2/lib/MetricsExecutorImpl.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.lib; + +import org.apache.hadoop.metrics.MetricsExecutor; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Class to handle the ScheduledExecutorService{@link ScheduledExecutorService} used by MetricMutableQuantiles{@link MetricMutableQuantiles} + */ +public class MetricsExecutorImpl implements MetricsExecutor { + + @Override + public ScheduledExecutorService getExecutor() { + return ExecutorSingleton.INSTANCE.scheduler; + } + + @Override + public void stop() { + if (!getExecutor().isShutdown()) { + getExecutor().shutdown(); + } + } + + private enum ExecutorSingleton { + INSTANCE; + + private final ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1, new ThreadPoolExecutorThreadFactory("HBase-Metrics2-")); + } + + private static class ThreadPoolExecutorThreadFactory implements ThreadFactory { + private final String name; + private final AtomicInteger threadNumber = new AtomicInteger(1); + + private ThreadPoolExecutorThreadFactory(String name) { + this.name = name; + } + + @Override + public Thread newThread(Runnable runnable) { + Thread t = new Thread(runnable, name + threadNumber.getAndIncrement()); + t.setDaemon(true); + return t; + } + } +} diff --git hbase-hadoop1-compat/src/main/java/org/apache/hadoop/metrics2/util/MetricQuantile.java hbase-hadoop1-compat/src/main/java/org/apache/hadoop/metrics2/util/MetricQuantile.java new file mode 100644 index 0000000..0fb2295 --- /dev/null +++ hbase-hadoop1-compat/src/main/java/org/apache/hadoop/metrics2/util/MetricQuantile.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.util; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Specifies a quantile (with error bounds) to be watched by a + * {@link MetricSampleQuantiles} object. + */ +@InterfaceAudience.Private +public class MetricQuantile { + public final double quantile; + public final double error; + + public MetricQuantile(double quantile, double error) { + this.quantile = quantile; + this.error = error; + } + + @Override + public boolean equals(Object aThat) { + if (this == aThat) { + return true; + } + if (!(aThat instanceof MetricQuantile)) { + return false; + } + + MetricQuantile that = (MetricQuantile) aThat; + + long qbits = Double.doubleToLongBits(quantile); + long ebits = Double.doubleToLongBits(error); + + return qbits == Double.doubleToLongBits(that.quantile) + && ebits == Double.doubleToLongBits(that.error); + } + + @Override + public int hashCode() { + return (int) (Double.doubleToLongBits(quantile) ^ Double + .doubleToLongBits(error)); + } +} \ No newline at end of file diff --git hbase-hadoop1-compat/src/main/java/org/apache/hadoop/metrics2/util/MetricSampleQuantiles.java hbase-hadoop1-compat/src/main/java/org/apache/hadoop/metrics2/util/MetricSampleQuantiles.java new file mode 100644 index 0000000..144a403 --- /dev/null +++ hbase-hadoop1-compat/src/main/java/org/apache/hadoop/metrics2/util/MetricSampleQuantiles.java @@ -0,0 +1,307 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.util; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.ListIterator; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Implementation of the Cormode, Korn, Muthukrishnan, and Srivastava algorithm + * for streaming calculation of targeted high-percentile epsilon-approximate + * quantiles. + * + * This is a generalization of the earlier work by Greenwald and Khanna (GK), + * which essentially allows different error bounds on the targeted quantiles, + * which allows for far more efficient calculation of high-percentiles. + * + * See: Cormode, Korn, Muthukrishnan, and Srivastava + * "Effective Computation of Biased Quantiles over Data Streams" in ICDE 2005 + * + * Greenwald and Khanna, + * "Space-efficient online computation of quantile summaries" in SIGMOD 2001 + * + */ +@InterfaceAudience.Private +public class MetricSampleQuantiles { + + /** + * Total number of items in stream + */ + private long count = 0; + + /** + * Current list of sampled items, maintained in sorted order with error bounds + */ + private LinkedList samples; + + /** + * Buffers incoming items to be inserted in batch. Items are inserted into + * the buffer linearly. When the buffer fills, it is flushed into the samples + * array in its entirety. + */ + private long[] buffer = new long[500]; + private int bufferCount = 0; + + /** + * Array of Quantiles that we care about, along with desired error. + */ + private final MetricQuantile quantiles[]; + + public MetricSampleQuantiles(MetricQuantile[] quantiles) { + this.quantiles = quantiles; + this.samples = new LinkedList(); + } + + /** + * Specifies the allowable error for this rank, depending on which quantiles + * are being targeted. + * + * This is the f(r_i, n) function from the CKMS paper. It's basically how wide + * the range of this rank can be. + * + * @param rank + * the index in the list of samples + */ + private double allowableError(int rank) { + int size = samples.size(); + double minError = size + 1; + for (MetricQuantile q : quantiles) { + double error; + if (rank <= q.quantile * size) { + error = (2.0 * q.error * (size - rank)) / (1.0 - q.quantile); + } else { + error = (2.0 * q.error * rank) / q.quantile; + } + if (error < minError) { + minError = error; + } + } + + return minError; + } + + /** + * Add a new value from the stream. + * + * @param v + */ + synchronized public void insert(long v) { + buffer[bufferCount] = v; + bufferCount++; + + count++; + + if (bufferCount == buffer.length) { + insertBatch(); + compress(); + } + } + + /** + * Merges items from buffer into the samples array in one pass. + * This is more efficient than doing an insert on every item. + */ + private void insertBatch() { + if (bufferCount == 0) { + return; + } + + Arrays.sort(buffer, 0, bufferCount); + + // Base case: no samples + int start = 0; + if (samples.size() == 0) { + SampleItem newItem = new SampleItem(buffer[0], 1, 0); + samples.add(newItem); + start++; + } + + ListIterator it = samples.listIterator(); + SampleItem item = it.next(); + for (int i = start; i < bufferCount; i++) { + long v = buffer[i]; + while (it.nextIndex() < samples.size() && item.value < v) { + item = it.next(); + } + // If we found that bigger item, back up so we insert ourselves before it + if (item.value > v) { + it.previous(); + } + // We use different indexes for the edge comparisons, because of the above + // if statement that adjusts the iterator + int delta; + if (it.previousIndex() == 0 || it.nextIndex() == samples.size()) { + delta = 0; + } else { + delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1; + } + SampleItem newItem = new SampleItem(v, 1, delta); + it.add(newItem); + item = newItem; + } + + bufferCount = 0; + } + + /** + * Try to remove extraneous items from the set of sampled items. This checks + * if an item is unnecessary based on the desired error bounds, and merges it + * with the adjacent item if it is. + */ + private void compress() { + if (samples.size() < 2) { + return; + } + + ListIterator it = samples.listIterator(); + SampleItem prev = null; + SampleItem next = it.next(); + + while (it.hasNext()) { + prev = next; + next = it.next(); + if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) { + next.g += prev.g; + // Remove prev. it.remove() kills the last thing returned. + it.previous(); + it.previous(); + it.remove(); + // it.next() is now equal to next, skip it back forward again + it.next(); + } + } + } + + /** + * Get the estimated value at the specified quantile. + * + * @param quantile Queried quantile, e.g. 0.50 or 0.99. + * @return Estimated value at that quantile. + */ + private long query(double quantile) throws IOException { + if (samples.size() == 0) { + throw new IOException("No samples present"); + } + + int rankMin = 0; + int desired = (int) (quantile * count); + + for (int i = 1; i < samples.size(); i++) { + SampleItem prev = samples.get(i - 1); + SampleItem cur = samples.get(i); + + rankMin += prev.g; + + if (rankMin + cur.g + cur.delta > desired + (allowableError(i) / 2)) { + return prev.value; + } + } + + // edge case of wanting max value + return samples.get(samples.size() - 1).value; + } + + /** + * Get a snapshot of the current values of all the tracked quantiles. + * + * @return snapshot of the tracked quantiles + * @throws IOException + * if no items have been added to the estimator + */ + synchronized public Map snapshot() throws IOException { + // flush the buffer first for best results + insertBatch(); + Map values = new HashMap(quantiles.length); + for (int i = 0; i < quantiles.length; i++) { + values.put(quantiles[i], query(quantiles[i].quantile)); + } + + return values; + } + + /** + * Returns the number of items that the estimator has processed + * + * @return count total number of items processed + */ + synchronized public long getCount() { + return count; + } + + /** + * Returns the number of samples kept by the estimator + * + * @return count current number of samples + */ + synchronized public int getSampleCount() { + return samples.size(); + } + + /** + * Resets the estimator, clearing out all previously inserted items + */ + synchronized public void clear() { + count = 0; + bufferCount = 0; + samples.clear(); + } + + /** + * Describes a measured value passed to the estimator, tracking additional + * metadata required by the CKMS algorithm. + */ + private static class SampleItem { + + /** + * Value of the sampled item (e.g. a measured latency value) + */ + public final long value; + + /** + * Difference between the lowest possible rank of the previous item, and + * the lowest possible rank of this item. + * + * The sum of the g of all previous items yields this item's lower bound. + */ + public int g; + + /** + * Difference between the item's greatest possible rank and lowest possible + * rank. + */ + public final int delta; + + public SampleItem(long value, int lowerDelta, int delta) { + this.value = value; + this.g = lowerDelta; + this.delta = delta; + } + + @Override + public String toString() { + return String.format("%d, %d, %d", value, g, delta); + } + } +} diff --git hbase-hadoop2-compat/pom.xml hbase-hadoop2-compat/pom.xml index 5a03e16..f68d1f6 100644 --- hbase-hadoop2-compat/pom.xml +++ hbase-hadoop2-compat/pom.xml @@ -128,6 +128,10 @@ limitations under the License. hadoop-annotations ${hadoop-two.version} + + com.yammer.metrics + metrics-core + diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/metrics/MasterMetricsSourceImpl.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/metrics/MasterMetricsSourceImpl.java index 89b6ddb..0037c71 100644 --- hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/metrics/MasterMetricsSourceImpl.java +++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/metrics/MasterMetricsSourceImpl.java @@ -24,16 +24,20 @@ import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.lib.Interns; import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.metrics2.lib.MutableHistogram; /** Hadoop2 implementation of MasterMetricsSource. */ public class MasterMetricsSourceImpl extends BaseMetricsSourceImpl implements MasterMetricsSource { + MutableCounterLong clusterRequestsCounter; MutableGaugeLong ritGauge; MutableGaugeLong ritCountOverThresholdGauge; MutableGaugeLong ritOldestAgeGauge; private final MasterMetricsWrapper masterWrapper; + private MutableHistogram splitTimeHisto; + private MutableHistogram splitSizeHisto; public MasterMetricsSourceImpl(MasterMetricsWrapper masterMetricsWrapper) { this(METRICS_NAME, @@ -51,10 +55,12 @@ public class MasterMetricsSourceImpl super(metricsName, metricsDescription, metricsContext, metricsJmxContext); this.masterWrapper = masterWrapper; - clusterRequestsCounter = metricsRegistry.newCounter("cluster_requests", "", 0l); - ritGauge = metricsRegistry.newGauge("ritCount", "", 0l); - ritCountOverThresholdGauge = metricsRegistry.newGauge("ritCountOverThreshold", "" , 0l); - ritOldestAgeGauge = metricsRegistry.newGauge("ritOldestAge", "", 0l); + clusterRequestsCounter = metricsRegistry.newCounter(CLUSTER_REQUESTS_NAME, "", 0l); + ritGauge = metricsRegistry.newGauge(RIT_COUNT_NAME, "", 0l); + ritCountOverThresholdGauge = metricsRegistry.newGauge(RIT_COUNT_OVER_THRESHOLD_NAME, "", 0l); + ritOldestAgeGauge = metricsRegistry.newGauge(RIT_OLDEST_AGE_NAME, "", 0l); + splitTimeHisto = metricsRegistry.newHistogram(SPLIT_SIZE_NAME, SPLIT_SIZE_DESC); + splitSizeHisto = metricsRegistry.newHistogram(SPLIT_TIME_NAME, SPLIT_TIME_DESC); } public void incRequests(final int inc) { @@ -74,6 +80,16 @@ public class MasterMetricsSourceImpl } @Override + public void updateSplitTime(long time) { + splitTimeHisto.add(time); + } + + @Override + public void updateSplitSize(long size) { + splitSizeHisto.add(size); + } + + @Override public void getMetrics(MetricsCollector metricsCollector, boolean all) { MetricsRecordBuilder metricsRecordBuilder = metricsCollector.addRecord(metricsName) diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseMetricsSourceImpl.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseMetricsSourceImpl.java index 1e619e9..ad098a3 100644 --- hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseMetricsSourceImpl.java +++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseMetricsSourceImpl.java @@ -22,8 +22,10 @@ import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsSource; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; +import org.apache.hadoop.metrics2.lib.MetricMutableQuantiles; import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.metrics2.lib.MutableHistogram; import org.apache.hadoop.metrics2.source.JvmMetrics; /** @@ -111,6 +113,18 @@ public class BaseMetricsSourceImpl implements BaseMetricsSource, MetricsSource { } + @Override + public void updateHistogram(String name, long value) { + MutableHistogram histo = metricsRegistry.getHistogram(name); + histo.add(value); + } + + @Override + public void updateQuantile(String name, long value) { + MetricMutableQuantiles histo = metricsRegistry.getQuantile(name); + histo.add(value); + } + /** * Remove a named gauge. * diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java index c55ef2d..5a2e3c5 100644 --- hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java +++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java @@ -250,8 +250,41 @@ public class DynamicMetricsRegistry { } } MutableRate ret = new MutableRate(name, desc, extended); - metricsMap.put(name, ret); - return ret; + return addNewMetricIfAbsent(name, ret, MutableRate.class); + } + + /** + * Create a new histogram. + * @param name Name of the histogram. + * @return A new MutableHistogram + */ + public MutableHistogram newHistogram(String name) { + return newHistogram(name, ""); + } + + /** + * Create a new histogram. + * @param name The name of the histogram + * @param desc The description of the data in the histogram. + * @return A new MutableHistogram + */ + public MutableHistogram newHistogram(String name, String desc) { + MutableHistogram histo = new MutableHistogram(name, desc); + return addNewMetricIfAbsent(name, histo, MutableHistogram.class); + } + + /** + * Create a new MutableQuantile(A more accurate histogram). + * @param name The name of the histogram + * @return a new MutableQuantile + */ + public MetricMutableQuantiles newQuantile(String name) { + return newQuantile(name, ""); + } + + public MetricMutableQuantiles newQuantile(String name, String desc) { + MetricMutableQuantiles histo = new MetricMutableQuantiles(name, desc, "Ops", "", 60); + return addNewMetricIfAbsent(name, histo, MetricMutableQuantiles.class); } synchronized void add(String name, MutableMetric metric) { @@ -440,6 +473,48 @@ public class DynamicMetricsRegistry { return (MutableCounterLong) counter; } + public MutableHistogram getHistogram(String histoName) { + //See getLongGauge for description on how this works. + MutableMetric histo = metricsMap.get(histoName); + if (histo == null) { + MutableHistogram newCounter = + new MutableHistogram(Interns.info(histoName, "")); + histo = metricsMap.putIfAbsent(histoName, newCounter); + if (histo == null) { + return newCounter; + } + } + + + if (!(histo instanceof MutableHistogram)) { + throw new MetricsException("Metric already exists in registry for metric name: " + + histoName + " and not of type MutableHistogram"); + } + + return (MutableHistogram) histo; + } + + public MetricMutableQuantiles getQuantile(String histoName) { + //See getLongGauge for description on how this works. + MutableMetric histo = metricsMap.get(histoName); + if (histo == null) { + MetricMutableQuantiles newCounter = + new MetricMutableQuantiles(histoName, "", "Ops", "", 60); + histo = metricsMap.putIfAbsent(histoName, newCounter); + if (histo == null) { + return newCounter; + } + } + + + if (!(histo instanceof MetricMutableQuantiles)) { + throw new MetricsException("Metric already exists in registry for metric name: " + + histoName + " and not of type MutableHistogram"); + } + + return (MetricMutableQuantiles) histo; + } + private T addNewMetricIfAbsent(String name, T ret, diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MetricMutableQuantiles.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MetricMutableQuantiles.java new file mode 100644 index 0000000..28e92c1 --- /dev/null +++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MetricMutableQuantiles.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.lib; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.metrics.MetricHistogram; +import org.apache.hadoop.metrics.MetricsExecutor; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.util.MetricQuantile; +import org.apache.hadoop.metrics2.util.MetricSampleQuantiles; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.metrics2.lib.Interns.info; + +/** + * Watches a stream of long values, maintaining online estimates of specific quantiles with provably + * low error bounds. This is particularly useful for accurate high-percentile (e.g. 95th, 99th) + * latency metrics. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class MetricMutableQuantiles extends MutableMetric implements MetricHistogram { + + static final MetricQuantile[] quantiles = {new MetricQuantile(0.50, 0.050), + new MetricQuantile(0.75, 0.025), new MetricQuantile(0.90, 0.010), + new MetricQuantile(0.95, 0.005), new MetricQuantile(0.99, 0.001)}; + + private final MetricsInfo numInfo; + private final MetricsInfo[] quantileInfos; + private final int interval; + + private MetricSampleQuantiles estimator; + private long previousCount = 0; + private MetricsExecutor executor; + + + @VisibleForTesting + protected Map previousSnapshot = null; + + /** + * Instantiates a new {@link MetricMutableQuantiles} for a metric that rolls itself over on the + * specified time interval. + * + * @param name of the metric + * @param description long-form textual description of the metric + * @param sampleName type of items in the stream (e.g., "Ops") + * @param valueName type of the values + * @param interval rollover interval (in seconds) of the estimator + */ + public MetricMutableQuantiles(String name, String description, String sampleName, + String valueName, int interval) { + String ucName = StringUtils.capitalize(name); + String usName = StringUtils.capitalize(sampleName); + String uvName = StringUtils.capitalize(valueName); + String desc = StringUtils.uncapitalize(description); + String lsName = StringUtils.uncapitalize(sampleName); + String lvName = StringUtils.uncapitalize(valueName); + + numInfo = info(ucName + "Num" + usName, String.format( + "Number of %s for %s with %ds interval", lsName, desc, interval)); + // Construct the MetricsInfos for the quantiles, converting to percentiles + quantileInfos = new MetricsInfo[quantiles.length]; + String nameTemplate = ucName + "%dthPercentile" + interval + "sInterval" + + uvName; + String descTemplate = "%d percentile " + lvName + " with " + interval + + " second interval for " + desc; + for (int i = 0; i < quantiles.length; i++) { + int percentile = (int) (100 * quantiles[i].quantile); + quantileInfos[i] = info(String.format(nameTemplate, percentile), + String.format(descTemplate, percentile)); + } + + estimator = new MetricSampleQuantiles(quantiles); + executor = new MetricsExecutorImpl(); + this.interval = interval; + executor.getExecutor().scheduleAtFixedRate(new RolloverSample(this), + interval, + interval, + TimeUnit.SECONDS); + } + + @Override + public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) { + if (all || changed()) { + builder.addGauge(numInfo, previousCount); + for (int i = 0; i < quantiles.length; i++) { + long newValue = 0; + // If snapshot is null, we failed to update since the window was empty + if (previousSnapshot != null) { + newValue = previousSnapshot.get(quantiles[i]); + } + builder.addGauge(quantileInfos[i], newValue); + } + if (changed()) { + clearChanged(); + } + } + } + + public synchronized void add(long value) { + estimator.insert(value); + } + + public int getInterval() { + return interval; + } + + /** Runnable used to periodically roll over the internal {@link org.apache.hadoop.metrics2.util.MetricSampleQuantiles} every interval. */ + private static class RolloverSample implements Runnable { + + MetricMutableQuantiles parent; + + public RolloverSample(MetricMutableQuantiles parent) { + this.parent = parent; + } + + @Override + public void run() { + synchronized (parent) { + try { + parent.previousCount = parent.estimator.getCount(); + parent.previousSnapshot = parent.estimator.snapshot(); + } catch (IOException e) { + // Couldn't get a new snapshot because the window was empty + parent.previousCount = 0; + parent.previousSnapshot = null; + } + parent.estimator.clear(); + } + parent.setChanged(); + } + + } +} diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MetricsExecutorImpl.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MetricsExecutorImpl.java new file mode 100644 index 0000000..3135758 --- /dev/null +++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MetricsExecutorImpl.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.lib; + +import org.apache.hadoop.metrics.MetricsExecutor; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Class to handle the ScheduledExecutorService{@link ScheduledExecutorService} used by MetricMutableQuantiles{@link MetricMutableQuantiles} + */ +public class MetricsExecutorImpl implements MetricsExecutor { + + @Override + public ScheduledExecutorService getExecutor() { + return ExecutorSingleton.INSTANCE.scheduler; + } + + @Override + public void stop() { + if (!getExecutor().isShutdown()) { + getExecutor().shutdown(); + } + } + + private enum ExecutorSingleton { + INSTANCE; + + private final ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1, new ThreadPoolExecutorThreadFactory("HBase-Metrics2-")); + } + + private static class ThreadPoolExecutorThreadFactory implements ThreadFactory { + private final String name; + private final AtomicInteger threadNumber = new AtomicInteger(1); + + private ThreadPoolExecutorThreadFactory(String name) { + this.name = name; + } + + @Override + public Thread newThread(Runnable runnable) { + Thread t = new Thread(runnable, name + threadNumber.getAndIncrement()); + t.setDaemon(true); + return t; + } + } +} diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableHistogram.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableHistogram.java new file mode 100644 index 0000000..4fb0be9 --- /dev/null +++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableHistogram.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.lib; + +import com.yammer.metrics.stats.ExponentiallyDecayingSample; +import com.yammer.metrics.stats.Sample; +import com.yammer.metrics.stats.Snapshot; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.metrics.MetricHistogram; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * A histogram implementation that runs in constant space, and exports to hadoop2's metrics2 system. + */ +public class MutableHistogram extends MutableMetric implements MetricHistogram { + + private static final int DEFAULT_SAMPLE_SIZE = 2046; + // the bias towards sampling from more recent data. + // Per Cormode et al. an alpha of 0.015 strongly biases to the last 5 minutes + private static final double DEFAULT_ALPHA = 0.015; + + private final String name; + private final String desc; + private final Sample sample; + private final AtomicLong min; + private final AtomicLong max; + private final AtomicLong sum; + private final AtomicLong count; + + public MutableHistogram(MetricsInfo info) { + this(info.name(), info.description()); + } + + public MutableHistogram(String name, String description) { + this.name = StringUtils.capitalize(name); + this.desc = StringUtils.uncapitalize(description); + sample = new ExponentiallyDecayingSample(DEFAULT_SAMPLE_SIZE, DEFAULT_ALPHA); + count = new AtomicLong(); + min = new AtomicLong(Long.MAX_VALUE); + max = new AtomicLong(Long.MIN_VALUE); + sum = new AtomicLong(); + } + + public void add(final long val) { + setChanged(); + count.incrementAndGet(); + sample.update(val); + setMax(val); + setMin(val); + sum.getAndAdd(val); + } + + private void setMax(final long potentialMax) { + boolean done = false; + while (!done) { + final long currentMax = max.get(); + done = currentMax >= potentialMax + || max.compareAndSet(currentMax, potentialMax); + } + } + + private void setMin(long potentialMin) { + boolean done = false; + while (!done) { + final long currentMin = min.get(); + done = currentMin <= potentialMin + || min.compareAndSet(currentMin, potentialMin); + } + } + + public long getMax() { + if (count.get() > 0) { + return max.get(); + } + return 0L; + } + + public long getMin() { + if (count.get() > 0) { + return min.get(); + } + return 0L; + } + + public double getMean() { + long cCount = count.get(); + if (cCount > 0) { + return sum.get() / (double) cCount; + } + return 0.0; + } + + @Override + public void snapshot(MetricsRecordBuilder metricsRecordBuilder, boolean all) { + if (all || changed()) { + clearChanged(); + final Snapshot s = sample.getSnapshot(); + metricsRecordBuilder.addCounter(Interns.info(name + NUM_OPS_METRIC_NAME, desc), count.get()); + + metricsRecordBuilder.addGauge(Interns.info(name + MIN_METRIC_NAME, desc), getMin()); + metricsRecordBuilder.addGauge(Interns.info(name + MAX_METRIC_NAME, desc), getMax()); + metricsRecordBuilder.addGauge(Interns.info(name + MEAN_METRIC_NAME, desc), getMean()); + + metricsRecordBuilder.addGauge(Interns.info(name + MEDIAN_METRIC_NAME, desc), s.getMedian()); + metricsRecordBuilder.addGauge(Interns.info(name + SEVENTY_FIFTH_PERCENTILE_METRIC_NAME, desc), + s.get75thPercentile()); + metricsRecordBuilder.addGauge(Interns.info(name + NINETY_FIFTH_PERCENTILE_METRIC_NAME, desc), + s.get95thPercentile()); + metricsRecordBuilder.addGauge(Interns.info(name + NINETY_NINETH_PERCENTILE_METRIC_NAME, desc), + s.get99thPercentile()); + } + } +} diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/util/MetricQuantile.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/util/MetricQuantile.java new file mode 100644 index 0000000..0fb2295 --- /dev/null +++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/util/MetricQuantile.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.util; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Specifies a quantile (with error bounds) to be watched by a + * {@link MetricSampleQuantiles} object. + */ +@InterfaceAudience.Private +public class MetricQuantile { + public final double quantile; + public final double error; + + public MetricQuantile(double quantile, double error) { + this.quantile = quantile; + this.error = error; + } + + @Override + public boolean equals(Object aThat) { + if (this == aThat) { + return true; + } + if (!(aThat instanceof MetricQuantile)) { + return false; + } + + MetricQuantile that = (MetricQuantile) aThat; + + long qbits = Double.doubleToLongBits(quantile); + long ebits = Double.doubleToLongBits(error); + + return qbits == Double.doubleToLongBits(that.quantile) + && ebits == Double.doubleToLongBits(that.error); + } + + @Override + public int hashCode() { + return (int) (Double.doubleToLongBits(quantile) ^ Double + .doubleToLongBits(error)); + } +} \ No newline at end of file diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/util/MetricSampleQuantiles.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/util/MetricSampleQuantiles.java new file mode 100644 index 0000000..5710ebd --- /dev/null +++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/util/MetricSampleQuantiles.java @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.util; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.ListIterator; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Implementation of the Cormode, Korn, Muthukrishnan, and Srivastava algorithm + * for streaming calculation of targeted high-percentile epsilon-approximate + * quantiles. + * + * This is a generalization of the earlier work by Greenwald and Khanna (GK), + * which essentially allows different error bounds on the targeted quantiles, + * which allows for far more efficient calculation of high-percentiles. + * + * See: Cormode, Korn, Muthukrishnan, and Srivastava + * "Effective Computation of Biased Quantiles over Data Streams" in ICDE 2005 + * + * Greenwald and Khanna, + * "Space-efficient online computation of quantile summaries" in SIGMOD 2001 + * + */ +@InterfaceAudience.Private +public class MetricSampleQuantiles { + + /** + * Total number of items in stream + */ + private long count = 0; + + /** + * Current list of sampled items, maintained in sorted order with error bounds + */ + private LinkedList samples; + + /** + * Buffers incoming items to be inserted in batch. Items are inserted into + * the buffer linearly. When the buffer fills, it is flushed into the samples + * array in its entirety. + */ + private long[] buffer = new long[500]; + private int bufferCount = 0; + + /** + * Array of Quantiles that we care about, along with desired error. + */ + private final MetricQuantile quantiles[]; + + public MetricSampleQuantiles(MetricQuantile[] quantiles) { + this.quantiles = quantiles; + this.samples = new LinkedList(); + } + + /** + * Specifies the allowable error for this rank, depending on which quantiles + * are being targeted. + * + * This is the f(r_i, n) function from the CKMS paper. It's basically how wide + * the range of this rank can be. + * + * @param rank + * the index in the list of samples + */ + private double allowableError(int rank) { + int size = samples.size(); + double minError = size + 1; + for (MetricQuantile q : quantiles) { + double error; + if (rank <= q.quantile * size) { + error = (2.0 * q.error * (size - rank)) / (1.0 - q.quantile); + } else { + error = (2.0 * q.error * rank) / q.quantile; + } + if (error < minError) { + minError = error; + } + } + + return minError; + } + + /** + * Add a new value from the stream. + * + * @param v + */ + synchronized public void insert(long v) { + buffer[bufferCount] = v; + bufferCount++; + + count++; + + if (bufferCount == buffer.length) { + insertBatch(); + compress(); + } + } + + /** + * Merges items from buffer into the samples array in one pass. + * This is more efficient than doing an insert on every item. + */ + private void insertBatch() { + if (bufferCount == 0) { + return; + } + + Arrays.sort(buffer, 0, bufferCount); + + // Base case: no samples + int start = 0; + if (samples.size() == 0) { + SampleItem newItem = new SampleItem(buffer[0], 1, 0); + samples.add(newItem); + start++; + } + + ListIterator it = samples.listIterator(); + SampleItem item = it.next(); + for (int i = start; i < bufferCount; i++) { + long v = buffer[i]; + while (it.nextIndex() < samples.size() && item.value < v) { + item = it.next(); + } + // If we found that bigger item, back up so we insert ourselves before it + if (item.value > v) { + it.previous(); + } + // We use different indexes for the edge comparisons, because of the above + // if statement that adjusts the iterator + int delta; + if (it.previousIndex() == 0 || it.nextIndex() == samples.size()) { + delta = 0; + } else { + delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1; + } + SampleItem newItem = new SampleItem(v, 1, delta); + it.add(newItem); + item = newItem; + } + + bufferCount = 0; + } + + /** + * Try to remove extraneous items from the set of sampled items. This checks + * if an item is unnecessary based on the desired error bounds, and merges it + * with the adjacent item if it is. + */ + private void compress() { + if (samples.size() < 2) { + return; + } + + ListIterator it = samples.listIterator(); + SampleItem prev = null; + SampleItem next = it.next(); + + while (it.hasNext()) { + prev = next; + next = it.next(); + if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) { + next.g += prev.g; + // Remove prev. it.remove() kills the last thing returned. + it.previous(); + it.previous(); + it.remove(); + // it.next() is now equal to next, skip it back forward again + it.next(); + } + } + } + + /** + * Get the estimated value at the specified quantile. + * + * @param quantile Queried quantile, e.g. 0.50 or 0.99. + * @return Estimated value at that quantile. + */ + private long query(double quantile) throws IOException { + if (samples.size() == 0) { + throw new IOException("No samples present"); + } + + int rankMin = 0; + int desired = (int) (quantile * count); + + for (int i = 1; i < samples.size(); i++) { + SampleItem prev = samples.get(i - 1); + SampleItem cur = samples.get(i); + + rankMin += prev.g; + + if (rankMin + cur.g + cur.delta > desired + (allowableError(i) / 2)) { + return prev.value; + } + } + + // edge case of wanting max value + return samples.get(samples.size() - 1).value; + } + + /** + * Get a snapshot of the current values of all the tracked quantiles. + * + * @return snapshot of the tracked quantiles + * @throws IOException + * if no items have been added to the estimator + */ + synchronized public Map snapshot() throws IOException { + // flush the buffer first for best results + insertBatch(); + Map values = new HashMap(quantiles.length); + for (int i = 0; i < quantiles.length; i++) { + values.put(quantiles[i], query(quantiles[i].quantile)); + } + + return values; + } + + /** + * Returns the number of items that the estimator has processed + * + * @return count total number of items processed + */ + synchronized public long getCount() { + return count; + } + + /** + * Returns the number of samples kept by the estimator + * + * @return count current number of samples + */ + @VisibleForTesting + synchronized public int getSampleCount() { + return samples.size(); + } + + /** + * Resets the estimator, clearing out all previously inserted items + */ + synchronized public void clear() { + count = 0; + bufferCount = 0; + samples.clear(); + } + + /** + * Describes a measured value passed to the estimator, tracking additional + * metadata required by the CKMS algorithm. + */ + private static class SampleItem { + + /** + * Value of the sampled item (e.g. a measured latency value) + */ + public final long value; + + /** + * Difference between the lowest possible rank of the previous item, and + * the lowest possible rank of this item. + * + * The sum of the g of all previous items yields this item's lower bound. + */ + public int g; + + /** + * Difference between the item's greatest possible rank and lowest possible + * rank. + */ + public final int delta; + + public SampleItem(long value, int lowerDelta, int delta) { + this.value = value; + this.g = lowerDelta; + this.delta = delta; + } + + @Override + public String toString() { + return String.format("%d, %d, %d", value, g, delta); + } + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/metrics/MasterMetrics.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/metrics/MasterMetrics.java index 9f0f283..603d3e9 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/metrics/MasterMetrics.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/metrics/MasterMetrics.java @@ -51,8 +51,8 @@ public class MasterMetrics { * @param size length of original HLogs that were split */ public synchronized void addSplit(long time, long size) { - //TODO use new metrics histogram - + masterMetricsSource.updateSplitTime(time); + masterMetricsSource.updateSplitSize(size); } /**