From bf70dcbcbb13619d6270a56e3022f7be81ef4273 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sun, 18 Sep 2016 21:54:17 +0800 Subject: [PATCH] HBASE-16648 [JDK8] Use computeIfAbsent instead of get and putIfAbsent --- .../apache/hadoop/hbase/client/AsyncProcess.java | 16 +-- .../hbase/client/ConnectionImplementation.java | 130 ++++++++++----------- .../org/apache/hadoop/hbase/client/MetaCache.java | 18 +-- .../hadoop/hbase/client/MetricsConnection.java | 35 +----- .../client/PreemptiveFastFailInterceptor.java | 11 +- .../hbase/client/ServerStatisticTracker.java | 28 ++--- .../apache/hadoop/hbase/util/CollectionUtils.java | 16 +++ .../apache/hadoop/hbase/util/WeakObjectPool.java | 43 ++++--- .../ZKSplitLogManagerCoordination.java | 11 +- .../hadoop/hbase/executor/ExecutorService.java | 22 ++-- .../apache/hadoop/hbase/master/ServerManager.java | 17 +-- .../hadoop/hbase/master/SplitLogManager.java | 16 +-- .../org/apache/hadoop/hbase/quotas/QuotaCache.java | 46 +++----- .../apache/hadoop/hbase/regionserver/HRegion.java | 14 +-- .../regionserver/wal/SequenceIdAccounting.java | 12 +- .../hbase/security/access/TableAuthManager.java | 10 +- .../hadoop/hbase/wal/BoundedGroupingStrategy.java | 14 +-- 17 files changed, 169 insertions(+), 290 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 2ffb2e3..1029e5c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -622,22 +622,12 @@ class AsyncProcess { protected void incTaskCounters(Collection regions, ServerName sn) { tasksInProgress.incrementAndGet(); - AtomicInteger serverCnt = taskCounterPerServer.get(sn); - if (serverCnt == null) { - taskCounterPerServer.putIfAbsent(sn, new AtomicInteger()); - serverCnt = taskCounterPerServer.get(sn); - } + AtomicInteger serverCnt = 
taskCounterPerServer.computeIfAbsent(sn, k -> new AtomicInteger()); serverCnt.incrementAndGet(); for (byte[] regBytes : regions) { - AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes); - if (regionCnt == null) { - regionCnt = new AtomicInteger(); - AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt); - if (oldCnt != null) { - regionCnt = oldCnt; - } - } + AtomicInteger regionCnt = taskCounterPerRegion.computeIfAbsent(regBytes, + k -> new AtomicInteger()); regionCnt.incrementAndGet(); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 38178b4..6623a34 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -20,9 +20,15 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.BlockingRpcChannel; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; +import java.io.UncheckedIOException; import java.lang.reflect.UndeclaredThrowableException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -38,8 +44,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import edu.umd.cs.findbugs.annotations.Nullable; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -91,10 +95,7 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.ipc.RemoteException; 
import org.apache.zookeeper.KeeperException; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.BlockingRpcChannel; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; +import edu.umd.cs.findbugs.annotations.Nullable; /** * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces. @@ -905,9 +906,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable { // Map keyed by service name + regionserver to service stub implementation private final ConcurrentHashMap stubs = new ConcurrentHashMap(); - // Map of locks used creating service stubs per regionserver. - private final ConcurrentHashMap connectionLock = - new ConcurrentHashMap(); /** * State of the MasterService connection/setup. @@ -1001,7 +999,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { long result; ServerErrors errorStats = errorsByServer.get(server); if (errorStats != null) { - result = ConnectionUtils.getPauseTime(basePause, errorStats.getCount()); + result = ConnectionUtils.getPauseTime(basePause, Math.max(0, errorStats.getCount())); } else { result = 0; // yes, if the server is not in our list we don't wait before retrying. } @@ -1010,19 +1008,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable { /** * Reports that there was an error on the server to do whatever bean-counting necessary. - * * @param server The server in question. */ void reportServerError(ServerName server) { - ServerErrors errors = errorsByServer.get(server); - if (errors != null) { - errors.addError(); - } else { - errors = errorsByServer.putIfAbsent(server, new ServerErrors()); - if (errors != null){ - errors.addError(); - } - } + errorsByServer.computeIfAbsent(server, k -> new ServerErrors()).addError(); } long getStartTrackingTime() { @@ -1033,7 +1022,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { * The record of errors for a server. 
*/ private static class ServerErrors { - private final AtomicInteger retries = new AtomicInteger(0); + private final AtomicInteger retries = new AtomicInteger(-1); public int getCount() { return retries.get(); @@ -1067,8 +1056,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable { protected abstract void isMasterRunning() throws IOException; /** - * Create a stub. Try once only. It is not typed because there is no common type to - * protobuf services nor their interfaces. Let the caller do appropriate casting. + * Create a stub. Try once only. It is not typed because there is no common type to protobuf + * services nor their interfaces. Let the caller do appropriate casting. * @return A stub for master services. */ private Object makeStubNoRetries() throws IOException, KeeperException { @@ -1091,20 +1080,20 @@ class ConnectionImplementation implements ClusterConnection, Closeable { throw new MasterNotRunningException(sn + " is dead."); } // Use the security info interface name as our stub key - String key = getStubKey(getServiceName(), - sn.getHostname(), sn.getPort(), hostnamesCanChange); - connectionLock.putIfAbsent(key, key); - Object stub = null; - synchronized (connectionLock.get(key)) { - stub = stubs.get(key); - if (stub == null) { + String key = getStubKey(getServiceName(), sn.getHostname(), sn.getPort(), + hostnamesCanChange); + return stubs.computeIfAbsent(key, k -> { + try { BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); - stub = makeStub(channel); + Object stub = makeStub(channel); isMasterRunning(); - stubs.put(key, stub); + return stub; + } catch (IOException e) { + throw new UncheckedIOException(e); } - } - return stub; + }); + } catch (UncheckedIOException e) { + throw e.getCause(); } finally { zkw.close(); } @@ -1167,6 +1156,16 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } } + private AdminProtos.AdminService.BlockingInterface createAdmin(ServerName 
serverName) { + BlockingRpcChannel channel; + try { + channel = this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout); + return AdminProtos.AdminService.newBlockingStub(channel); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + @Override public AdminProtos.AdminService.BlockingInterface getAdmin(final ServerName serverName) throws IOException { @@ -1175,43 +1174,40 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName.getHostname(), serverName.getPort(), this.hostnamesCanChange); - this.connectionLock.putIfAbsent(key, key); - AdminProtos.AdminService.BlockingInterface stub; - synchronized (this.connectionLock.get(key)) { - stub = (AdminProtos.AdminService.BlockingInterface)this.stubs.get(key); - if (stub == null) { - BlockingRpcChannel channel = - this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout); - stub = AdminProtos.AdminService.newBlockingStub(channel); - this.stubs.put(key, stub); - } + try { + return (AdminProtos.AdminService.BlockingInterface) this.stubs.computeIfAbsent(key, + k -> createAdmin(serverName)); + } catch (UncheckedIOException e) { + throw e.getCause(); + } + } + + private ClientProtos.ClientService.BlockingInterface createClient(ServerName serverName) { + // In old days, after getting stub/proxy, we'd make a call. We are not doing that here. + // Just fail on first actual call rather than in here on setup. 
+ try { + BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(serverName, user, + rpcTimeout); + return ClientProtos.ClientService.newBlockingStub(channel); + } catch (IOException e) { + throw new UncheckedIOException(e); } - return stub; } @Override - public ClientProtos.ClientService.BlockingInterface getClient(final ServerName sn) - throws IOException { - if (isDeadServer(sn)) { - throw new RegionServerStoppedException(sn + " is dead."); - } - String key = getStubKey( - ClientProtos.ClientService.BlockingInterface.class.getName(), sn.getHostname(), - sn.getPort(), this.hostnamesCanChange); - this.connectionLock.putIfAbsent(key, key); - ClientProtos.ClientService.BlockingInterface stub = null; - synchronized (this.connectionLock.get(key)) { - stub = (ClientProtos.ClientService.BlockingInterface)this.stubs.get(key); - if (stub == null) { - BlockingRpcChannel channel = - this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); - stub = ClientProtos.ClientService.newBlockingStub(channel); - // In old days, after getting stub/proxy, we'd make a call. We are not doing that here. - // Just fail on first actual call rather than in here on setup. 
- this.stubs.put(key, stub); - } - } - return stub; + public ClientProtos.ClientService.BlockingInterface getClient(final ServerName serverName) + throws IOException { + if (isDeadServer(serverName)) { + throw new RegionServerStoppedException(serverName + " is dead."); + } + String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), + serverName.getHostname(), serverName.getPort(), this.hostnamesCanChange); + try { + return (ClientProtos.ClientService.BlockingInterface) this.stubs.computeIfAbsent(key, + k -> createClient(serverName)); + } catch (UncheckedIOException e) { + throw e.getCause(); + } } static String getStubKey(final String serviceName, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java index 3914df5..0910875 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java @@ -191,21 +191,11 @@ public class MetaCache { * @param tableName * @return Map of cached locations for passed tableName */ - private ConcurrentNavigableMap - getTableLocations(final TableName tableName) { + private ConcurrentNavigableMap getTableLocations( + final TableName tableName) { // find the map of cached locations for this table - ConcurrentNavigableMap result; - result = this.cachedRegionLocations.get(tableName); - // if tableLocations for this table isn't built yet, make one - if (result == null) { - result = new CopyOnWriteArrayMap<>(Bytes.BYTES_COMPARATOR); - ConcurrentNavigableMap old = - this.cachedRegionLocations.putIfAbsent(tableName, result); - if (old != null) { - return old; - } - } - return result; + return this.cachedRegionLocations.computeIfAbsent(tableName, + k -> new CopyOnWriteArrayMap<>(Bytes.BYTES_COMPARATOR)); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java index 4fa20e6..93a6342 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -207,32 +207,15 @@ public class MetricsConnection implements StatisticTrackable { } @Override - public void updateRegionStats(ServerName serverName, byte[] regionName, - RegionLoadStats stats) { + public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) { String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName); - ConcurrentMap rsStats = null; - if (serverStats.containsKey(serverName)) { - rsStats = serverStats.get(serverName); - } else { - rsStats = serverStats.putIfAbsent(serverName, - new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR)); - if (rsStats == null) { - rsStats = serverStats.get(serverName); - } - } - RegionStats regionStats = null; - if (rsStats.containsKey(regionName)) { - regionStats = rsStats.get(regionName); - } else { - regionStats = rsStats.putIfAbsent(regionName, new RegionStats(this.registry, name)); - if (regionStats == null) { - regionStats = rsStats.get(regionName); - } - } + ConcurrentMap rsStats = serverStats.computeIfAbsent(serverName, + k -> new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR)); + RegionStats regionStats = rsStats.computeIfAbsent(regionName, + k -> new RegionStats(this.registry, name)); regionStats.update(stats); } - /** A lambda for dispatching to the appropriate metric factory method */ private static interface NewMetric { T newMetric(Class clazz, String name, String scope); @@ -386,13 +369,7 @@ public class MetricsConnection implements StatisticTrackable { * Get a metric for {@code key} from {@code map}, or create it with {@code factory}. 
*/ private T getMetric(String key, ConcurrentMap map, NewMetric factory) { - T t = map.get(key); - if (t == null) { - t = factory.newMetric(this.getClass(), key, scope); - T tmp = map.putIfAbsent(key, t); - t = (tmp == null) ? t : tmp; - } - return t; + return map.computeIfAbsent(key, k -> factory.newMetric(this.getClass(), k, scope)); } /** Update call stats for non-critical-path methods */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java index fed87c1..f42e392 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java @@ -155,15 +155,8 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { return; } long currentTime = EnvironmentEdgeManager.currentTime(); - FailureInfo fInfo = repeatedFailuresMap.get(serverName); - if (fInfo == null) { - fInfo = new FailureInfo(currentTime); - FailureInfo oldfInfo = repeatedFailuresMap.putIfAbsent(serverName, fInfo); - - if (oldfInfo != null) { - fInfo = oldfInfo; - } - } + FailureInfo fInfo = repeatedFailuresMap.computeIfAbsent(serverName, + k -> new FailureInfo(currentTime)); fInfo.timeOfLatestAttemptMilliSec = currentTime; fInfo.numConsecutiveFailures.incrementAndGet(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java index cb21e8b..b77fa4c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java @@ -18,40 +18,28 @@ package org.apache.hadoop.hbase.client; import com.google.common.annotations.VisibleForTesting; + +import 
java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.backoff.ServerStatistics; -import java.util.concurrent.ConcurrentHashMap; - /** * Tracks the statistics for multiple regions */ @InterfaceAudience.Private public class ServerStatisticTracker implements StatisticTrackable { - private final ConcurrentHashMap stats = - new ConcurrentHashMap(); + private final ConcurrentMap stats + = new ConcurrentHashMap(); @Override - public void updateRegionStats(ServerName server, byte[] region, RegionLoadStats - currentStats) { - ServerStatistics stat = stats.get(server); - - if (stat == null) { - stat = stats.get(server); - // We don't have stats for that server yet, so we need to make an entry. - // If we race with another thread it's a harmless unnecessary allocation. 
- if (stat == null) { - stat = new ServerStatistics(); - ServerStatistics old = stats.putIfAbsent(server, stat); - if (old != null) { - stat = old; - } - } - } + public void updateRegionStats(ServerName server, byte[] region, RegionLoadStats currentStats) { + ServerStatistics stat = stats.computeIfAbsent(server, k -> new ServerStatistics()); stat.update(region, currentStats); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java index b7b9beb..9096570 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java @@ -22,7 +22,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Supplier; +import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.hadoop.hbase.classification.InterfaceAudience; /** @@ -104,4 +107,17 @@ public class CollectionUtils { } return list.get(list.size() - 1); } + + public static V getOrCreate(ConcurrentMap map, K key, Supplier supplier, + Runnable actionIfAbsent) { + MutableBoolean trigger = new MutableBoolean(false); + V v = map.computeIfAbsent(key, k -> { + trigger.setValue(true); + return supplier.get(); + }); + if (trigger.booleanValue()) { + actionIfAbsent.run(); + } + return v; + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java index 7757c6c..f978c8e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java @@ -153,35 +153,34 @@ public class WeakObjectPool { } } + private static final class StrongReference { + V referent; + } + /** * 
Returns a shared object associated with the given {@code key}, * which is identified by the {@code equals} method. * @throws NullPointerException if {@code key} is null */ public V get(K key) { - ObjectReference ref = referenceCache.get(key); - if (ref != null) { - V obj = ref.get(); - if (obj != null) { - return obj; + StrongReference ref = new StrongReference<>(); + // we cannot use the returned ObjectReference to get the actual object because it may be + // reclaimed since there is no strong reference to it. So here we use an external holder class + // to pass the actual object out. + referenceCache.compute(key, (k, v) -> { + if (v != null) { + V obj = v.get(); + // check if it has already been reclaimed by GC. + if (obj != null) { + ref.referent = obj; + return v; + } } - referenceCache.remove(key, ref); - } - - V newObj = objectFactory.createObject(key); - ObjectReference newRef = new ObjectReference(key, newObj); - while (true) { - ObjectReference existingRef = referenceCache.putIfAbsent(key, newRef); - if (existingRef == null) { - return newObj; - } - - V existingObject = existingRef.get(); - if (existingObject != null) { - return existingObject; - } - referenceCache.remove(key, existingRef); - } + V newObj = objectFactory.createObject(key); + ref.referent = newObj; + return new ObjectReference(key, newObj); + }); + return ref.referent; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java index d0a4d58..6ec6136 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import 
org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher.Status; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective; import org.apache.hadoop.hbase.master.SplitLogManager.Task; import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus; @@ -542,15 +543,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements } Task findOrCreateOrphanTask(String path) { - Task orphanTask = new Task(); - Task task; - task = details.getTasks().putIfAbsent(path, orphanTask); - if (task == null) { - LOG.info("creating orphan task " + path); - SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet(); - task = orphanTask; - } - return task; + return SplitLogManager.findOrCreateOrphanTask(details.getTasks(), path); } private void heartbeat(String path, int new_version, ServerName workerName) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java index 403244f..631955e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java @@ -81,18 +81,16 @@ public class ExecutorService { */ @VisibleForTesting public void startExecutorService(String name, int maxThreads) { - if (this.executorMap.get(name) != null) { - throw new RuntimeException("An executor service with the name " + name + - " is already running!"); - } - Executor hbes = new Executor(name, maxThreads); - if (this.executorMap.putIfAbsent(name, hbes) != null) { - throw new RuntimeException("An executor service with the name " + name + - " is already running (2)!"); - } - LOG.debug("Starting executor service name=" + name + - ", 
corePoolSize=" + hbes.threadPoolExecutor.getCorePoolSize() + - ", maxPoolSize=" + hbes.threadPoolExecutor.getMaximumPoolSize()); + Executor hbes = this.executorMap.compute(name, (k, v) -> { + if (v != null) { + throw new RuntimeException( + "An executor service with the name " + k + " is already running!"); + } + return new Executor(k, maxThreads); + }); + LOG.debug("Starting executor service name=" + name + ", corePoolSize=" + + hbes.threadPoolExecutor.getCorePoolSize() + ", maxPoolSize=" + + hbes.threadPoolExecutor.getMaximumPoolSize()); } boolean isExecutorServiceRunning(String name) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index f97dfb4..fae9afb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -272,18 +272,6 @@ public class ServerManager { return sn; } - private ConcurrentNavigableMap getOrCreateStoreFlushedSequenceId( - byte[] regionName) { - ConcurrentNavigableMap storeFlushedSequenceId = - storeFlushedSequenceIdsByRegion.get(regionName); - if (storeFlushedSequenceId != null) { - return storeFlushedSequenceId; - } - storeFlushedSequenceId = new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); - ConcurrentNavigableMap alreadyPut = - storeFlushedSequenceIdsByRegion.putIfAbsent(regionName, storeFlushedSequenceId); - return alreadyPut == null ? 
storeFlushedSequenceId : alreadyPut; - } /** * Updates last flushed sequence Ids for the regions on server sn * @param sn @@ -307,8 +295,9 @@ public class ServerManager { + l + ") that is less than the previous last flushed sequence id (" + existingValue + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring."); } - ConcurrentNavigableMap storeFlushedSequenceId = - getOrCreateStoreFlushedSequenceId(encodedRegionName); + ConcurrentNavigableMap storeFlushedSequenceId = storeFlushedSequenceIdsByRegion + .computeIfAbsent(encodedRegionName, + k -> new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR)); for (StoreSequenceId storeSeqId : entry.getValue().getStoreCompleteSequenceId()) { byte[] family = storeSeqId.getFamilyName().toByteArray(); existingValue = storeFlushedSequenceId.get(family); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index fa5816f..2ca3b5f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLog import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; +import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; @@ -516,16 +517,15 @@ public class SplitLogManager { } } - Task findOrCreateOrphanTask(String path) { - Task orphanTask = new Task(); - Task task; - task = tasks.putIfAbsent(path, orphanTask); - if (task == null) { + public static Task findOrCreateOrphanTask(ConcurrentMap tasks, String path) { + return 
CollectionUtils.getOrCreate(tasks, path, Task::new, () -> { LOG.info("creating orphan task " + path); SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet(); - task = orphanTask; - } - return task; + }); + } + + Task findOrCreateOrphanTask(String path) { + return findOrCreateOrphanTask(tasks, path); } public void stop() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java index 15962d2..055fe45 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java @@ -18,6 +18,10 @@ package org.apache.hadoop.hbase.quotas; +import static org.apache.hadoop.hbase.util.CollectionUtils.getOrCreate; + +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -38,8 +42,6 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.security.UserGroupInformation; -import com.google.common.annotations.VisibleForTesting; - /** * Cache that keeps track of the quota settings for the users and tables that * are interacting with it. @@ -114,20 +116,12 @@ public class QuotaCache implements Stoppable { /** * Returns the QuotaState associated to the specified user. 
- * * @param ugi the user * @return the quota info associated to specified user */ public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) { - String key = ugi.getShortUserName(); - UserQuotaState quotaInfo = userQuotaCache.get(key); - if (quotaInfo == null) { - quotaInfo = new UserQuotaState(); - if (userQuotaCache.putIfAbsent(key, quotaInfo) == null) { - triggerCacheRefresh(); - } - } - return quotaInfo; + return getOrCreate(userQuotaCache, ugi.getShortUserName(), UserQuotaState::new, + this::triggerCacheRefresh); } /** @@ -151,24 +145,12 @@ public class QuotaCache implements Stoppable { } /** - * Returns the QuotaState requested. - * If the quota info is not in cache an empty one will be returned - * and the quota request will be enqueued for the next cache refresh. + * Returns the QuotaState requested. If the quota info is not in cache an empty one will be + * returned and the quota request will be enqueued for the next cache refresh. */ private QuotaState getQuotaState(final ConcurrentHashMap quotasMap, final K key) { - QuotaState quotaInfo = quotasMap.get(key); - if (quotaInfo == null) { - quotaInfo = new QuotaState(); - if (quotasMap.putIfAbsent(key, quotaInfo) == null) { - triggerCacheRefresh(); - } - } - return quotaInfo; - } - - private Configuration getConfiguration() { - return rsServices.getConfiguration(); + return getOrCreate(quotasMap, key, QuotaState::new, this::triggerCacheRefresh); } @VisibleForTesting @@ -210,14 +192,12 @@ public class QuotaCache implements Stoppable { protected void chore() { // Prefetch online tables/namespaces for (TableName table: QuotaCache.this.rsServices.getOnlineTables()) { - if (table.isSystemTable()) continue; - if (!QuotaCache.this.tableQuotaCache.contains(table)) { - QuotaCache.this.tableQuotaCache.putIfAbsent(table, new QuotaState()); + if (table.isSystemTable()) { + continue; } + QuotaCache.this.tableQuotaCache.computeIfAbsent(table, k -> new QuotaState()); String ns = table.getNamespaceAsString(); 
- if (!QuotaCache.this.namespaceQuotaCache.contains(ns)) { - QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState()); - } + QuotaCache.this.namespaceQuotaCache.computeIfAbsent(ns, k -> new QuotaState()); } fetchNamespaceQuotaState(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index f97f6b2..7fc7120 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5239,19 +5239,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Keep trying until we have a lock or error out. // TODO: do we need to add a time component here? while (result == null) { + rowLockContext = lockedRows.computeIfAbsent(rowKey, k -> new RowLockContext(rowKey)); - // Try adding a RowLockContext to the lockedRows. - // If we can add it then there's no other transactions currently running. - rowLockContext = new RowLockContext(rowKey); - RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext); - - // if there was a running transaction then there's already a context. - if (existingContext != null) { - rowLockContext = existingContext; - } - - // Now try an get the lock. - // + // Now try getting the lock. 
// This can fail as if (readLock) { result = rowLockContext.newReadLock(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java index 62dea53..25b6c95 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java @@ -215,16 +215,8 @@ class SequenceIdAccounting { @VisibleForTesting ConcurrentMap getOrCreateLowestSequenceIds(byte[] encodedRegionName) { // Intentionally, this access is done outside of this.regionSequenceIdLock. Done per append. - ConcurrentMap m = this.lowestUnflushedSequenceIds - .get(encodedRegionName); - if (m != null) { - return m; - } - m = new ConcurrentHashMap<>(); - // Another thread may have added it ahead of us. - ConcurrentMap alreadyPut = this.lowestUnflushedSequenceIds - .putIfAbsent(encodedRegionName, m); - return alreadyPut == null ? 
m : alreadyPut; + return this.lowestUnflushedSequenceIds.computeIfAbsent(encodedRegionName, + k -> new ConcurrentHashMap<>()); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java index 25cfc8b..f071d99 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java @@ -276,17 +276,11 @@ public class TableAuthManager implements Closeable { } private PermissionCache getTablePermissions(TableName table) { - if (!tableCache.containsKey(table)) { - tableCache.putIfAbsent(table, new PermissionCache()); - } - return tableCache.get(table); + return tableCache.computeIfAbsent(table, k -> new PermissionCache<>()); } private PermissionCache getNamespacePermissions(String namespace) { - if (!nsCache.containsKey(namespace)) { - nsCache.putIfAbsent(namespace, new PermissionCache()); - } - return nsCache.get(namespace); + return nsCache.computeIfAbsent(namespace, k -> new PermissionCache<>()); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java index 06f8792..9f49a55 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.wal; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; @@ -36,7 +37,7 @@ public class BoundedGroupingStrategy implements RegionGroupingStrategy{ static final String NUM_REGION_GROUPS = "hbase.wal.regiongrouping.numgroups"; static final int 
DEFAULT_NUM_REGION_GROUPS = 2; - private ConcurrentHashMap groupNameCache = + private ConcurrentMap groupNameCache = new ConcurrentHashMap(); private AtomicInteger counter = new AtomicInteger(0); private String[] groupNames; @@ -44,15 +45,8 @@ public class BoundedGroupingStrategy implements RegionGroupingStrategy{ @Override public String group(byte[] identifier, byte[] namespace) { String idStr = Bytes.toString(identifier); - String groupName = groupNameCache.get(idStr); - if (null == groupName) { - groupName = groupNames[getAndIncrAtomicInteger(counter, groupNames.length)]; - String extantName = groupNameCache.putIfAbsent(idStr, groupName); - if (extantName != null) { - return extantName; - } - } - return groupName; + return groupNameCache.computeIfAbsent(idStr, + k -> groupNames[getAndIncrAtomicInteger(counter, groupNames.length)]); } // Non-blocking incrementing & resetting of AtomicInteger. -- 1.9.1