diff --git hbase-hadoop-compat/pom.xml hbase-hadoop-compat/pom.xml index 016dd24..fe56064 100644 --- hbase-hadoop-compat/pom.xml +++ hbase-hadoop-compat/pom.xml @@ -110,6 +110,10 @@ org.apache.hbase hbase-annotations + + + org.apache.hbase + hbase-annotations test-jar test diff --git hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java index bb44946..d3a0db3 100644 --- hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java +++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver; + /** * Interface of a factory to create Metrics Sources used inside of regionservers. */ @@ -38,4 +39,16 @@ public interface MetricsRegionServerSourceFactory { * @return A metrics region source */ MetricsRegionSource createRegion(MetricsRegionWrapper wrapper); + + /** + * Create a MetricsUserSource from a user + * @return A metrics user source + */ + MetricsUserSource createUser(String shortUserName); + + /** + * Return the singleton instance for MetricsUserAggregateSource + * @return A metrics user aggregate source + */ + MetricsUserAggregateSource getUserAggregate(); } diff --git hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java new file mode 100644 index 0000000..86131f8 --- /dev/null +++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.metrics.BaseSource; + +/** +* This interface will be implemented by a MetricsSource that will export metrics from +* multiple users into the hadoop metrics system. +*/ +@InterfaceAudience.Private +public interface MetricsUserAggregateSource extends BaseSource { + + /** + * The name of the metrics + */ + static final String METRICS_NAME = "Users"; + + /** + * The name of the metrics context that metrics will be under. + */ + static final String METRICS_CONTEXT = "regionserver"; + + /** + * Description + */ + static final String METRICS_DESCRIPTION = "Metrics about users connected to the regionserver"; + + /** + * The name of the metrics context that metrics will be under in jmx + */ + static final String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME; + + static final String NUM_USERS = "numUsers"; + static final String NUMBER_OF_USERS_DESC = "Number of users in the metrics system"; + + /** + * Returns a MetricsUserSource if already exists, or creates and registers one for this user + * @param user the user name + * @return a metrics user source + */ + MetricsUserSource getOrCreateMetricsUser(String user); +} diff --git hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java new file mode 100644 index 0000000..249327c --- /dev/null +++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.Private +public interface MetricsUserSource extends Comparable { + + String getUser(); + + void register(); + + void deregister(); + + void updatePut(long t); + + void updateDelete(long t); + + void updateGet(long t); + + void updateIncrement(long t); + + void updateAppend(long t); + + void updateReplay(long t); + + void updateScanTime(long t); +} diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java index c483083..424e501 100644 --- hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java +++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java @@ -28,26 +28,41 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer public static enum FactoryStorage { INSTANCE; private Object aggLock = new Object(); - private MetricsRegionAggregateSourceImpl aggImpl; + private MetricsRegionAggregateSourceImpl regionAggImpl; + private MetricsUserAggregateSourceImpl userAggImpl; } - private synchronized MetricsRegionAggregateSourceImpl getAggregate() { + private synchronized MetricsRegionAggregateSourceImpl getRegionAggregate() { synchronized (FactoryStorage.INSTANCE.aggLock) { - if (FactoryStorage.INSTANCE.aggImpl == null) { - FactoryStorage.INSTANCE.aggImpl = new MetricsRegionAggregateSourceImpl(); + if (FactoryStorage.INSTANCE.regionAggImpl == null) { + FactoryStorage.INSTANCE.regionAggImpl = new MetricsRegionAggregateSourceImpl(); } - return FactoryStorage.INSTANCE.aggImpl; + return FactoryStorage.INSTANCE.regionAggImpl; } } + public synchronized MetricsUserAggregateSourceImpl getUserAggregate() { + synchronized (FactoryStorage.INSTANCE.aggLock) { + if (FactoryStorage.INSTANCE.userAggImpl == null) { + FactoryStorage.INSTANCE.userAggImpl = new MetricsUserAggregateSourceImpl(); + } + return FactoryStorage.INSTANCE.userAggImpl; + } + } @Override - public synchronized MetricsRegionServerSource createServer(MetricsRegionServerWrapper regionServerWrapper) { + public synchronized MetricsRegionServerSource createServer( + MetricsRegionServerWrapper regionServerWrapper) { return new MetricsRegionServerSourceImpl(regionServerWrapper); } @Override public MetricsRegionSource createRegion(MetricsRegionWrapper wrapper) { - return new MetricsRegionSourceImpl(wrapper, getAggregate()); + return new MetricsRegionSourceImpl(wrapper, getRegionAggregate()); + } + + @Override + public MetricsUserSource createUser(String shortUserName) { + return new MetricsUserSourceImpl(shortUserName, getUserAggregate()); } } diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java new file mode 100644 index 0000000..471312a --- /dev/null +++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.metrics.BaseSourceImpl; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.lib.Interns; + +@InterfaceAudience.Private +public class MetricsUserAggregateSourceImpl extends BaseSourceImpl + implements MetricsUserAggregateSource { + + private static final Log LOG = LogFactory.getLog(MetricsUserAggregateSourceImpl.class); + + private final ConcurrentHashMap userSources = + new ConcurrentHashMap(); + + public MetricsUserAggregateSourceImpl() { + this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT); + } + + public MetricsUserAggregateSourceImpl(String metricsName, + String metricsDescription, + String metricsContext, + String metricsJmxContext) { + super(metricsName, metricsDescription, metricsContext, metricsJmxContext); + } + + @Override + public MetricsUserSource getOrCreateMetricsUser(String user) { + MetricsUserSource source = userSources.get(user); + if (source != null) { + return source; + } + source = new MetricsUserSourceImpl(user, this); + MetricsUserSource prev = userSources.putIfAbsent(user, source); + + if (prev != null) { + return prev; + } else { + // register the new metrics now + register(source); + } + return source; + } + + public void register(MetricsUserSource source) { + synchronized (this) { + source.register(); + } + } + + public void deregister(MetricsUserSource toRemove) { + /* + * We do not deregister per-user level metrics for now, so this function is not used. The user + * can go away and come back hours or days later, but we still would like to keep context for + * the user to calculate the counters and rates. Users are usually static in a cluster and + * there are not thousands of them, so it should be fine to not do the cleanup. + * + * Since we are not removing metrics, JmxCacheBuster is also not called periodically. + */ + try { + synchronized (this) { + MetricsUserSource source = userSources.remove(toRemove); + if (source != null) { + source.deregister(); + } + } + } catch (Exception e) { + // Ignored. If this errors out it means that someone is double + // closing the user source and the user metrics is already nulled out. + LOG.info( "Error trying to remove " + toRemove + " from " + getClass().getSimpleName(), e); + } + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + MetricsRecordBuilder mrb = collector.addRecord(metricsName); + + if (userSources != null) { + for (MetricsUserSource userMetricSource : userSources.values()) { + if (userMetricSource instanceof MetricsUserSourceImpl) { + ((MetricsUserSourceImpl) userMetricSource).snapshot(mrb, all); + } + } + mrb.addGauge(Interns.info(NUM_USERS, NUMBER_OF_USERS_DESC), userSources.size()); + metricsRegistry.snapshot(mrb, all); + } + } +} diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java new file mode 100644 index 0000000..2237358 --- /dev/null +++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricHistogram; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; + +@InterfaceAudience.Private +public class MetricsUserSourceImpl implements MetricsUserSource { + private static final Log LOG = LogFactory.getLog(MetricsUserSourceImpl.class); + + private final String userNamePrefix; + + private final String user; + + private final String userGetKey; + private final String userScanTimeKey; + private final String userPutKey; + private final String userDeleteKey; + private final String userIncrementKey; + private final String userAppendKey; + private final String userReplayKey; + + private MetricHistogram getHisto; + private MetricHistogram scanTimeHisto; + private MetricHistogram putHisto; + private MetricHistogram deleteHisto; + private MetricHistogram incrementHisto; + private MetricHistogram appendHisto; + private MetricHistogram replayHisto; + + private final int hashCode; + + private AtomicBoolean closed = new AtomicBoolean(false); + private final MetricsUserAggregateSourceImpl agg; + private final DynamicMetricsRegistry registry; + + public MetricsUserSourceImpl(String user, MetricsUserAggregateSourceImpl agg) { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating new MetricsUserSourceImpl for user " + user); + } + + this.user = user; + this.agg = agg; + this.registry = agg.getMetricsRegistry(); + + this.userNamePrefix = "user_" + user + "_metric_"; + + hashCode = userNamePrefix.hashCode(); + + userGetKey = userNamePrefix + MetricsRegionServerSource.GET_KEY; + userScanTimeKey = userNamePrefix + MetricsRegionServerSource.SCAN_TIME_KEY; + userPutKey = userNamePrefix + MetricsRegionServerSource.MUTATE_KEY; + userDeleteKey = userNamePrefix + MetricsRegionServerSource.DELETE_KEY; + userIncrementKey = userNamePrefix + MetricsRegionServerSource.INCREMENT_KEY; + userAppendKey = userNamePrefix + MetricsRegionServerSource.APPEND_KEY; + userReplayKey = userNamePrefix + MetricsRegionServerSource.REPLAY_KEY; + + agg.register(this); + } + + @Override + public void register() { + synchronized (this) { + getHisto = registry.newTimeHistogram(userGetKey); + scanTimeHisto = registry.newTimeHistogram(userScanTimeKey); + putHisto = registry.newTimeHistogram(userPutKey); + deleteHisto = registry.newTimeHistogram(userDeleteKey); + incrementHisto = registry.newTimeHistogram(userIncrementKey); + appendHisto = registry.newTimeHistogram(userAppendKey); + replayHisto = registry.newTimeHistogram(userReplayKey); + } + } + + @Override + public void deregister() { + boolean wasClosed = closed.getAndSet(true); + + // Has someone else already closed this for us? + if (wasClosed) { + return; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Removing user Metrics for user: " + user); + } + + synchronized (this) { + registry.removeHistogramMetrics(userGetKey); + registry.removeHistogramMetrics(userScanTimeKey); + registry.removeHistogramMetrics(userPutKey); + registry.removeHistogramMetrics(userDeleteKey); + registry.removeHistogramMetrics(userIncrementKey); + registry.removeHistogramMetrics(userAppendKey); + registry.removeHistogramMetrics(userReplayKey); + } + } + + @Override + public String getUser() { + return user; + } + + @Override + public int compareTo(MetricsUserSource source) { + if (source == null) { + return -1; + } + if (!(source instanceof MetricsUserSourceImpl)) { + return -1; + } + + MetricsUserSourceImpl impl = (MetricsUserSourceImpl) source; + + return Long.compare(hashCode, impl.hashCode); + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public boolean equals(Object obj) { + return obj == this || + (obj instanceof MetricsUserSourceImpl && compareTo((MetricsUserSourceImpl) obj) == 0); + } + + void snapshot(MetricsRecordBuilder mrb, boolean ignored) { + // If there is a close that started be double extra sure + // that we're not getting any locks and not putting data + // into the metrics that should be removed. So early out + // before even getting the lock. + if (closed.get()) { + return; + } + + // Grab the read + // This ensures that removes of the metrics + // can't happen while we are putting them back in. + synchronized (this) { + + // It's possible that a close happened between checking + // the closed variable and getting the lock. + if (closed.get()) { + return; + } + } + } + + @Override + public void updatePut(long t) { + putHisto.add(t); + } + + @Override + public void updateDelete(long t) { + deleteHisto.add(t); + } + + @Override + public void updateGet(long t) { + getHisto.add(t); + } + + @Override + public void updateIncrement(long t) { + incrementHisto.add(t); + } + + @Override + public void updateAppend(long t) { + appendHisto.add(t); + } + + @Override + public void updateReplay(long t) { + replayHisto.add(t); + } + + @Override + public void updateScanTime(long t) { + scanTimeHisto.add(t); + } +} diff --git hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserSourceImpl.java hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserSourceImpl.java new file mode 100644 index 0000000..3343137 --- /dev/null +++ hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserSourceImpl.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; +import org.apache.hadoop.hbase.testclassification.MetricsTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({MetricsTests.class, SmallTests.class}) +public class TestMetricsUserSourceImpl { + + @Test + public void testCompareToHashCodeEquals() throws Exception { + MetricsRegionServerSourceFactory fact + = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class); + + MetricsUserSource one = fact.createUser("ONE"); + MetricsUserSource oneClone = fact.createUser("ONE"); + MetricsUserSource two = fact.createUser("TWO"); + + assertEquals(0, one.compareTo(oneClone)); + assertEquals(one.hashCode(), oneClone.hashCode()); + assertNotEquals(one, two); + + assertTrue( one.compareTo(two) != 0); + assertTrue( two.compareTo(one) != 0); + assertTrue( two.compareTo(one) != one.compareTo(two)); + assertTrue( two.compareTo(two) == 0); + } + + + @Test (expected = RuntimeException.class) + public void testNoGetRegionServerMetricsSourceImpl() throws Exception { + // This should throw an exception because MetricsUserSourceImpl should only + // be created by a factory. + CompatibilitySingletonFactory.getInstance(MetricsUserSource.class); + } + + @Test + public void testGetUser() { + MetricsRegionServerSourceFactory fact + = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class); + + MetricsUserSource one = fact.createUser("ONE"); + assertEquals("ONE", one.getUser()); + } + +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 1476190..e99c241 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1405,7 +1405,8 @@ public class HRegionServer extends HasThread implements this.cacheConfig = new CacheConfig(conf); this.walFactory = setupWALAndReplication(); // Init in here rather than in constructor after thread name has been set - this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this)); + this.metricsRegionServer = new MetricsRegionServer(getConfiguration(), + new MetricsRegionServerWrapperImpl(this)); startServiceThreads(); startHeapMemoryManager(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java index 8bca6c5..9bf04d8 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; /** @@ -33,20 +34,23 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory; @InterfaceStability.Evolving @InterfaceAudience.Private public class MetricsRegionServer { - private MetricsRegionServerSource serverSource; - private MetricsRegionServerWrapper regionServerWrapper; + private final MetricsRegionServerSource serverSource; + private final MetricsRegionServerWrapper regionServerWrapper; - public MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper) { - this(regionServerWrapper, + private final MetricsUserAggregate userAggregate; + + public MetricsRegionServer(Configuration conf, MetricsRegionServerWrapper regionServerWrapper) { + this(conf, regionServerWrapper, CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class) .createServer(regionServerWrapper)); } - MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper, + MetricsRegionServer(Configuration conf, MetricsRegionServerWrapper regionServerWrapper, MetricsRegionServerSource serverSource) { this.regionServerWrapper = regionServerWrapper; this.serverSource = serverSource; + this.userAggregate = new MetricsUserAggregate(conf); } @VisibleForTesting @@ -54,6 +58,11 @@ public class MetricsRegionServer { return serverSource; } + @VisibleForTesting + public MetricsUserAggregate getMetricsUserAggregate() { + return userAggregate; + } + public MetricsRegionServerWrapper getRegionServerWrapper() { return regionServerWrapper; } @@ -63,6 +72,7 @@ public class MetricsRegionServer { serverSource.incrSlowPut(); } serverSource.updatePut(t); + userAggregate.updatePut(t); } public void updateDelete(long t) { @@ -70,6 +80,7 @@ public class MetricsRegionServer { serverSource.incrSlowDelete(); } serverSource.updateDelete(t); + userAggregate.updateDelete(t); } public void updateGet(long t) { @@ -77,6 +88,7 @@ public class MetricsRegionServer { serverSource.incrSlowGet(); } serverSource.updateGet(t); + userAggregate.updateGet(t); } public void updateIncrement(long t) { @@ -84,6 +96,7 @@ public class MetricsRegionServer { serverSource.incrSlowIncrement(); } serverSource.updateIncrement(t); + userAggregate.updateIncrement(t); } public void updateAppend(long t) { @@ -91,10 +104,12 @@ public class MetricsRegionServer { serverSource.incrSlowAppend(); } serverSource.updateAppend(t); + userAggregate.updateAppend(t); } public void updateReplay(long t){ serverSource.updateReplay(t); + userAggregate.updateReplay(t); } public void updateScanSize(long scanSize){ @@ -103,6 +118,7 @@ public class MetricsRegionServer { public void updateScanTime(long t) { serverSource.updateScanTime(t); + userAggregate.updateScanTime(t); } public void updateSplitTime(long t) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java new file mode 100644 index 0000000..15c1d8c --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import com.google.common.annotations.VisibleForTesting; + +public class MetricsUserAggregate { + + /** Provider for mapping principal names to Users */ + private final UserProvider userProvider; + + private final MetricsUserAggregateSource source; + + public MetricsUserAggregate(Configuration conf) { + source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class) + .getUserAggregate(); + + this.userProvider = UserProvider.instantiate(conf); + } + + /** + * Returns the active user to which authorization checks should be applied. + * If we are in the context of an RPC call, the remote user is used, + * otherwise the currently logged in user is used. + */ + private String getActiveUser() { + User user = RpcServer.getRequestUser(); + if (user == null) { + // for non-rpc handling, fallback to system user + try { + user = userProvider.getCurrent(); + } catch (IOException ignore) { + } + } + return user != null ? user.getShortName() : null; + } + + @VisibleForTesting + MetricsUserAggregateSource getSource() { + return source; + } + + public void updatePut(long t) { + String user = getActiveUser(); + if (user != null) { + source.getOrCreateMetricsUser(user).updatePut(t); + } + } + + public void updateDelete(long t) { + String user = getActiveUser(); + if (user != null) { + source.getOrCreateMetricsUser(user).updateDelete(t); + } + } + + public void updateGet(long t) { + String user = getActiveUser(); + if (user != null) { + source.getOrCreateMetricsUser(user).updateGet(t); + } + } + + public void updateIncrement(long t) { + String user = getActiveUser(); + if (user != null) { + source.getOrCreateMetricsUser(user).updateIncrement(t); + } + } + + public void updateAppend(long t) { + String user = getActiveUser(); + if (user != null) { + source.getOrCreateMetricsUser(user).updateAppend(t); + } + } + + public void updateReplay(long t) { + String user = getActiveUser(); + if (user != null) { + source.getOrCreateMetricsUser(user).updateReplay(t); + } + } + + public void updateScanTime(long t) { + String user = getActiveUser(); + if (user != null) { + source.getOrCreateMetricsUser(user).updateScanTime(t); + } + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java index 5f56ba9..1f35e89 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.CompatibilityFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.test.MetricsAssertHelper; @@ -48,7 +49,7 @@ public class TestMetricsRegionServer { @Before public void setUp() { wrapper = new MetricsRegionServerWrapperStub(); - rsm = new MetricsRegionServer(wrapper); + rsm = new MetricsRegionServer(HBaseConfiguration.create(), wrapper); serverSource = rsm.getMetricsSource(); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserAggregate.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserAggregate.java new file mode 100644 index 0000000..c45433a --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserAggregate.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import java.security.PrivilegedAction; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CompatibilityFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.test.MetricsAssertHelper; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({RegionServerTests.class, SmallTests.class}) +public class TestMetricsUserAggregate { + + public static MetricsAssertHelper HELPER = + CompatibilityFactory.getInstance(MetricsAssertHelper.class); + + private MetricsRegionServerWrapperStub wrapper; + private MetricsRegionServer rsm; + private MetricsUserAggregate userAgg; + + @BeforeClass + public static void classSetUp() { + HELPER.init(); + } + + @Before + public void setUp() { + wrapper = new MetricsRegionServerWrapperStub(); + rsm = new MetricsRegionServer(HBaseConfiguration.create(), wrapper); + userAgg = rsm.getMetricsUserAggregate(); + } + + private void doOperations() { + for (int i=0; i < 10; i ++) { + rsm.updateGet(10); + } + for (int i=0; i < 11; i ++) { + rsm.updateScanTime(11); + } + for (int i=0; i < 12; i ++) { + rsm.updatePut(12); + } + for (int i=0; i < 13; i ++) { + rsm.updateDelete(13); + } + for (int i=0; i < 14; i ++) { + rsm.updateIncrement(14); + } + for (int i=0; i < 15; i ++) { + rsm.updateAppend(15); + } + for (int i=0; i < 16; i ++) { + rsm.updateReplay(16); + } + } + + @Test + public void testPerUserOperations() { + Configuration conf = HBaseConfiguration.create(); + User userFoo = User.createUserForTesting(conf, "FOO", new String[0]); + User userBar = User.createUserForTesting(conf, "BAR", new String[0]); + + userFoo.getUGI().doAs(new PrivilegedAction() { + @Override + public Void run() { + doOperations(); + return null; + } + }); + + userBar.getUGI().doAs(new PrivilegedAction() { + @Override + public Void run() { + doOperations(); + return null; + } + }); + + HELPER.assertCounter("userfoometricgetnumops", 10, userAgg.getSource()); + HELPER.assertCounter("userfoometricscantimenumops", 11, userAgg.getSource()); + HELPER.assertCounter("userfoometricmutatenumops", 12, userAgg.getSource()); + HELPER.assertCounter("userfoometricdeletenumops", 13, userAgg.getSource()); + HELPER.assertCounter("userfoometricincrementnumops", 14, userAgg.getSource()); + HELPER.assertCounter("userfoometricappendnumops", 15, userAgg.getSource()); + HELPER.assertCounter("userfoometricreplaynumops", 16, userAgg.getSource()); + + HELPER.assertCounter("userbarmetricgetnumops", 10, userAgg.getSource()); + HELPER.assertCounter("userbarmetricscantimenumops", 11, userAgg.getSource()); + HELPER.assertCounter("userbarmetricmutatenumops", 12, userAgg.getSource()); + HELPER.assertCounter("userbarmetricdeletenumops", 13, userAgg.getSource()); + HELPER.assertCounter("userbarmetricincrementnumops", 14, userAgg.getSource()); + HELPER.assertCounter("userbarmetricappendnumops", 15, userAgg.getSource()); + HELPER.assertCounter("userbarmetricreplaynumops", 16, userAgg.getSource()); + } +}