diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/LockCache.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/LockCache.java
new file mode 100644
index 0000000..eb3cae1
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/LockCache.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Cache providing a mapping from keys to lock objects. Entries stay in the
+ * cache until they are evicted.
+ * @param <K> the type of keys used to look up locks
+ */
+@InterfaceAudience.Private
+public class LockCache<K> {
+  // TODO - configure the concurrency of the cache if required
+  LoadingCache<K, Object> cache = CacheBuilder.newBuilder().softValues().build(
+      new CacheLoader<K, Object>() {
+        @Override
+        public Object load(K key) {
+          return new Object();
+        }
+      });
+
+  /**
+   * Returns the lock object associated with {@code resource}, first loading it if necessary.
+   * @param resource the key in the cache
+   * @return the lock object for the given key
+   */
+  public Object getLock(K resource) {
+    return cache.getUnchecked(resource);
+  }
+
+}
\ No newline at end of file
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestLockCache.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestLockCache.java
new file mode 100644
index 0000000..ab81f59
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestLockCache.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestLockCache { + @Test + public void testLocker() { + LockCache lockCache = new LockCache(); + Object obj1 = lockCache.getLock("L1"); + Object obj2 = lockCache.getLock("L2"); + assertTrue(obj1 != obj2); + Object obj3 = lockCache.getLock("L1"); + assertTrue(obj1 == obj3); + } + + @Test + public void testSameLock() throws Exception { + StringBuffer sb = new StringBuffer(""); + LockCache lockCache = new LockCache(); + Locker l1 = new Locker("a", 1, lockCache, "L1", 1000, sb); + Locker l2 = new Locker("a", 2, lockCache, "L1", 1000, sb); + new Thread(l1).start(); + // sleep to make sure l1 gets the lock first + Thread.sleep(500); + // L2 should not get lock till l1 releases + new Thread(l2).start(); + while(!l1.isDone() || !l2.isDone()) { + Thread.sleep(100); + } + Assert.assertEquals("a:1-B a:1-L a:2-B a:1-R a:2-L a:2-R", sb.toString().trim()); + } + + @Test + public void testDifferentLocks() throws Exception { + StringBuffer sb = new StringBuffer(""); + LockCache lockCache = new LockCache(); + Locker l1 = new Locker("a", 1, lockCache, "L1", 1000, sb); + Locker l2 = new Locker("a", 2, lockCache, "L2", 1500, sb); + new Thread(l1).start(); + // Sleep for time less than l1's lock timeout + Thread.sleep(500); + // L2 should also get lock + new Thread(l2).start(); + // Wait for this to finish + while(!l1.isDone() || !l2.isDone()) { + Thread.sleep(100); + } + Assert.assertEquals("a:1-B a:1-L a:2-B a:2-L a:1-R a:2-R", sb.toString().trim()); + + } + + class Locker implements Runnable { + protected String name; + private String nameIndex; + private StringBuffer sb; + private LockCache lockCache; + private String resource; + private long timeout; + private volatile boolean done; + + public Locker(String name, int nameIndex, LockCache lockCache, String lockResource, + long timeout, StringBuffer buffer) { + this.name = name; + this.nameIndex = name + ":" + nameIndex; + this.sb = buffer; + this.lockCache = lockCache; + this.resource = lockResource; + this.timeout = timeout; + done = false; + } + + public void run() { + try { + sb.append(nameIndex + "-B "); + synchronized (lockCache.getLock(resource)) { + sb.append(nameIndex + "-L "); + Thread.sleep(timeout); + sb.append(nameIndex + "-R "); + } + done = true; + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + public boolean isDone() { + return done; + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index f7f98fe..1e2ae5f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -33,6 +33,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -82,6 +83,7 @@ import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import 
org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.KeyLocker; +import org.apache.hadoop.hbase.util.LockCache; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.hadoop.hbase.util.Threads; @@ -141,7 +143,7 @@ public class AssignmentManager { // shutdown processing -- St.Ack // All access to this Map must be synchronized. final NavigableMap regionPlans = - new TreeMap(); + new ConcurrentSkipListMap(); private final TableStateManager tableStateManager; @@ -165,6 +167,8 @@ public class AssignmentManager { // performance, but not needed in most use cases. private final boolean bulkAssignWaitTillAllAssigned; + final private LockCache lockCache = new LockCache(); + /** * Indicator that AssignmentManager has recovered the region states so * that ServerShutdownHandler can be fully enabled and re-assign regions @@ -307,7 +311,7 @@ public class AssignmentManager { * @param plan */ public void addPlan(String encodedName, RegionPlan plan) { - synchronized (regionPlans) { + synchronized (getRegionLock(encodedName)) { regionPlans.put(encodedName, plan); } } @@ -316,8 +320,11 @@ public class AssignmentManager { * Add a map of region plans. */ public void addPlans(Map plans) { - synchronized (regionPlans) { - regionPlans.putAll(plans); + for (Map.Entry planEntry : plans.entrySet()) { + String encodedRegionName = planEntry.getKey(); + synchronized (getLock(planEntry.getValue().getRegionInfo())) { + regionPlans.put(planEntry.getKey(), planEntry.getValue()); + } } } @@ -1208,7 +1215,7 @@ public class AssignmentManager { boolean newPlan = false; RegionPlan existingPlan; - synchronized (this.regionPlans) { + synchronized (getLock(region)) { existingPlan = this.regionPlans.get(encodedName); if (existingPlan != null && existingPlan.getDestination() != null) { @@ -1972,8 +1979,9 @@ public class AssignmentManager { * @param region Region whose plan we are to clear. */ private void clearRegionPlan(final HRegionInfo region) { - synchronized (this.regionPlans) { - this.regionPlans.remove(region.getEncodedName()); + String encodedName = region.getEncodedName(); + synchronized (getLock(region)) { + this.regionPlans.remove(encodedName); } } @@ -2069,13 +2077,13 @@ public class AssignmentManager { */ public List cleanOutCrashedServerReferences(final ServerName sn) { // Clean out any existing assignment plans for this server - synchronized (this.regionPlans) { - for (Iterator > i = this.regionPlans.entrySet().iterator(); - i.hasNext();) { - Map.Entry e = i.next(); - ServerName otherSn = e.getValue().getDestination(); - // The name will be null if the region is planned for a random assign. - if (otherSn != null && otherSn.equals(sn)) { + for (Iterator > i = this.regionPlans.entrySet().iterator(); + i.hasNext();) { + Map.Entry e = i.next(); + ServerName otherSn = e.getValue().getDestination(); + // The name will be null if the region is planned for a random assign. + if (otherSn != null && otherSn.equals(sn)) { + synchronized (getLock(e.getValue().getRegionInfo())) { // Use iterator's remove else we'll get CME i.remove(); } @@ -2138,7 +2146,7 @@ public class AssignmentManager { + (state == null ? 
"not in region states" : state)); return; } - synchronized (this.regionPlans) { + synchronized (getLock(plan.getRegionInfo())) { this.regionPlans.put(plan.getRegionName(), plan); } unassign(hri, plan.getDestination()); @@ -2901,4 +2909,12 @@ public class AssignmentManager { void setRegionStateListener(RegionStateListener listener) { this.regionStateListener = listener; } + + private Object getRegionLock(String encodedName) { + return lockCache.getLock("r"+encodedName); + } + + private Object getLock(HRegionInfo hri) { + return getRegionLock(hri.getEncodedName()); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java index 64cb77a..5214b5d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java @@ -19,18 +19,23 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -46,6 +51,7 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.LockCache; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; @@ -62,41 +68,43 @@ public class RegionStates { /** * Regions currently in transition. */ - final HashMap regionsInTransition = - new HashMap(); + final Map regionsInTransition = + new ConcurrentHashMap(); /** * Region encoded name to state map. * All the regions should be in this map. */ private final Map regionStates = - new HashMap(); + new ConcurrentHashMap(); /** - * Holds mapping of table -> region state + * RegionStates sorted by a region's tablename + * Regions of a table are not guaranteed to be sorted + * All the regions in regionStates should be in this map */ - private final Map> regionStatesTableIndex = - new HashMap>(); + private final NavigableMap tableSortedRegionStates = + new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); /** * Server to regions assignment map. * Contains the set of regions currently assigned to a given server. */ private final Map> serverHoldings = - new HashMap>(); + new ConcurrentHashMap>(); /** * Maintains the mapping from the default region to the replica regions. */ private final Map> defaultReplicaToOtherReplicas = - new HashMap>(); + new ConcurrentHashMap>(); /** * Region to server assignment map. * Contains the server a given region is currently assigned to. 
*/ - private final TreeMap regionAssignments = - new TreeMap(); + private final ConcurrentSkipListMap regionAssignments = + new ConcurrentSkipListMap(); /** * Encoded region name to server assignment map for re-assignment @@ -108,8 +116,8 @@ public class RegionStates { * is offline while the info in lastAssignments is cleared when * the region is closed or the server is dead and processed. */ - private final HashMap lastAssignments = - new HashMap(); + private final Map lastAssignments = + new ConcurrentHashMap(); /** * Encoded region name to server assignment map for the @@ -120,16 +128,16 @@ public class RegionStates { * to match the meta. We need this map to find out the old server * whose serverHoldings needs cleanup, given a moved region. */ - private final HashMap oldAssignments = - new HashMap(); + private final Map oldAssignments = + new ConcurrentHashMap(); /** * Map a host port pair string to the latest start code * of a region server which is known to be dead. It is dead * to us, but server manager may not know it yet. */ - private final HashMap deadServers = - new HashMap(); + private final Map deadServers = + new ConcurrentHashMap(); /** * Map a dead servers to the time when log split is done. @@ -138,9 +146,11 @@ public class RegionStates { * on a configured time. By default, we assume a dead * server should be done with log splitting in two hours. */ - private final HashMap processedServers = - new HashMap(); - private long lastProcessedServerCleanTime; + private final Map processedServers = + new ConcurrentHashMap(); + + private final LockCache lockCache = new LockCache(); + private volatile long lastProcessedServerCleanTime; private final TableStateManager tableStateManager; private final RegionStateStore regionStateStore; @@ -162,7 +172,7 @@ public class RegionStates { /** * @return a copy of the region assignment map */ - public synchronized Map getRegionAssignments() { + public Map getRegionAssignments() { return new TreeMap(regionAssignments); } @@ -171,22 +181,24 @@ public class RegionStates { * @param regions * @return a pair containing the groupings as a map */ - synchronized Map> getRegionAssignments( + Map> getRegionAssignments( Collection regions) { Map> map = new HashMap>(); for (HRegionInfo region : regions) { - HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(region); - Set allReplicas = defaultReplicaToOtherReplicas.get(defaultReplica); - if (allReplicas != null) { - for (HRegionInfo hri : allReplicas) { - ServerName server = regionAssignments.get(hri); - if (server != null) { - List regionsOnServer = map.get(server); - if (regionsOnServer == null) { - regionsOnServer = new ArrayList(1); - map.put(server, regionsOnServer); + synchronized (getLock(region)) { + HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(region); + Set allReplicas = defaultReplicaToOtherReplicas.get(defaultReplica); + if (allReplicas != null) { + for (HRegionInfo hri : allReplicas) { + ServerName server = regionAssignments.get(hri); + if (server != null) { + List regionsOnServer = map.get(server); + if (regionsOnServer == null) { + regionsOnServer = new ArrayList(1); + map.put(server, regionsOnServer); + } + regionsOnServer.add(hri); } - regionsOnServer.add(hri); } } } @@ -194,7 +206,7 @@ public class RegionStates { return map; } - public synchronized ServerName getRegionServerOfRegion(HRegionInfo hri) { + public ServerName getRegionServerOfRegion(HRegionInfo hri) { return regionAssignments.get(hri); } @@ -202,35 +214,35 @@ public 
class RegionStates { * Get regions in transition and their states */ @SuppressWarnings("unchecked") - public synchronized Map getRegionsInTransition() { - return (Map)regionsInTransition.clone(); + public Map getRegionsInTransition() { + return Collections.unmodifiableMap(regionsInTransition); } /** * @return True if specified region in transition. */ - public synchronized boolean isRegionInTransition(final HRegionInfo hri) { + public boolean isRegionInTransition(final HRegionInfo hri) { return regionsInTransition.containsKey(hri.getEncodedName()); } /** * @return True if specified region in transition. */ - public synchronized boolean isRegionInTransition(final String encodedName) { + public boolean isRegionInTransition(final String encodedName) { return regionsInTransition.containsKey(encodedName); } /** * @return True if any region in transition. */ - public synchronized boolean isRegionsInTransition() { + public boolean isRegionsInTransition() { return !regionsInTransition.isEmpty(); } /** * @return True if hbase:meta table region is in transition. */ - public synchronized boolean isMetaRegionInTransition() { + public boolean isMetaRegionInTransition() { for (RegionState state : regionsInTransition.values()) { if (state.getRegion().isMetaRegion()) return true; } @@ -240,17 +252,21 @@ public class RegionStates { /** * @return True if specified region assigned, and not in transition. */ - public synchronized boolean isRegionOnline(final HRegionInfo hri) { - return !isRegionInTransition(hri) && regionAssignments.containsKey(hri); + public boolean isRegionOnline(final HRegionInfo hri) { + synchronized (getLock(hri)) { + return !isRegionInTransition(hri) && regionAssignments.containsKey(hri); + } } /** * @return True if specified region offline/closed, but not in transition. * If the region is not in the map, it is offline to us too. 
*/ - public synchronized boolean isRegionOffline(final HRegionInfo hri) { - return getRegionState(hri) == null || (!isRegionInTransition(hri) - && isRegionInState(hri, State.OFFLINE, State.CLOSED)); + public boolean isRegionOffline(final HRegionInfo hri) { + synchronized (getLock(hri)) { + return getRegionState(hri) == null + || (!isRegionInTransition(hri) && isRegionInState(hri, State.OFFLINE, State.CLOSED)); + } } /** @@ -288,7 +304,7 @@ public class RegionStates { /** * Get region transition state */ - public synchronized RegionState + public RegionState getRegionTransitionState(final String encodedName) { return regionsInTransition.get(encodedName); } @@ -326,56 +342,54 @@ public class RegionStates { * @param lastHost the last server that hosts the region * @return the current state */ - public synchronized RegionState createRegionState(final HRegionInfo hri, + public RegionState createRegionState(final HRegionInfo hri, State newState, ServerName serverName, ServerName lastHost) { - if (newState == null || (newState == State.OPEN && serverName == null)) { - newState = State.OFFLINE; - } - if (hri.isOffline() && hri.isSplit()) { - newState = State.SPLIT; - serverName = null; - } - String encodedName = hri.getEncodedName(); - RegionState regionState = regionStates.get(encodedName); - if (regionState != null) { - LOG.warn("Tried to create a state for a region already in RegionStates, " - + "used existing: " + regionState + ", ignored new: " + newState); - } else { - regionState = new RegionState(hri, newState, serverName); - putRegionState(regionState); - if (newState == State.OPEN) { - if (!serverName.equals(lastHost)) { - LOG.warn("Open region's last host " + lastHost - + " should be the same as the current one " + serverName - + ", ignored the last and used the current one"); - lastHost = serverName; - } - lastAssignments.put(encodedName, lastHost); - regionAssignments.put(hri, lastHost); - } else if (!isOneOfStates(regionState, State.MERGED, State.SPLIT, State.OFFLINE)) { - regionsInTransition.put(encodedName, regionState); + synchronized (getLock(hri)) { + if (newState == null || (newState == State.OPEN && serverName == null)) { + newState = State.OFFLINE; + } + if (hri.isOffline() && hri.isSplit()) { + newState = State.SPLIT; + serverName = null; } - if (lastHost != null && newState != State.SPLIT) { - addToServerHoldings(lastHost, hri); - if (newState != State.OPEN) { - oldAssignments.put(encodedName, lastHost); + String encodedName = hri.getEncodedName(); + RegionState regionState = regionStates.get(encodedName); + if (regionState != null) { + LOG.warn("Tried to create a state for a region already in RegionStates, " + + "used existing: " + regionState + ", ignored new: " + newState); + } else { + regionState = new RegionState(hri, newState, serverName); + putRegionState(regionState); + if (newState == State.OPEN) { + if (!serverName.equals(lastHost)) { + LOG.warn("Open region's last host " + lastHost + + " should be the same as the current one " + serverName + + ", ignored the last and used the current one"); + lastHost = serverName; + } + lastAssignments.put(encodedName, lastHost); + regionAssignments.put(hri, lastHost); + } else if (!isOneOfStates(regionState, State.MERGED, State.SPLIT, State.OFFLINE)) { + regionsInTransition.put(encodedName, regionState); + } + if (lastHost != null && newState != State.SPLIT) { + synchronized (getLock(lastHost)) { + addToServerHoldings(lastHost, hri); + if (newState != State.OPEN) { + oldAssignments.put(encodedName, lastHost); + } + } } } + 
return regionState; } - return regionState; } private RegionState putRegionState(RegionState regionState) { HRegionInfo hri = regionState.getRegion(); String encodedName = hri.getEncodedName(); - TableName table = hri.getTable(); RegionState oldState = regionStates.put(encodedName, regionState); - Map map = regionStatesTableIndex.get(table); - if (map == null) { - map = new HashMap(); - regionStatesTableIndex.put(table, map); - } - map.put(encodedName, regionState); + tableSortedRegionStates.put(regionState.getRegion().getRegionName(), regionState); return oldState; } @@ -384,9 +398,9 @@ public class RegionStates { */ public RegionState updateRegionState( final HRegionInfo hri, final State state) { - RegionState regionState = getRegionState(hri.getEncodedName()); + RegionState regionState = regionStates.get(hri.getEncodedName()); return updateRegionState(hri, state, - regionState == null ? null : regionState.getServerName()); + regionState == null ? null : regionState.getServerName()); } /** @@ -418,16 +432,16 @@ public class RegionStates { } updateRegionState(hri, State.OPEN, serverName, openSeqNum); - synchronized (this) { + synchronized (getLock(hri)) { regionsInTransition.remove(encodedName); ServerName oldServerName = regionAssignments.put(hri, serverName); if (!serverName.equals(oldServerName)) { if (LOG.isDebugEnabled()) { LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName); - } else { - LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName); } - addToServerHoldings(serverName, hri); + synchronized (getLock(serverName)) { + addToServerHoldings(serverName, hri); + } addToReplicaMapping(hri); if (oldServerName == null) { oldServerName = oldAssignments.remove(encodedName); @@ -435,8 +449,10 @@ public class RegionStates { if (oldServerName != null && !oldServerName.equals(serverName) && serverHoldings.containsKey(oldServerName)) { - LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName); - removeFromServerHoldings(oldServerName, hri); + synchronized (getLock(oldServerName)) { + LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName); + removeFromServerHoldings(oldServerName, hri); + } } } } @@ -445,7 +461,7 @@ public class RegionStates { private void addToServerHoldings(ServerName serverName, HRegionInfo hri) { Set regions = serverHoldings.get(serverName); if (regions == null) { - regions = new HashSet(); + regions = Sets.newSetFromMap(new ConcurrentHashMap()); serverHoldings.put(serverName, regions); } regions.add(hri); @@ -453,30 +469,36 @@ public class RegionStates { private void addToReplicaMapping(HRegionInfo hri) { HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(hri); - Set replicas = - defaultReplicaToOtherReplicas.get(defaultReplica); - if (replicas == null) { - replicas = new HashSet(); - defaultReplicaToOtherReplicas.put(defaultReplica, replicas); + synchronized (getLock(defaultReplica)) { + Set replicas = + defaultReplicaToOtherReplicas.get(defaultReplica); + if (replicas == null) { + replicas = Sets.newSetFromMap(new ConcurrentHashMap()); + defaultReplicaToOtherReplicas.put(defaultReplica, replicas); + } + replicas.add(hri); } - replicas.add(hri); } private void removeFromServerHoldings(ServerName serverName, HRegionInfo hri) { Set oldRegions = serverHoldings.get(serverName); - oldRegions.remove(hri); - if (oldRegions.isEmpty()) { - serverHoldings.remove(serverName); + if (oldRegions != null) { + oldRegions.remove(hri); + if (oldRegions.isEmpty()) { + 
serverHoldings.remove(serverName); + } } } private void removeFromReplicaMapping(HRegionInfo hri) { HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(hri); - Set replicas = defaultReplicaToOtherReplicas.get(defaultReplica); - if (replicas != null) { - replicas.remove(hri); - if (replicas.isEmpty()) { - defaultReplicaToOtherReplicas.remove(defaultReplica); + synchronized (getLock(defaultReplica)) { + Set replicas = defaultReplicaToOtherReplicas.get(defaultReplica); + if (replicas != null) { + replicas.remove(hri); + if (replicas.isEmpty()) { + defaultReplicaToOtherReplicas.remove(defaultReplica); + } } } } @@ -485,15 +507,19 @@ public class RegionStates { * A dead server's wals have been split so that all the regions * used to be open on it can be safely assigned now. Mark them assignable. */ - public synchronized void logSplit(final ServerName serverName) { + public void logSplit(final ServerName serverName) { for (Iterator> it = lastAssignments.entrySet().iterator(); it.hasNext();) { Map.Entry e = it.next(); if (e.getValue().equals(serverName)) { - it.remove(); + synchronized(getRegionLock(e.getKey())) { + it.remove(); + } } } long now = System.currentTimeMillis(); + // This method is called once per server, it should be ok not to use + // putIfAbsent as entries from the map are lazily removed if (LOG.isDebugEnabled()) { LOG.debug("Adding to log splitting servers " + serverName); } @@ -524,8 +550,10 @@ public class RegionStates { clearLastAssignment(region); } - public synchronized void clearLastAssignment(final HRegionInfo region) { - lastAssignments.remove(region.getEncodedName()); + public void clearLastAssignment(final HRegionInfo region) { + synchronized (getLock(region)) { + lastAssignments.remove(region.getEncodedName()); + } } /** @@ -554,17 +582,19 @@ public class RegionStates { expectedState == null ? State.OFFLINE : expectedState; updateRegionState(hri, newState); String encodedName = hri.getEncodedName(); - synchronized (this) { + synchronized (getLock(hri)) { regionsInTransition.remove(encodedName); ServerName oldServerName = regionAssignments.remove(hri); if (oldServerName != null && serverHoldings.containsKey(oldServerName)) { if (newState == State.MERGED || newState == State.SPLIT || hri.isMetaRegion() || tableStateManager.isTableState(hri.getTable(), - TableState.State.DISABLED, TableState.State.DISABLING)) { + TableState.State.DISABLED, TableState.State.DISABLING)) { // Offline the region only if it's merged/split, or the table is disabled/disabling. // Otherwise, offline it from this server only when it is online on a different server. - LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName); - removeFromServerHoldings(oldServerName, hri); + synchronized (getLock(oldServerName)) { + LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName); + removeFromServerHoldings(oldServerName, hri); + } removeFromReplicaMapping(hri); } else { // Need to remember it so that we can offline it from this @@ -586,14 +616,14 @@ public class RegionStates { // ConcurrentModificationException and deadlock in case of meta anassigned, // but RegionState a blocked. 
Set regionsToOffline = new HashSet(); - synchronized (this) { - Set assignedRegions = serverHoldings.get(sn); - if (assignedRegions == null) { - assignedRegions = new HashSet(); - } + Set assignedRegions = serverHoldings.get(sn); + if (assignedRegions == null) { + assignedRegions = Sets.newSetFromMap(new ConcurrentHashMap()); + } - for (HRegionInfo region : assignedRegions) { - // Offline open regions, no need to offline if SPLIT/MERGED/OFFLINE + for (HRegionInfo region : assignedRegions) { + // Offline open regions, no need to offline if SPLIT/MERGED/OFFLINE + synchronized (region.getEncodedName()) { if (isRegionOnline(region)) { regionsToOffline.add(region); } else if (isRegionInState(region, State.SPLITTING, State.MERGING)) { @@ -601,41 +631,44 @@ public class RegionStates { regionsToOffline.add(region); } } + } - for (RegionState state : regionsInTransition.values()) { - HRegionInfo hri = state.getRegion(); - if (assignedRegions.contains(hri)) { - // Region is open on this region server, but in transition. - // This region must be moving away from this server, or splitting/merging. - // SSH will handle it, either skip assigning, or re-assign. - LOG.info("Transitioning " + state + " will be handled by ServerCrashProcedure for " + sn); - } else if (sn.equals(state.getServerName())) { - // Region is in transition on this region server, and this - // region is not open on this server. So the region must be - // moving to this server from another one (i.e. opening or - // pending open on this server, was open on another one. - // Offline state is also kind of pending open if the region is in - // transition. The region could be in failed_close state too if we have - // tried several times to open it while this region server is not reachable) - if (isOneOfStates(state, State.OPENING, State.PENDING_OPEN, - State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) { - LOG.info("Found region in " + state + - " to be reassigned by ServerCrashProcedure for " + sn); - rits.add(hri); - } else if (isOneOfStates(state, State.SPLITTING_NEW)) { - regionsToCleanIfNoMetaEntry.add(state.getRegion()); - } else { - LOG.warn("THIS SHOULD NOT HAPPEN: unexpected " + state); - } + for (RegionState state : regionsInTransition.values()) { + HRegionInfo hri = state.getRegion(); + if (assignedRegions.contains(hri)) { + // Region is open on this region server, but in transition. + // This region must be moving away from this server, or splitting/merging. + // SSH will handle it, either skip assigning, or re-assign. + LOG.info("Transitioning " + state + " will be handled by ServerCrashProcedure for " + sn); + } else if (sn.equals(state.getServerName())) { + // Region is in transition on this region server, and this + // region is not open on this server. So the region must be + // moving to this server from another one (i.e. opening or + // pending open on this server, was open on another one. + // Offline state is also kind of pending open if the region is in + // transition. 
The region could be in failed_close state too if we have + // tried several times to open it while this region server is not reachable) + if (isOneOfStates(state, State.OPENING, State.PENDING_OPEN, + State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) { + LOG.info("Found region in " + state + + " to be reassigned by ServerCrashProcedure for " + sn); + rits.add(hri); + } else if (isOneOfStates(state, State.SPLITTING_NEW)) { + regionsToCleanIfNoMetaEntry.add(state.getRegion()); + } else { + LOG.warn("THIS SHOULD NOT HAPPEN: unexpected " + state); } } - this.notifyAll(); } for (HRegionInfo hri : regionsToOffline) { regionOffline(hri); } + synchronized (this) { + this.notifyAll(); + } + cleanIfNoMetaEntry(regionsToCleanIfNoMetaEntry); return rits; } @@ -672,7 +705,7 @@ public class RegionStates { * @param tableName * @return Online regions from tableName */ - public synchronized List getRegionsOfTable(TableName tableName) { + public List getRegionsOfTable(TableName tableName) { List tableRegions = new ArrayList(); // boundary needs to have table's name but regionID 0 so that it is sorted // before all table's regions. @@ -693,18 +726,23 @@ public class RegionStates { * @param tableName * @return Online regions from tableName */ - public synchronized Map> - getRegionByStateOfTable(TableName tableName) { + public Map> getRegionByStateOfTable(TableName tableName) { Map> tableRegions = new HashMap>(); for (State state : State.values()) { tableRegions.put(state, new ArrayList()); } - Map indexMap = regionStatesTableIndex.get(tableName); - if (indexMap == null) - return tableRegions; - for (RegionState regionState : indexMap.values()) { - tableRegions.get(regionState.getState()).add(regionState.getRegion()); + byte[] startKey = new byte[tableName.getName().length+1]; + System.arraycopy(tableName.getName(), 0, startKey, 0, tableName.getName().length); + startKey[startKey.length-1] = (byte)HConstants.DELIMITER; + + byte[] endKey = new byte[startKey.length+1]; + System.arraycopy(startKey, 0 , endKey, 0, startKey.length); + endKey[endKey.length-1] = (byte)0xFF; + + for (RegionState state : tableSortedRegionStates.subMap( + startKey, endKey).values()) { + tableRegions.get(state.getState()).add(state.getRegion()); } return tableRegions; } @@ -715,7 +753,7 @@ public class RegionStates { * If the region isn't in transition, returns immediately. Otherwise, method * blocks until the region is out of transition. */ - public synchronized void waitOnRegionToClearRegionsInTransition( + public void waitOnRegionToClearRegionsInTransition( final HRegionInfo hri) throws InterruptedException { if (!isRegionInTransition(hri)) return; @@ -736,24 +774,18 @@ public class RegionStates { * We loop through all regions assuming we don't delete tables too much. 
*/ public void tableDeleted(final TableName tableName) { - Set regionsToDelete = new HashSet(); - synchronized (this) { - for (RegionState state: regionStates.values()) { - HRegionInfo region = state.getRegion(); - if (region.getTable().equals(tableName)) { - regionsToDelete.add(region); - } + for (RegionState state: regionStates.values()) { + HRegionInfo region = state.getRegion(); + if (region.getTable().equals(tableName)) { + deleteRegion(region); } } - for (HRegionInfo region: regionsToDelete) { - deleteRegion(region); - } } /** * Get a copy of all regions assigned to a server */ - public synchronized Set getServerRegions(ServerName serverName) { + public Set getServerRegions(ServerName serverName) { Set regions = serverHoldings.get(serverName); if (regions == null) return null; return new HashSet(regions); @@ -763,20 +795,20 @@ public class RegionStates { * Remove a region from all state maps. */ @VisibleForTesting - public synchronized void deleteRegion(final HRegionInfo hri) { + public void deleteRegion(final HRegionInfo hri) { String encodedName = hri.getEncodedName(); - regionsInTransition.remove(encodedName); - regionStates.remove(encodedName); - TableName table = hri.getTable(); - Map indexMap = regionStatesTableIndex.get(table); - indexMap.remove(encodedName); - if (indexMap.size() == 0) - regionStatesTableIndex.remove(table); - lastAssignments.remove(encodedName); - ServerName sn = regionAssignments.remove(hri); - if (sn != null) { - Set regions = serverHoldings.get(sn); - regions.remove(hri); + synchronized (getLock(hri)) { + regionsInTransition.remove(encodedName); + regionStates.remove(encodedName); + tableSortedRegionStates.remove(hri.getRegionName()); + lastAssignments.remove(encodedName); + ServerName sn = regionAssignments.remove(hri); + if (sn != null) { + synchronized (getLock(sn)) { + Set regions = serverHoldings.get(sn); + regions.remove(hri); + } + } } } @@ -790,12 +822,12 @@ public class RegionStates { * think it's online falsely. Therefore if a server is online, we still * need to confirm it reachable and having the expected start code. */ - synchronized boolean wasRegionOnDeadServer(final String encodedName) { + boolean wasRegionOnDeadServer(final String encodedName) { ServerName server = lastAssignments.get(encodedName); return isServerDeadAndNotProcessed(server); } - synchronized boolean isServerDeadAndNotProcessed(ServerName server) { + boolean isServerDeadAndNotProcessed(ServerName server) { if (server == null) return false; if (serverManager.isServerOnline(server)) { String hostAndPort = server.getHostAndPort(); @@ -806,6 +838,8 @@ public class RegionStates { return false; } // The size of deadServers won't grow unbounded. + // Avoid putIfAbsent as it should be ok for the startCode to be overwritten for + // a given hostAndPort deadServers.put(hostAndPort, Long.valueOf(startCode)); } // Watch out! If the server is not dead, the region could @@ -825,23 +859,25 @@ public class RegionStates { * Get the last region server a region was on for purpose of re-assignment, * i.e. should the re-assignment be held back till log split is done? 
*/ - synchronized ServerName getLastRegionServerOfRegion(final String encodedName) { + ServerName getLastRegionServerOfRegion(final String encodedName) { return lastAssignments.get(encodedName); } - synchronized void setLastRegionServerOfRegions( + void setLastRegionServerOfRegions( final ServerName serverName, final List regionInfos) { for (HRegionInfo hri: regionInfos) { setLastRegionServerOfRegion(serverName, hri.getEncodedName()); } } - synchronized void setLastRegionServerOfRegion( + void setLastRegionServerOfRegion( final ServerName serverName, final String encodedName) { - lastAssignments.put(encodedName, serverName); + synchronized (getRegionLock(encodedName)) { + lastAssignments.put(encodedName, serverName); + } } - synchronized boolean isRegionOnServer( + boolean isRegionOnServer( final HRegionInfo hri, final ServerName serverName) { Set regions = serverHoldings.get(serverName); return regions == null ? false : regions.contains(hri); @@ -851,32 +887,44 @@ public class RegionStates { HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException { regionStateStore.splitRegion(p, a, b, sn, getRegionReplication(p)); - synchronized (this) { - // After PONR, split is considered to be done. - // Update server holdings to be aligned with the meta. - Set regions = serverHoldings.get(sn); - if (regions == null) { - throw new IllegalStateException(sn + " should host some regions"); + synchronized (getLock(p)) { + synchronized (getLock(a)) { + synchronized (getLock(b)) { + synchronized (getLock(sn)) { + // After PONR, split is considered to be done. + // Update server holdings to be aligned with the meta. + Set regions = serverHoldings.get(sn); + if (regions == null) { + throw new IllegalStateException(sn + " should host some regions"); + } + regions.remove(p); + regions.add(a); + regions.add(b); + } + } } - regions.remove(p); - regions.add(a); - regions.add(b); } } void mergeRegions(HRegionInfo p, HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException { regionStateStore.mergeRegions(p, a, b, sn, getRegionReplication(a)); - synchronized (this) { - // After PONR, merge is considered to be done. - // Update server holdings to be aligned with the meta. - Set regions = serverHoldings.get(sn); - if (regions == null) { - throw new IllegalStateException(sn + " should host some regions"); + synchronized (getLock(p)) { + synchronized (getLock(a)) { + synchronized (getLock(b)) { + synchronized (getLock(sn)) { + // After PONR, merge is considered to be done. + // Update server holdings to be aligned with the meta. + Set regions = serverHoldings.get(sn); + if (regions == null) { + throw new IllegalStateException(sn + " should host some regions"); + } + regions.remove(a); + regions.remove(b); + regions.add(p); + } + } } - regions.remove(a); - regions.remove(b); - regions.add(p); } } @@ -895,7 +943,7 @@ public class RegionStates { * that are excluded, such as disabled/disabling/enabling tables. All user regions * and their previous locations are returned. */ - synchronized Map closeAllUserRegions(Set excludedTables) { + Map closeAllUserRegions(Set excludedTables) { boolean noExcludeTables = excludedTables == null || excludedTables.isEmpty(); Set toBeClosed = new HashSet(regionStates.size()); for(RegionState state: regionStates.values()) { @@ -924,7 +972,7 @@ public class RegionStates { * regions being served, ignoring stats about number of requests. 
* @return the average load */ - protected synchronized double getAverageLoad() { + protected double getAverageLoad() { int numServers = 0, totalLoad = 0; for (Map.Entry> e: serverHoldings.entrySet()) { Set regions = e.getValue(); @@ -961,32 +1009,30 @@ public class RegionStates { getAssignmentsByTable() { Map>> result = new HashMap>>(); - synchronized (this) { - if (!server.getConfiguration().getBoolean( - HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, false)) { - Map> svrToRegions = - new HashMap>(serverHoldings.size()); - for (Map.Entry> e: serverHoldings.entrySet()) { - svrToRegions.put(e.getKey(), new ArrayList(e.getValue())); - } - result.put(TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME), svrToRegions); - } else { - for (Map.Entry> e: serverHoldings.entrySet()) { - for (HRegionInfo hri: e.getValue()) { - if (hri.isMetaRegion()) continue; - TableName tablename = hri.getTable(); - Map> svrToRegions = result.get(tablename); - if (svrToRegions == null) { - svrToRegions = new HashMap>(serverHoldings.size()); - result.put(tablename, svrToRegions); - } - List regions = svrToRegions.get(e.getKey()); - if (regions == null) { - regions = new ArrayList(); - svrToRegions.put(e.getKey(), regions); - } - regions.add(hri); + if (!server.getConfiguration().getBoolean( + HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, false)) { + Map> svrToRegions = + new HashMap>(serverHoldings.size()); + for (Map.Entry> e: serverHoldings.entrySet()) { + svrToRegions.put(e.getKey(), new ArrayList(e.getValue())); + } + result.put(TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME), svrToRegions); + } else { + for (Map.Entry> e: serverHoldings.entrySet()) { + for (HRegionInfo hri: e.getValue()) { + if (hri.isMetaRegion()) continue; + TableName tablename = hri.getTable(); + Map> svrToRegions = result.get(tablename); + if (svrToRegions == null) { + svrToRegions = new HashMap>(serverHoldings.size()); + result.put(tablename, svrToRegions); } + List regions = svrToRegions.get(e.getKey()); + if (regions == null) { + regions = new ArrayList(); + svrToRegions.put(e.getKey(), regions); + } + regions.add(hri); } } } @@ -1014,7 +1060,7 @@ public class RegionStates { * Returns a clone of region assignments per server * @return a Map of ServerName to a List of HRegionInfo's */ - protected synchronized Map> getRegionAssignmentsByServer() { + protected Map> getRegionAssignmentsByServer() { Map> regionsByServer = new HashMap>(serverHoldings.size()); for (Map.Entry> e: serverHoldings.entrySet()) { @@ -1023,7 +1069,7 @@ public class RegionStates { return regionsByServer; } - protected synchronized RegionState getRegionState(final String encodedName) { + protected RegionState getRegionState(final String encodedName) { return regionStates.get(encodedName); } @@ -1084,7 +1130,7 @@ public class RegionStates { regionStateStore.updateRegionState(openSeqNum, regionState, oldState); } - synchronized (this) { + synchronized (getLock(hri)) { regionsInTransition.put(encodedName, regionState); putRegionState(regionState); @@ -1113,10 +1159,24 @@ public class RegionStates { } } } + } + synchronized (this) { // notify the change this.notifyAll(); } return regionState; } + + private Object getRegionLock(String encodedName) { + return lockCache.getLock("r"+encodedName); + } + + private Object getLock(HRegionInfo hri) { + return getRegionLock(hri.getEncodedName()); + } + + private Object getLock(ServerName serverName) { + return lockCache.getLock("s"+serverName.getServerName()); + } }
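
The sketches that follow are not part of the patch; they are minimal, self-contained Java illustrations of the techniques it applies. The first shows how the per-key locks handed out by the new LockCache replace a single coarse monitor. LockCache itself comes from the patch; the surrounding class, field and key names are made up for the example.

import org.apache.hadoop.hbase.util.LockCache;

// Illustrative only: a coarse "synchronized (this)" replaced by a per-region lock.
public class PerKeyLockingExample {
  private final LockCache<String> locks = new LockCache<String>();

  public void addPlan(String encodedRegionName, Object plan) {
    // Before the patch every caller serialized on one shared monitor, e.g.
    //   synchronized (regionPlans) { regionPlans.put(encodedRegionName, plan); }
    // After the patch, callers touching different regions no longer block each
    // other, while callers for the same region still serialize, because
    // LockCache returns the same Object for equal keys.
    synchronized (locks.getLock("r" + encodedRegionName)) { // "r" prefix mirrors getRegionLock()
      // ... update the concurrent map guarded by this region's lock ...
    }
  }
}

Because the cache uses softValues(), a lock object stays alive as long as some thread still references it (for example while holding it inside a synchronized block), so concurrent callers always contend on the same monitor; an entry is only reclaimed once nobody is using it.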
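
Most bookkeeping maps move from HashMap/TreeMap guarded by a global monitor to ConcurrentHashMap/ConcurrentSkipListMap. One property the patch relies on (for example in cleanOutCrashedServerReferences(), which now iterates regionPlans without the old map-wide lock) is that iterators over the concurrent maps are weakly consistent and support removal. A toy sketch, with made-up keys and values:

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Weakly consistent iteration: no ConcurrentModificationException, and
// Iterator.remove() prunes entries while other threads keep using the map.
public class ConcurrentPlanCleanup {
  public static void main(String[] args) {
    ConcurrentSkipListMap<String, String> plans = new ConcurrentSkipListMap<>();
    plans.put("region-a", "server-1");
    plans.put("region-b", "server-2");
    plans.put("region-c", "server-1");

    // Drop every plan whose destination is the crashed server "server-1".
    for (Iterator<Map.Entry<String, String>> it = plans.entrySet().iterator(); it.hasNext();) {
      if (it.next().getValue().equals("server-1")) {
        it.remove();
      }
    }
    System.out.println(plans); // {region-b=server-2}
  }
}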
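
In RegionStates, the per-table index map is replaced by tableSortedRegionStates, a ConcurrentSkipListMap keyed by region name, and getRegionByStateOfTable() selects one table's regions with a subMap() range scan. The sketch below, with toy region names, shows why the start/end key construction works: a region name begins with the table name followed by a delimiter byte (',' — HConstants.DELIMITER in the patch), so every region of a table sorts between "<table>," and "<table>," followed by 0xFF. Names and data are illustrative; the Guava comparator stands in for Bytes.BYTES_COMPARATOR.

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentSkipListMap;
import com.google.common.primitives.UnsignedBytes;

// Range scan over byte[]-keyed, unsigned-lexicographically sorted region names.
public class TableRangeScanSketch {
  public static void main(String[] args) {
    ConcurrentSkipListMap<byte[], String> byRegionName =
        new ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator());
    byRegionName.put("t1,,1".getBytes(StandardCharsets.UTF_8), "t1 first region");
    byRegionName.put("t1,row5,2".getBytes(StandardCharsets.UTF_8), "t1 second region");
    byRegionName.put("t2,,3".getBytes(StandardCharsets.UTF_8), "t2 region");

    byte[] table = "t1".getBytes(StandardCharsets.UTF_8);
    // startKey = table name + delimiter
    byte[] startKey = new byte[table.length + 1];
    System.arraycopy(table, 0, startKey, 0, table.length);
    startKey[startKey.length - 1] = (byte) ',';
    // endKey = startKey + 0xFF, past any real region name of this table
    byte[] endKey = new byte[startKey.length + 1];
    System.arraycopy(startKey, 0, endKey, 0, startKey.length);
    endKey[endKey.length - 1] = (byte) 0xFF;

    // Prints only the two t1 regions.
    byRegionName.subMap(startKey, endKey).values().forEach(System.out::println);
  }
}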
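
splitRegions() and mergeRegions() nest several of these per-key locks (parent, daughters, hosting server) inside one another. That nesting is safe only if every code path acquires the same locks in the same order; a common safeguard, shown here purely as an illustrative sketch and not something the patch itself does, is to take multi-key locks in a canonical (sorted) order.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.util.LockCache;

// Illustrative only: acquire several per-key locks in sorted order so that
// every caller nests them the same way and cannot deadlock against another.
public class OrderedLocking {
  static void runWithLocks(LockCache<String> locks, List<String> keys, Runnable body) {
    String[] ordered = keys.toArray(new String[0]);
    Arrays.sort(ordered); // canonical order shared by all callers
    acquire(locks, ordered, 0, body);
  }

  private static void acquire(LockCache<String> locks, String[] keys, int i, Runnable body) {
    if (i == keys.length) {
      body.run();
      return;
    }
    synchronized (locks.getLock(keys[i])) {
      acquire(locks, keys, i + 1, body);
    }
  }
}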