diff --git hbase-hadoop-compat/pom.xml hbase-hadoop-compat/pom.xml
index 016dd24..fe56064 100644
--- hbase-hadoop-compat/pom.xml
+++ hbase-hadoop-compat/pom.xml
@@ -110,6 +110,10 @@
org.apache.hbase
hbase-annotations
+
+
+ org.apache.hbase
+ hbase-annotations
test-jar
test
diff --git hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java
index bb44946..d3a0db3 100644
--- hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java
+++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;
+
/**
* Interface of a factory to create Metrics Sources used inside of regionservers.
*/
@@ -38,4 +39,16 @@ public interface MetricsRegionServerSourceFactory {
* @return A metrics region source
*/
MetricsRegionSource createRegion(MetricsRegionWrapper wrapper);
+
+ /**
+ * Create a MetricsUserSource from a user
+ * @return A metrics user source
+ */
+ MetricsUserSource createUser(String shortUserName);
+
+ /**
+ * Return the singleton instance for MetricsUserAggregateSource
+ * @return A metrics user aggregate source
+ */
+ MetricsUserAggregateSource getUserAggregate();
}
diff --git hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java
new file mode 100644
index 0000000..86131f8
--- /dev/null
+++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.metrics.BaseSource;
+
+/**
+* This interface will be implemented by a MetricsSource that will export metrics from
+* multiple users into the hadoop metrics system.
+*/
+@InterfaceAudience.Private
+public interface MetricsUserAggregateSource extends BaseSource {
+
+ /**
+ * The name of the metrics
+ */
+ static final String METRICS_NAME = "Users";
+
+ /**
+ * The name of the metrics context that metrics will be under.
+ */
+ static final String METRICS_CONTEXT = "regionserver";
+
+ /**
+ * Description
+ */
+ static final String METRICS_DESCRIPTION = "Metrics about users connected to the regionserver";
+
+ /**
+ * The name of the metrics context that metrics will be under in jmx
+ */
+ static final String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
+
+ static final String NUM_USERS = "numUsers";
+ static final String NUMBER_OF_USERS_DESC = "Number of users in the metrics system";
+
+ /**
+ * Returns a MetricsUserSource if already exists, or creates and registers one for this user
+ * @param user the user name
+ * @return a metrics user source
+ */
+ MetricsUserSource getOrCreateMetricsUser(String user);
+}
diff --git hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java
new file mode 100644
index 0000000..249327c
--- /dev/null
+++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public interface MetricsUserSource extends Comparable {
+
+ String getUser();
+
+ void register();
+
+ void deregister();
+
+ void updatePut(long t);
+
+ void updateDelete(long t);
+
+ void updateGet(long t);
+
+ void updateIncrement(long t);
+
+ void updateAppend(long t);
+
+ void updateReplay(long t);
+
+ void updateScanTime(long t);
+}
diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java
index c483083..424e501 100644
--- hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java
+++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java
@@ -28,26 +28,41 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer
public static enum FactoryStorage {
INSTANCE;
private Object aggLock = new Object();
- private MetricsRegionAggregateSourceImpl aggImpl;
+ private MetricsRegionAggregateSourceImpl regionAggImpl;
+ private MetricsUserAggregateSourceImpl userAggImpl;
}
- private synchronized MetricsRegionAggregateSourceImpl getAggregate() {
+ private synchronized MetricsRegionAggregateSourceImpl getRegionAggregate() {
synchronized (FactoryStorage.INSTANCE.aggLock) {
- if (FactoryStorage.INSTANCE.aggImpl == null) {
- FactoryStorage.INSTANCE.aggImpl = new MetricsRegionAggregateSourceImpl();
+ if (FactoryStorage.INSTANCE.regionAggImpl == null) {
+ FactoryStorage.INSTANCE.regionAggImpl = new MetricsRegionAggregateSourceImpl();
}
- return FactoryStorage.INSTANCE.aggImpl;
+ return FactoryStorage.INSTANCE.regionAggImpl;
}
}
+ public synchronized MetricsUserAggregateSourceImpl getUserAggregate() {
+ synchronized (FactoryStorage.INSTANCE.aggLock) {
+ if (FactoryStorage.INSTANCE.userAggImpl == null) {
+ FactoryStorage.INSTANCE.userAggImpl = new MetricsUserAggregateSourceImpl();
+ }
+ return FactoryStorage.INSTANCE.userAggImpl;
+ }
+ }
@Override
- public synchronized MetricsRegionServerSource createServer(MetricsRegionServerWrapper regionServerWrapper) {
+ public synchronized MetricsRegionServerSource createServer(
+ MetricsRegionServerWrapper regionServerWrapper) {
return new MetricsRegionServerSourceImpl(regionServerWrapper);
}
@Override
public MetricsRegionSource createRegion(MetricsRegionWrapper wrapper) {
- return new MetricsRegionSourceImpl(wrapper, getAggregate());
+ return new MetricsRegionSourceImpl(wrapper, getRegionAggregate());
+ }
+
+ @Override
+ public MetricsUserSource createUser(String shortUserName) {
+ return new MetricsUserSourceImpl(shortUserName, getUserAggregate());
}
}
diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java
new file mode 100644
index 0000000..471312a
--- /dev/null
+++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.lib.Interns;
+
+@InterfaceAudience.Private
+public class MetricsUserAggregateSourceImpl extends BaseSourceImpl
+ implements MetricsUserAggregateSource {
+
+ private static final Log LOG = LogFactory.getLog(MetricsUserAggregateSourceImpl.class);
+
+ private final ConcurrentHashMap userSources =
+ new ConcurrentHashMap();
+
+ public MetricsUserAggregateSourceImpl() {
+ this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
+ }
+
+ public MetricsUserAggregateSourceImpl(String metricsName,
+ String metricsDescription,
+ String metricsContext,
+ String metricsJmxContext) {
+ super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
+ }
+
+ @Override
+ public MetricsUserSource getOrCreateMetricsUser(String user) {
+ MetricsUserSource source = userSources.get(user);
+ if (source != null) {
+ return source;
+ }
+ source = new MetricsUserSourceImpl(user, this);
+ MetricsUserSource prev = userSources.putIfAbsent(user, source);
+
+ if (prev != null) {
+ return prev;
+ } else {
+ // register the new metrics now
+ register(source);
+ }
+ return source;
+ }
+
+ public void register(MetricsUserSource source) {
+ synchronized (this) {
+ source.register();
+ }
+ }
+
+ public void deregister(MetricsUserSource toRemove) {
+ /*
+ * We do not deregister per-user level metrics for now, so this function is not used. The user
+ * can go away and come back hours or days later, but we still would like to keep context for
+ * the user to calculate the counters and rates. Users are usually static in a cluster and
+ * there are not thousands of them, so it should be fine to not do the cleanup.
+ *
+ * Since we are not removing metrics, JmxCacheBuster is also not called periodically.
+ */
+ try {
+ synchronized (this) {
+ MetricsUserSource source = userSources.remove(toRemove);
+ if (source != null) {
+ source.deregister();
+ }
+ }
+ } catch (Exception e) {
+ // Ignored. If this errors out it means that someone is double
+ // closing the user source and the user metrics is already nulled out.
+ LOG.info( "Error trying to remove " + toRemove + " from " + getClass().getSimpleName(), e);
+ }
+ }
+
+ @Override
+ public void getMetrics(MetricsCollector collector, boolean all) {
+ MetricsRecordBuilder mrb = collector.addRecord(metricsName);
+
+ if (userSources != null) {
+ for (MetricsUserSource userMetricSource : userSources.values()) {
+ if (userMetricSource instanceof MetricsUserSourceImpl) {
+ ((MetricsUserSourceImpl) userMetricSource).snapshot(mrb, all);
+ }
+ }
+ mrb.addGauge(Interns.info(NUM_USERS, NUMBER_OF_USERS_DESC), userSources.size());
+ metricsRegistry.snapshot(mrb, all);
+ }
+ }
+}
diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java
new file mode 100644
index 0000000..2237358
--- /dev/null
+++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricHistogram;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
+
+@InterfaceAudience.Private
+public class MetricsUserSourceImpl implements MetricsUserSource {
+ private static final Log LOG = LogFactory.getLog(MetricsUserSourceImpl.class);
+
+ private final String userNamePrefix;
+
+ private final String user;
+
+ private final String userGetKey;
+ private final String userScanTimeKey;
+ private final String userPutKey;
+ private final String userDeleteKey;
+ private final String userIncrementKey;
+ private final String userAppendKey;
+ private final String userReplayKey;
+
+ private MetricHistogram getHisto;
+ private MetricHistogram scanTimeHisto;
+ private MetricHistogram putHisto;
+ private MetricHistogram deleteHisto;
+ private MetricHistogram incrementHisto;
+ private MetricHistogram appendHisto;
+ private MetricHistogram replayHisto;
+
+ private final int hashCode;
+
+ private AtomicBoolean closed = new AtomicBoolean(false);
+ private final MetricsUserAggregateSourceImpl agg;
+ private final DynamicMetricsRegistry registry;
+
+ public MetricsUserSourceImpl(String user, MetricsUserAggregateSourceImpl agg) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating new MetricsUserSourceImpl for user " + user);
+ }
+
+ this.user = user;
+ this.agg = agg;
+ this.registry = agg.getMetricsRegistry();
+
+ this.userNamePrefix = "user_" + user + "_metric_";
+
+ hashCode = userNamePrefix.hashCode();
+
+ userGetKey = userNamePrefix + MetricsRegionServerSource.GET_KEY;
+ userScanTimeKey = userNamePrefix + MetricsRegionServerSource.SCAN_TIME_KEY;
+ userPutKey = userNamePrefix + MetricsRegionServerSource.MUTATE_KEY;
+ userDeleteKey = userNamePrefix + MetricsRegionServerSource.DELETE_KEY;
+ userIncrementKey = userNamePrefix + MetricsRegionServerSource.INCREMENT_KEY;
+ userAppendKey = userNamePrefix + MetricsRegionServerSource.APPEND_KEY;
+ userReplayKey = userNamePrefix + MetricsRegionServerSource.REPLAY_KEY;
+
+ agg.register(this);
+ }
+
+ @Override
+ public void register() {
+ synchronized (this) {
+ getHisto = registry.newTimeHistogram(userGetKey);
+ scanTimeHisto = registry.newTimeHistogram(userScanTimeKey);
+ putHisto = registry.newTimeHistogram(userPutKey);
+ deleteHisto = registry.newTimeHistogram(userDeleteKey);
+ incrementHisto = registry.newTimeHistogram(userIncrementKey);
+ appendHisto = registry.newTimeHistogram(userAppendKey);
+ replayHisto = registry.newTimeHistogram(userReplayKey);
+ }
+ }
+
+ @Override
+ public void deregister() {
+ boolean wasClosed = closed.getAndSet(true);
+
+ // Has someone else already closed this for us?
+ if (wasClosed) {
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing user Metrics for user: " + user);
+ }
+
+ synchronized (this) {
+ registry.removeHistogramMetrics(userGetKey);
+ registry.removeHistogramMetrics(userScanTimeKey);
+ registry.removeHistogramMetrics(userPutKey);
+ registry.removeHistogramMetrics(userDeleteKey);
+ registry.removeHistogramMetrics(userIncrementKey);
+ registry.removeHistogramMetrics(userAppendKey);
+ registry.removeHistogramMetrics(userReplayKey);
+ }
+ }
+
+ @Override
+ public String getUser() {
+ return user;
+ }
+
+ @Override
+ public int compareTo(MetricsUserSource source) {
+ if (source == null) {
+ return -1;
+ }
+ if (!(source instanceof MetricsUserSourceImpl)) {
+ return -1;
+ }
+
+ MetricsUserSourceImpl impl = (MetricsUserSourceImpl) source;
+
+ return Long.compare(hashCode, impl.hashCode);
+ }
+
+ @Override
+ public int hashCode() {
+ return hashCode;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj == this ||
+ (obj instanceof MetricsUserSourceImpl && compareTo((MetricsUserSourceImpl) obj) == 0);
+ }
+
+ void snapshot(MetricsRecordBuilder mrb, boolean ignored) {
+ // If there is a close that started be double extra sure
+ // that we're not getting any locks and not putting data
+ // into the metrics that should be removed. So early out
+ // before even getting the lock.
+ if (closed.get()) {
+ return;
+ }
+
+ // Grab the read
+ // This ensures that removes of the metrics
+ // can't happen while we are putting them back in.
+ synchronized (this) {
+
+ // It's possible that a close happened between checking
+ // the closed variable and getting the lock.
+ if (closed.get()) {
+ return;
+ }
+ }
+ }
+
+ @Override
+ public void updatePut(long t) {
+ putHisto.add(t);
+ }
+
+ @Override
+ public void updateDelete(long t) {
+ deleteHisto.add(t);
+ }
+
+ @Override
+ public void updateGet(long t) {
+ getHisto.add(t);
+ }
+
+ @Override
+ public void updateIncrement(long t) {
+ incrementHisto.add(t);
+ }
+
+ @Override
+ public void updateAppend(long t) {
+ appendHisto.add(t);
+ }
+
+ @Override
+ public void updateReplay(long t) {
+ replayHisto.add(t);
+ }
+
+ @Override
+ public void updateScanTime(long t) {
+ scanTimeHisto.add(t);
+ }
+}
diff --git hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserSourceImpl.java hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserSourceImpl.java
new file mode 100644
index 0000000..3343137
--- /dev/null
+++ hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserSourceImpl.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.testclassification.MetricsTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MetricsTests.class, SmallTests.class})
+public class TestMetricsUserSourceImpl {
+
+ @Test
+ public void testCompareToHashCodeEquals() throws Exception {
+ MetricsRegionServerSourceFactory fact
+ = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class);
+
+ MetricsUserSource one = fact.createUser("ONE");
+ MetricsUserSource oneClone = fact.createUser("ONE");
+ MetricsUserSource two = fact.createUser("TWO");
+
+ assertEquals(0, one.compareTo(oneClone));
+ assertEquals(one.hashCode(), oneClone.hashCode());
+ assertNotEquals(one, two);
+
+ assertTrue( one.compareTo(two) != 0);
+ assertTrue( two.compareTo(one) != 0);
+ assertTrue( two.compareTo(one) != one.compareTo(two));
+ assertTrue( two.compareTo(two) == 0);
+ }
+
+
+ @Test (expected = RuntimeException.class)
+ public void testNoGetRegionServerMetricsSourceImpl() throws Exception {
+ // This should throw an exception because MetricsUserSourceImpl should only
+ // be created by a factory.
+ CompatibilitySingletonFactory.getInstance(MetricsUserSource.class);
+ }
+
+ @Test
+ public void testGetUser() {
+ MetricsRegionServerSourceFactory fact
+ = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class);
+
+ MetricsUserSource one = fact.createUser("ONE");
+ assertEquals("ONE", one.getUser());
+ }
+
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 1476190..e99c241 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1405,7 +1405,8 @@ public class HRegionServer extends HasThread implements
this.cacheConfig = new CacheConfig(conf);
this.walFactory = setupWALAndReplication();
// Init in here rather than in constructor after thread name has been set
- this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
+ this.metricsRegionServer = new MetricsRegionServer(getConfiguration(),
+ new MetricsRegionServerWrapperImpl(this));
startServiceThreads();
startHeapMemoryManager();
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
index 8bca6c5..9bf04d8 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
/**
@@ -33,20 +34,23 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
@InterfaceStability.Evolving
@InterfaceAudience.Private
public class MetricsRegionServer {
- private MetricsRegionServerSource serverSource;
- private MetricsRegionServerWrapper regionServerWrapper;
+ private final MetricsRegionServerSource serverSource;
+ private final MetricsRegionServerWrapper regionServerWrapper;
- public MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper) {
- this(regionServerWrapper,
+ private final MetricsUserAggregate userAggregate;
+
+ public MetricsRegionServer(Configuration conf, MetricsRegionServerWrapper regionServerWrapper) {
+ this(conf, regionServerWrapper,
CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
.createServer(regionServerWrapper));
}
- MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper,
+ MetricsRegionServer(Configuration conf, MetricsRegionServerWrapper regionServerWrapper,
MetricsRegionServerSource serverSource) {
this.regionServerWrapper = regionServerWrapper;
this.serverSource = serverSource;
+ this.userAggregate = new MetricsUserAggregate(conf);
}
@VisibleForTesting
@@ -54,6 +58,11 @@ public class MetricsRegionServer {
return serverSource;
}
+ @VisibleForTesting
+ public MetricsUserAggregate getMetricsUserAggregate() {
+ return userAggregate;
+ }
+
public MetricsRegionServerWrapper getRegionServerWrapper() {
return regionServerWrapper;
}
@@ -63,6 +72,7 @@ public class MetricsRegionServer {
serverSource.incrSlowPut();
}
serverSource.updatePut(t);
+ userAggregate.updatePut(t);
}
public void updateDelete(long t) {
@@ -70,6 +80,7 @@ public class MetricsRegionServer {
serverSource.incrSlowDelete();
}
serverSource.updateDelete(t);
+ userAggregate.updateDelete(t);
}
public void updateGet(long t) {
@@ -77,6 +88,7 @@ public class MetricsRegionServer {
serverSource.incrSlowGet();
}
serverSource.updateGet(t);
+ userAggregate.updateGet(t);
}
public void updateIncrement(long t) {
@@ -84,6 +96,7 @@ public class MetricsRegionServer {
serverSource.incrSlowIncrement();
}
serverSource.updateIncrement(t);
+ userAggregate.updateIncrement(t);
}
public void updateAppend(long t) {
@@ -91,10 +104,12 @@ public class MetricsRegionServer {
serverSource.incrSlowAppend();
}
serverSource.updateAppend(t);
+ userAggregate.updateAppend(t);
}
public void updateReplay(long t){
serverSource.updateReplay(t);
+ userAggregate.updateReplay(t);
}
public void updateScanSize(long scanSize){
@@ -103,6 +118,7 @@ public class MetricsRegionServer {
public void updateScanTime(long t) {
serverSource.updateScanTime(t);
+ userAggregate.updateScanTime(t);
}
public void updateSplitTime(long t) {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java
new file mode 100644
index 0000000..15c1d8c
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import com.google.common.annotations.VisibleForTesting;
+
+public class MetricsUserAggregate {
+
+ /** Provider for mapping principal names to Users */
+ private final UserProvider userProvider;
+
+ private final MetricsUserAggregateSource source;
+
+ public MetricsUserAggregate(Configuration conf) {
+ source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
+ .getUserAggregate();
+
+ this.userProvider = UserProvider.instantiate(conf);
+ }
+
+ /**
+ * Returns the active user to which authorization checks should be applied.
+ * If we are in the context of an RPC call, the remote user is used,
+ * otherwise the currently logged in user is used.
+ */
+ private String getActiveUser() {
+ User user = RpcServer.getRequestUser();
+ if (user == null) {
+ // for non-rpc handling, fallback to system user
+ try {
+ user = userProvider.getCurrent();
+ } catch (IOException ignore) {
+ }
+ }
+ return user != null ? user.getShortName() : null;
+ }
+
+ @VisibleForTesting
+ MetricsUserAggregateSource getSource() {
+ return source;
+ }
+
+ public void updatePut(long t) {
+ String user = getActiveUser();
+ if (user != null) {
+ source.getOrCreateMetricsUser(user).updatePut(t);
+ }
+ }
+
+ public void updateDelete(long t) {
+ String user = getActiveUser();
+ if (user != null) {
+ source.getOrCreateMetricsUser(user).updateDelete(t);
+ }
+ }
+
+ public void updateGet(long t) {
+ String user = getActiveUser();
+ if (user != null) {
+ source.getOrCreateMetricsUser(user).updateGet(t);
+ }
+ }
+
+ public void updateIncrement(long t) {
+ String user = getActiveUser();
+ if (user != null) {
+ source.getOrCreateMetricsUser(user).updateIncrement(t);
+ }
+ }
+
+ public void updateAppend(long t) {
+ String user = getActiveUser();
+ if (user != null) {
+ source.getOrCreateMetricsUser(user).updateAppend(t);
+ }
+ }
+
+ public void updateReplay(long t) {
+ String user = getActiveUser();
+ if (user != null) {
+ source.getOrCreateMetricsUser(user).updateReplay(t);
+ }
+ }
+
+ public void updateScanTime(long t) {
+ String user = getActiveUser();
+ if (user != null) {
+ source.getOrCreateMetricsUser(user).updateScanTime(t);
+ }
+ }
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
index 5f56ba9..1f35e89 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.CompatibilityFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
@@ -48,7 +49,7 @@ public class TestMetricsRegionServer {
@Before
public void setUp() {
wrapper = new MetricsRegionServerWrapperStub();
- rsm = new MetricsRegionServer(wrapper);
+ rsm = new MetricsRegionServer(HBaseConfiguration.create(), wrapper);
serverSource = rsm.getMetricsSource();
}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserAggregate.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserAggregate.java
new file mode 100644
index 0000000..c45433a
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserAggregate.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.security.PrivilegedAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CompatibilityFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.test.MetricsAssertHelper;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({RegionServerTests.class, SmallTests.class})
+public class TestMetricsUserAggregate {
+
+ public static MetricsAssertHelper HELPER =
+ CompatibilityFactory.getInstance(MetricsAssertHelper.class);
+
+ private MetricsRegionServerWrapperStub wrapper;
+ private MetricsRegionServer rsm;
+ private MetricsUserAggregate userAgg;
+
+ @BeforeClass
+ public static void classSetUp() {
+ HELPER.init();
+ }
+
+ @Before
+ public void setUp() {
+ wrapper = new MetricsRegionServerWrapperStub();
+ rsm = new MetricsRegionServer(HBaseConfiguration.create(), wrapper);
+ userAgg = rsm.getMetricsUserAggregate();
+ }
+
+ private void doOperations() {
+ for (int i=0; i < 10; i ++) {
+ rsm.updateGet(10);
+ }
+ for (int i=0; i < 11; i ++) {
+ rsm.updateScanTime(11);
+ }
+ for (int i=0; i < 12; i ++) {
+ rsm.updatePut(12);
+ }
+ for (int i=0; i < 13; i ++) {
+ rsm.updateDelete(13);
+ }
+ for (int i=0; i < 14; i ++) {
+ rsm.updateIncrement(14);
+ }
+ for (int i=0; i < 15; i ++) {
+ rsm.updateAppend(15);
+ }
+ for (int i=0; i < 16; i ++) {
+ rsm.updateReplay(16);
+ }
+ }
+
+ @Test
+ public void testPerUserOperations() {
+ Configuration conf = HBaseConfiguration.create();
+ User userFoo = User.createUserForTesting(conf, "FOO", new String[0]);
+ User userBar = User.createUserForTesting(conf, "BAR", new String[0]);
+
+ userFoo.getUGI().doAs(new PrivilegedAction() {
+ @Override
+ public Void run() {
+ doOperations();
+ return null;
+ }
+ });
+
+ userBar.getUGI().doAs(new PrivilegedAction() {
+ @Override
+ public Void run() {
+ doOperations();
+ return null;
+ }
+ });
+
+ HELPER.assertCounter("userfoometricgetnumops", 10, userAgg.getSource());
+ HELPER.assertCounter("userfoometricscantimenumops", 11, userAgg.getSource());
+ HELPER.assertCounter("userfoometricmutatenumops", 12, userAgg.getSource());
+ HELPER.assertCounter("userfoometricdeletenumops", 13, userAgg.getSource());
+ HELPER.assertCounter("userfoometricincrementnumops", 14, userAgg.getSource());
+ HELPER.assertCounter("userfoometricappendnumops", 15, userAgg.getSource());
+ HELPER.assertCounter("userfoometricreplaynumops", 16, userAgg.getSource());
+
+ HELPER.assertCounter("userbarmetricgetnumops", 10, userAgg.getSource());
+ HELPER.assertCounter("userbarmetricscantimenumops", 11, userAgg.getSource());
+ HELPER.assertCounter("userbarmetricmutatenumops", 12, userAgg.getSource());
+ HELPER.assertCounter("userbarmetricdeletenumops", 13, userAgg.getSource());
+ HELPER.assertCounter("userbarmetricincrementnumops", 14, userAgg.getSource());
+ HELPER.assertCounter("userbarmetricappendnumops", 15, userAgg.getSource());
+ HELPER.assertCounter("userbarmetricreplaynumops", 16, userAgg.getSource());
+ }
+}