diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java index f746c98..0775c82 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java @@ -58,6 +58,13 @@ public interface MetricsTableAggregateSource extends BaseSource { MetricsTableSource getOrCreateTableSource(String table, MetricsTableWrapperAggregate wrapper); /** + * Registers MetricsTableSource for table. No op if one MetricsTableSource exists for the table + * @param table the table for which to register MetricsTableSource + * @param wrapper metrics wrapper + */ + void register(String table, MetricsTableWrapperAggregate wrapper); + + /** * Remove a table's source. This is called when regions of a table are closed. * * @param table The table name diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java index 5133a96..8555db0 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java @@ -47,8 +47,26 @@ public class MetricsTableAggregateSourceImpl extends BaseSourceImpl super(metricsName, metricsDescription, metricsContext, metricsJmxContext); } - private void register(MetricsTableSource source) { - source.registerMetrics(); + private MetricsTableSource registerHelper(String table, MetricsTableWrapperAggregate wrapper) { + MetricsTableSource src = tableSources.get(table); + if (src != null) { + return src; + } + return tableSources.computeIfAbsent(table, k -> { + MetricsTableSource newSource = CompatibilitySingletonFactory + .getInstance(MetricsRegionServerSourceFactory.class).createTable(table, wrapper); + // register the new metrics now + newSource.registerMetrics(); + return newSource; + }); + } + + @Override + public void register(String table, MetricsTableWrapperAggregate wrapper) { + if (tableSources.containsKey(table)) { + return; + } + registerHelper(table, wrapper); } @Override @@ -72,13 +90,7 @@ public class MetricsTableAggregateSourceImpl extends BaseSourceImpl if (source != null) { return source; } - MetricsTableSource newSource = CompatibilitySingletonFactory - .getInstance(MetricsRegionServerSourceFactory.class).createTable(table, wrapper); - return tableSources.computeIfAbsent(table, k -> { - // register the new metrics now - newSource.registerMetrics(); - return newSource; - }); + return registerHelper(table, wrapper); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index aec94d4..210f305 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -40,9 +40,11 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; @@ -59,6 +61,7 @@ import org.apache.hadoop.hbase.CacheEvictionStats; import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.ClockOutOfSyncException; import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.DaemonThreadFactory; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -285,6 +288,7 @@ public class HRegionServer extends HasThread implements */ protected final Map onlineRegions = new ConcurrentHashMap<>(); + private final java.util.concurrent.ExecutorService executorForMetrics; /** * Map of encoded region names to the DataNode locations they should be hosted on * We store the value as InetSocketAddress since this is used only in HDFS @@ -606,6 +610,9 @@ public class HRegionServer extends HasThread implements this.configurationManager = new ConfigurationManager(); setupWindows(getConfiguration(), getConfigurationManager()); + executorForMetrics = Executors.newSingleThreadExecutor( + new DaemonThreadFactory("metrics registration")); + // Some unit tests don't need a cluster, so no zookeeper at all if (!conf.getBoolean("hbase.testing.nocluster", false)) { // Open connection to zookeeper and set primary watcher @@ -2762,9 +2769,17 @@ public class HRegionServer extends HasThread implements } @Override + @SuppressWarnings("FutureReturnValueIgnored") public void addRegion(HRegion region) { this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region); configurationManager.registerObserver(region); + executorForMetrics.submit(new Callable() { + @Override + public Void call() { + metricsRegionServer.register(region.getRegionInfo().getTable().getNameAsString()); + return null; + } + }); } private void addRegion(SortedMap> sortedRegions, HRegion region, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java index 21534ce..e67794b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java @@ -99,6 +99,10 @@ public class MetricsRegionServer { return regionServerWrapper; } + public void register(String table) { + metricsTable.register(table); + } + public void updatePutBatch(TableName tn, long t) { if (tableMetrics != null && tn != null) { tableMetrics.updatePut(tn, t); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java index a3f0dff..1d8a1a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java @@ -36,6 +36,10 @@ public class MetricsTable { return wrapper; } + void register(String table) { + tableSourceAgg.register(table, wrapper); + } + public MetricsTableAggregateSource getTableSourceAgg() { return tableSourceAgg; }