diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 6223c65..bc7d1b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -35,6 +35,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -98,6 +99,7 @@ import org.apache.hadoop.hbase.util.ConfigUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.KeyLocker; +import org.apache.hadoop.hbase.util.LockCache; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.hadoop.hbase.util.Threads; @@ -184,9 +186,8 @@ public class AssignmentManager extends ZooKeeperListener { /** Plans for region movement. Key is the encoded version of a region name*/ // TODO: When do plans get cleaned out? Ever? In server open and in server // shutdown processing -- St.Ack - // All access to this Map must be synchronized. final NavigableMap regionPlans = - new TreeMap(); + new ConcurrentSkipListMap(); private final TableStateManager tableStateManager; @@ -222,6 +223,8 @@ public class AssignmentManager extends ZooKeeperListener { // performance, but not needed in most use cases. 
private final boolean bulkAssignWaitTillAllAssigned; + final private LockCache lockCache = new LockCache(); + /** * Indicator that AssignmentManager has recovered the region states so * that ServerShutdownHandler can be fully enabled and re-assign regions @@ -384,7 +387,7 @@ public class AssignmentManager extends ZooKeeperListener { * @param plan */ public void addPlan(String encodedName, RegionPlan plan) { - synchronized (regionPlans) { + synchronized (getRegionLock(encodedName)) { regionPlans.put(encodedName, plan); } } @@ -393,8 +396,10 @@ public class AssignmentManager extends ZooKeeperListener { * Add a map of region plans. */ public void addPlans(Map plans) { - synchronized (regionPlans) { - regionPlans.putAll(plans); + for (Map.Entry planEntry : plans.entrySet()) { + synchronized (getLock(planEntry.getValue().getRegionInfo())) { + regionPlans.put(planEntry.getKey(), planEntry.getValue()); + } } } @@ -1356,22 +1361,26 @@ public class AssignmentManager extends ZooKeeperListener { ServerName serverName = rs.getServerName(); if (serverManager.isServerOnline(serverName)) { if (rs.isOnServer(serverName) && (rs.isOpened() || rs.isSplitting())) { - synchronized (regionStates) { + synchronized (regionStates.getLock(regionInfo)) { regionOnline(regionInfo, serverName); if (rs.isSplitting() && splitRegions.containsKey(regionInfo)) { // Check if the daugter regions are still there, if they are present, offline // as its the case of a rollback. 
HRegionInfo hri_a = splitRegions.get(regionInfo).getFirst();
               HRegionInfo hri_b = splitRegions.get(regionInfo).getSecond();
-              if (!regionStates.isRegionInTransition(hri_a.getEncodedName())) {
-                LOG.warn("Split daughter region not in transition " + hri_a);
-              }
-              if (!regionStates.isRegionInTransition(hri_b.getEncodedName())) {
-                LOG.warn("Split daughter region not in transition" + hri_b);
+              synchronized (regionStates.getLock(hri_a)) {
+                synchronized (regionStates.getLock(hri_b)) {
+                  if (!regionStates.isRegionInTransition(hri_a.getEncodedName())) {
+                    LOG.warn("Split daughter region not in transition " + hri_a);
+                  }
+                  if (!regionStates.isRegionInTransition(hri_b.getEncodedName())) {
+                    LOG.warn("Split daughter region not in transition " + hri_b);
+                  }
+                  regionOffline(hri_a);
+                  regionOffline(hri_b);
+                  splitRegions.remove(regionInfo);
+                }
               }
-              regionOffline(hri_a);
-              regionOffline(hri_b);
-              splitRegions.remove(regionInfo);
             }
             if (disabled) {
               // if server is offline, no hurt to unassign again
@@ -1381,12 +1390,16 @@ public class AssignmentManager extends ZooKeeperListener {
             }
           }
         } else if (rs.isMergingNew()) {
-          synchronized (regionStates) {
-            String p = regionInfo.getEncodedName();
+          String p = regionInfo.getEncodedName();
+          synchronized (regionStates.getLock(regionInfo)) {
             PairOfSameType regions = mergingRegions.get(p);
-            if (regions != null) {
-              onlineMergingRegion(disabled, regions.getFirst(), serverName);
-              onlineMergingRegion(disabled, regions.getSecond(), serverName);
+            if (regions != null) { // check first: the daughter locks dereference regions
+              synchronized (regionStates.getLock(regions.getFirst())) {
+                synchronized (regionStates.getLock(regions.getSecond())) {
+                  onlineMergingRegion(disabled, regions.getFirst(), serverName);
+                  onlineMergingRegion(disabled, regions.getSecond(), serverName);
+                }
+              }
             }
           }
         }
@@ -2421,7 +2434,7 @@ public class AssignmentManager extends ZooKeeperListener {
     boolean newPlan = false;
     RegionPlan existingPlan;
-    synchronized (this.regionPlans) {
+    synchronized (getLock(region)) {
       existingPlan = 
this.regionPlans.get(encodedName); if (existingPlan != null && existingPlan.getDestination() != null) { @@ -2444,7 +2457,7 @@ public class AssignmentManager extends ZooKeeperListener { LOG.warn("Can't find a destination for " + encodedName); return null; } - synchronized (this.regionPlans) { + synchronized (getLock(region)) { randomPlan = new RegionPlan(region, null, destination); if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) { List regions = new ArrayList(1); @@ -3330,7 +3343,7 @@ public class AssignmentManager extends ZooKeeperListener { * @param region Region whose plan we are to clear. */ void clearRegionPlan(final HRegionInfo region) { - synchronized (this.regionPlans) { + synchronized (getLock(region)) { this.regionPlans.remove(region.getEncodedName()); } } @@ -3468,13 +3481,13 @@ public class AssignmentManager extends ZooKeeperListener { */ public List cleanOutCrashedServerReferences(final ServerName sn) { // Clean out any existing assignment plans for this server - synchronized (this.regionPlans) { - for (Iterator > i = this.regionPlans.entrySet().iterator(); - i.hasNext();) { - Map.Entry e = i.next(); - ServerName otherSn = e.getValue().getDestination(); - // The name will be null if the region is planned for a random assign. - if (otherSn != null && otherSn.equals(sn)) { + for (Iterator > i = this.regionPlans.entrySet().iterator(); + i.hasNext();) { + Map.Entry e = i.next(); + ServerName otherSn = e.getValue().getDestination(); + // The name will be null if the region is planned for a random assign. + if (otherSn != null && otherSn.equals(sn)) { + synchronized (getLock(e.getValue().getRegionInfo())) { // Use iterator's remove else we'll get CME i.remove(); } @@ -3543,7 +3556,7 @@ public class AssignmentManager extends ZooKeeperListener { + (state == null ? 
"not in region states" : state)); return; } - synchronized (this.regionPlans) { + synchronized (getLock(plan.getRegionInfo())) { this.regionPlans.put(plan.getRegionName(), plan); } unassign(hri, false, plan.getDestination()); @@ -3931,19 +3944,23 @@ public class AssignmentManager extends ZooKeeperListener { } } - synchronized (regionStates) { - regionStates.updateRegionState(hri_a, State.MERGING); - regionStates.updateRegionState(hri_b, State.MERGING); - regionStates.updateRegionState(p, State.MERGING_NEW, sn); + synchronized (regionStates.getLock(p)) { + synchronized (regionStates.getLock(hri_a)) { + synchronized (regionStates.getLock(hri_b)) { + regionStates.updateRegionState(hri_a, State.MERGING); + regionStates.updateRegionState(hri_b, State.MERGING); + regionStates.updateRegionState(p, State.MERGING_NEW, sn); - if (et != EventType.RS_ZK_REGION_MERGED) { - this.mergingRegions.put(encodedName, - new PairOfSameType(hri_a, hri_b)); - } else { - this.mergingRegions.remove(encodedName); - regionOffline(hri_a, State.MERGED); - regionOffline(hri_b, State.MERGED); - regionOnline(p, sn); + if (et != EventType.RS_ZK_REGION_MERGED) { + this.mergingRegions.put(encodedName, + new PairOfSameType(hri_a, hri_b)); + } else { + this.mergingRegions.remove(encodedName); + regionOffline(hri_a, State.MERGED); + regionOffline(hri_b, State.MERGED); + regionOnline(p, sn); + } + } } } @@ -4055,24 +4072,28 @@ public class AssignmentManager extends ZooKeeperListener { } } - synchronized (regionStates) { - splitRegions.put(p, new PairOfSameType(hri_a, hri_b)); - regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn); - regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn); - regionStates.updateRegionState(rt, State.SPLITTING); + synchronized (regionStates.getLock(p)) { + synchronized (regionStates.getLock(hri_a)) { + synchronized (regionStates.getLock(hri_b)) { + splitRegions.put(p, new PairOfSameType(hri_a, hri_b)); + regionStates.updateRegionState(hri_a, 
State.SPLITTING_NEW, sn); + regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn); + regionStates.updateRegionState(rt, State.SPLITTING); - // The below is for testing ONLY! We can't do fault injection easily, so - // resort to this kinda uglyness -- St.Ack 02/25/2011. - if (TEST_SKIP_SPLIT_HANDLING) { - LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set"); - return true; // return true so that the splitting node stays - } + // The below is for testing ONLY! We can't do fault injection easily, so + // resort to this kinda uglyness -- St.Ack 02/25/2011. + if (TEST_SKIP_SPLIT_HANDLING) { + LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set"); + return true; // return true so that the splitting node stays + } - if (et == EventType.RS_ZK_REGION_SPLIT) { - regionOffline(p, State.SPLIT); - regionOnline(hri_a, sn); - regionOnline(hri_b, sn); - splitRegions.remove(p); + if (et == EventType.RS_ZK_REGION_SPLIT) { + regionOffline(p, State.SPLIT); + regionOnline(hri_a, sn); + regionOnline(hri_b, sn); + splitRegions.remove(p); + } + } } } @@ -4436,4 +4457,12 @@ public class AssignmentManager extends ZooKeeperListener { void setRegionStateListener(RegionStateListener listener) { this.regionStateListener = listener; } + + private Object getRegionLock(String encodedName) { + return lockCache.getLock("r"+encodedName); + } + + private Object getLock(HRegionInfo hri) { + return getRegionLock(hri.getEncodedName()); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java index 17a661c..b14f144 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java @@ -26,9 +26,14 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Set; 
import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -47,6 +52,7 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.LockCache; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKAssign; @@ -69,41 +75,43 @@ public class RegionStates { /** * Regions currently in transition. */ - final HashMap regionsInTransition = - new HashMap(); + final Map regionsInTransition = + new ConcurrentHashMap(); /** * Region encoded name to state map. * All the regions should be in this map. */ private final Map regionStates = - new HashMap(); + new ConcurrentHashMap(); /** - * Holds mapping of table -> region state + * RegionStates sorted by a region's tablename + * Regions of a table are not guaranteed to be sorted + * All the regions in regionStates should be in this map */ - private final Map> regionStatesTableIndex = - new HashMap>(); + private final NavigableMap tableSortedRegionStates = + new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); /** * Server to regions assignment map. * Contains the set of regions currently assigned to a given server. */ private final Map> serverHoldings = - new HashMap>(); + new ConcurrentHashMap>(); /** * Maintains the mapping from the default region to the replica regions. */ private final Map> defaultReplicaToOtherReplicas = - new HashMap>(); + new ConcurrentHashMap>(); /** * Region to server assignment map. * Contains the server a given region is currently assigned to. 
*/ - private final TreeMap regionAssignments = - new TreeMap(); + private final ConcurrentSkipListMap regionAssignments = + new ConcurrentSkipListMap(); /** * Encoded region name to server assignment map for re-assignment @@ -115,8 +123,8 @@ public class RegionStates { * is offline while the info in lastAssignments is cleared when * the region is closed or the server is dead and processed. */ - private final HashMap lastAssignments = - new HashMap(); + private final Map lastAssignments = + new ConcurrentHashMap(); /** * Encoded region name to server assignment map for the @@ -127,16 +135,16 @@ public class RegionStates { * to match the meta. We need this map to find out the old server * whose serverHoldings needs cleanup, given a moved region. */ - private final HashMap oldAssignments = - new HashMap(); + private final Map oldAssignments = + new ConcurrentHashMap(); /** * Map a host port pair string to the latest start code * of a region server which is known to be dead. It is dead * to us, but server manager may not know it yet. */ - private final HashMap deadServers = - new HashMap(); + private final Map deadServers = + new ConcurrentHashMap(); /** * Map a dead servers to the time when log split is done. @@ -145,9 +153,11 @@ public class RegionStates { * on a configured time. By default, we assume a dead * server should be done with log splitting in two hours. 
*/ - private final HashMap processedServers = - new HashMap(); - private long lastProcessedServerCleanTime; + private final Map processedServers = + new ConcurrentHashMap(); + + private final LockCache lockCache = new LockCache(); + private volatile long lastProcessedServerCleanTime; private final TableStateManager tableStateManager; private final RegionStateStore regionStateStore; @@ -169,7 +179,7 @@ public class RegionStates { /** * @return a copy of the region assignment map */ - public synchronized Map getRegionAssignments() { + public Map getRegionAssignments() { return new TreeMap(regionAssignments); } @@ -178,22 +188,24 @@ public class RegionStates { * @param regions * @return a pair containing the groupings as a map */ - synchronized Map> getRegionAssignments( + Map> getRegionAssignments( Collection regions) { Map> map = new HashMap>(); for (HRegionInfo region : regions) { - HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(region); - Set allReplicas = defaultReplicaToOtherReplicas.get(defaultReplica); - if (allReplicas != null) { - for (HRegionInfo hri : allReplicas) { - ServerName server = regionAssignments.get(hri); - if (server != null) { - List regionsOnServer = map.get(server); - if (regionsOnServer == null) { - regionsOnServer = new ArrayList(1); - map.put(server, regionsOnServer); + synchronized (getLock(region)) { + HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(region); + Set allReplicas = defaultReplicaToOtherReplicas.get(defaultReplica); + if (allReplicas != null) { + for (HRegionInfo hri : allReplicas) { + ServerName server = regionAssignments.get(hri); + if (server != null) { + List regionsOnServer = map.get(server); + if (regionsOnServer == null) { + regionsOnServer = new ArrayList(1); + map.put(server, regionsOnServer); + } + regionsOnServer.add(hri); } - regionsOnServer.add(hri); } } } @@ -201,7 +213,7 @@ public class RegionStates { return map; } - public synchronized ServerName 
getRegionServerOfRegion(HRegionInfo hri) { + public ServerName getRegionServerOfRegion(HRegionInfo hri) { return regionAssignments.get(hri); } @@ -209,35 +221,35 @@ public class RegionStates { * Get regions in transition and their states */ @SuppressWarnings("unchecked") - public synchronized Map getRegionsInTransition() { - return (Map)regionsInTransition.clone(); + public Map getRegionsInTransition() { + return Maps.newHashMap(regionsInTransition); } /** * @return True if specified region in transition. */ - public synchronized boolean isRegionInTransition(final HRegionInfo hri) { + public boolean isRegionInTransition(final HRegionInfo hri) { return regionsInTransition.containsKey(hri.getEncodedName()); } /** * @return True if specified region in transition. */ - public synchronized boolean isRegionInTransition(final String encodedName) { + public boolean isRegionInTransition(final String encodedName) { return regionsInTransition.containsKey(encodedName); } /** * @return True if any region in transition. */ - public synchronized boolean isRegionsInTransition() { + public boolean isRegionsInTransition() { return !regionsInTransition.isEmpty(); } /** * @return True if hbase:meta table region is in transition. */ - public synchronized boolean isMetaRegionInTransition() { + public boolean isMetaRegionInTransition() { for (RegionState state : regionsInTransition.values()) { if (state.getRegion().isMetaRegion()) return true; } @@ -247,17 +259,21 @@ public class RegionStates { /** * @return True if specified region assigned, and not in transition. */ - public synchronized boolean isRegionOnline(final HRegionInfo hri) { - return !isRegionInTransition(hri) && regionAssignments.containsKey(hri); + public boolean isRegionOnline(final HRegionInfo hri) { + synchronized (getLock(hri)) { + return !isRegionInTransition(hri) && regionAssignments.containsKey(hri); + } } /** * @return True if specified region offline/closed, but not in transition. 
* If the region is not in the map, it is offline to us too. */ - public synchronized boolean isRegionOffline(final HRegionInfo hri) { - return getRegionState(hri) == null || (!isRegionInTransition(hri) - && isRegionInState(hri, State.OFFLINE, State.CLOSED)); + public boolean isRegionOffline(final HRegionInfo hri) { + synchronized (getLock(hri)) { + return getRegionState(hri) == null + || (!isRegionInTransition(hri) && isRegionInState(hri, State.OFFLINE, State.CLOSED)); + } } /** @@ -295,7 +311,7 @@ public class RegionStates { /** * Get region transition state */ - public synchronized RegionState + public RegionState getRegionTransitionState(final String encodedName) { return regionsInTransition.get(encodedName); } @@ -333,56 +349,54 @@ public class RegionStates { * @param lastHost the last server that hosts the region * @return the current state */ - public synchronized RegionState createRegionState(final HRegionInfo hri, + public RegionState createRegionState(final HRegionInfo hri, State newState, ServerName serverName, ServerName lastHost) { - if (newState == null || (newState == State.OPEN && serverName == null)) { - newState = State.OFFLINE; - } - if (hri.isOffline() && hri.isSplit()) { - newState = State.SPLIT; - serverName = null; - } - String encodedName = hri.getEncodedName(); - RegionState regionState = regionStates.get(encodedName); - if (regionState != null) { - LOG.warn("Tried to create a state for a region already in RegionStates, " - + "used existing: " + regionState + ", ignored new: " + newState); - } else { - regionState = new RegionState(hri, newState, serverName); - putRegionState(regionState); - if (newState == State.OPEN) { - if (!serverName.equals(lastHost)) { - LOG.warn("Open region's last host " + lastHost - + " should be the same as the current one " + serverName - + ", ignored the last and used the current one"); - lastHost = serverName; - } - lastAssignments.put(encodedName, lastHost); - regionAssignments.put(hri, lastHost); - } else if 
(!regionState.isUnassignable()) { - regionsInTransition.put(encodedName, regionState); + synchronized (getLock(hri)) { + if (newState == null || (newState == State.OPEN && serverName == null)) { + newState = State.OFFLINE; } - if (lastHost != null && newState != State.SPLIT) { - addToServerHoldings(lastHost, hri); - if (newState != State.OPEN) { - oldAssignments.put(encodedName, lastHost); + if (hri.isOffline() && hri.isSplit()) { + newState = State.SPLIT; + serverName = null; + } + String encodedName = hri.getEncodedName(); + RegionState regionState = regionStates.get(encodedName); + if (regionState != null) { + LOG.warn("Tried to create a state for a region already in RegionStates, " + + "used existing: " + regionState + ", ignored new: " + newState); + } else { + regionState = new RegionState(hri, newState, serverName); + putRegionState(regionState); + if (newState == State.OPEN) { + if (!serverName.equals(lastHost)) { + LOG.warn("Open region's last host " + lastHost + + " should be the same as the current one " + serverName + + ", ignored the last and used the current one"); + lastHost = serverName; + } + lastAssignments.put(encodedName, lastHost); + regionAssignments.put(hri, lastHost); + } else if (!regionState.isUnassignable()) { + regionsInTransition.put(encodedName, regionState); + } + if (lastHost != null && newState != State.SPLIT) { + synchronized (getLock(lastHost)) { + addToServerHoldings(lastHost, hri); + if (newState != State.OPEN) { + oldAssignments.put(encodedName, lastHost); + } + } } } + return regionState; } - return regionState; } private RegionState putRegionState(RegionState regionState) { HRegionInfo hri = regionState.getRegion(); String encodedName = hri.getEncodedName(); - TableName table = hri.getTable(); RegionState oldState = regionStates.put(encodedName, regionState); - Map map = regionStatesTableIndex.get(table); - if (map == null) { - map = new HashMap(); - regionStatesTableIndex.put(table, map); - } - map.put(encodedName, 
regionState); + tableSortedRegionStates.put(regionState.getRegion().getRegionName(), regionState); return oldState; } @@ -391,9 +405,9 @@ public class RegionStates { */ public RegionState updateRegionState( final HRegionInfo hri, final State state) { - RegionState regionState = getRegionState(hri.getEncodedName()); + RegionState regionState = regionStates.get(hri.getEncodedName()); return updateRegionState(hri, state, - regionState == null ? null : regionState.getServerName()); + regionState == null ? null : regionState.getServerName()); } /** @@ -421,7 +435,7 @@ public class RegionStates { /** * Transition a region state to OPEN from OPENING/PENDING_OPEN */ - public synchronized RegionState transitionOpenFromPendingOpenOrOpeningOnServer( + public RegionState transitionOpenFromPendingOpenOrOpeningOnServer( final RegionTransition transition, final RegionState fromState, final ServerName sn) { if(fromState.isPendingOpenOrOpeningOnServer(sn)){ return updateRegionState(transition, State.OPEN); @@ -458,14 +472,16 @@ public class RegionStates { } updateRegionState(hri, State.OPEN, serverName, openSeqNum); - synchronized (this) { + synchronized (getLock(hri)) { regionsInTransition.remove(encodedName); ServerName oldServerName = regionAssignments.put(hri, serverName); if (!serverName.equals(oldServerName)) { if (LOG.isDebugEnabled()) { LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName); } - addToServerHoldings(serverName, hri); + synchronized (getLock(serverName)) { + addToServerHoldings(serverName, hri); + } addToReplicaMapping(hri); if (oldServerName == null) { oldServerName = oldAssignments.remove(encodedName); @@ -473,8 +489,10 @@ public class RegionStates { if (oldServerName != null && !oldServerName.equals(serverName) && serverHoldings.containsKey(oldServerName)) { - LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName); - removeFromServerHoldings(oldServerName, hri); + synchronized (getLock(oldServerName)) { + 
LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName); + removeFromServerHoldings(oldServerName, hri); + } } } } @@ -483,7 +501,7 @@ public class RegionStates { private void addToServerHoldings(ServerName serverName, HRegionInfo hri) { Set regions = serverHoldings.get(serverName); if (regions == null) { - regions = new HashSet(); + regions = Sets.newSetFromMap(new ConcurrentHashMap()); serverHoldings.put(serverName, regions); } regions.add(hri); @@ -491,30 +509,36 @@ public class RegionStates { private void addToReplicaMapping(HRegionInfo hri) { HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(hri); - Set replicas = - defaultReplicaToOtherReplicas.get(defaultReplica); - if (replicas == null) { - replicas = new HashSet(); - defaultReplicaToOtherReplicas.put(defaultReplica, replicas); + synchronized (getLock(defaultReplica)) { + Set replicas = + defaultReplicaToOtherReplicas.get(defaultReplica); + if (replicas == null) { + replicas = Sets.newSetFromMap(new ConcurrentHashMap()); + defaultReplicaToOtherReplicas.put(defaultReplica, replicas); + } + replicas.add(hri); } - replicas.add(hri); } private void removeFromServerHoldings(ServerName serverName, HRegionInfo hri) { Set oldRegions = serverHoldings.get(serverName); - oldRegions.remove(hri); - if (oldRegions.isEmpty()) { - serverHoldings.remove(serverName); + if (oldRegions != null) { + oldRegions.remove(hri); + if (oldRegions.isEmpty()) { + serverHoldings.remove(serverName); + } } } private void removeFromReplicaMapping(HRegionInfo hri) { HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(hri); - Set replicas = defaultReplicaToOtherReplicas.get(defaultReplica); - if (replicas != null) { - replicas.remove(hri); - if (replicas.isEmpty()) { - defaultReplicaToOtherReplicas.remove(defaultReplica); + synchronized (getLock(defaultReplica)) { + Set replicas = defaultReplicaToOtherReplicas.get(defaultReplica); + if (replicas != null) { + 
replicas.remove(hri); + if (replicas.isEmpty()) { + defaultReplicaToOtherReplicas.remove(defaultReplica); + } } } } @@ -523,12 +547,14 @@ public class RegionStates { * A dead server's wals have been split so that all the regions * used to be open on it can be safely assigned now. Mark them assignable. */ - public synchronized void logSplit(final ServerName serverName) { + public void logSplit(final ServerName serverName) { for (Iterator> it = lastAssignments.entrySet().iterator(); it.hasNext();) { Map.Entry e = it.next(); if (e.getValue().equals(serverName)) { - it.remove(); + synchronized(getRegionLock(e.getKey())) { + it.remove(); + } } } long now = System.currentTimeMillis(); @@ -562,8 +588,10 @@ public class RegionStates { clearLastAssignment(region); } - public synchronized void clearLastAssignment(final HRegionInfo region) { - lastAssignments.remove(region.getEncodedName()); + public void clearLastAssignment(final HRegionInfo region) { + synchronized (getLock(region)) { + lastAssignments.remove(region.getEncodedName()); + } } /** @@ -592,7 +620,7 @@ public class RegionStates { expectedState == null ? State.OFFLINE : expectedState; updateRegionState(hri, newState); String encodedName = hri.getEncodedName(); - synchronized (this) { + synchronized (getLock(hri)) { regionsInTransition.remove(encodedName); ServerName oldServerName = regionAssignments.remove(hri); if (oldServerName != null && serverHoldings.containsKey(oldServerName)) { @@ -601,8 +629,10 @@ public class RegionStates { ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { // Offline the region only if it's merged/split, or the table is disabled/disabling. // Otherwise, offline it from this server only when it is online on a different server. 
-          LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
-          removeFromServerHoldings(oldServerName, hri);
+          synchronized (getLock(oldServerName)) {
+            LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
+            removeFromServerHoldings(oldServerName, hri);
+          }
           removeFromReplicaMapping(hri);
         } else {
           // Need to remember it so that we can offline it from this
@@ -624,14 +654,14 @@ public class RegionStates {
     // ConcurrentModificationException and deadlock in case of meta anassigned,
     // but RegionState a blocked.
     Set regionsToOffline = new HashSet();
-    synchronized (this) {
-      Set assignedRegions = serverHoldings.get(sn);
-      if (assignedRegions == null) {
-        assignedRegions = new HashSet();
-      }
+    Set assignedRegions = serverHoldings.get(sn);
+    if (assignedRegions == null) {
+      assignedRegions = Sets.newSetFromMap(new ConcurrentHashMap());
+    }
-      for (HRegionInfo region : assignedRegions) {
-        // Offline open regions, no need to offline if SPLIT/MERGED/OFFLINE
+    for (HRegionInfo region : assignedRegions) {
+      // Offline open regions, no need to offline if SPLIT/MERGED/OFFLINE
+      synchronized (getLock(region)) { // cached per-region lock, not the String's monitor
         if (isRegionOnline(region)) {
           regionsToOffline.add(region);
         } else if (isRegionInState(region, State.SPLITTING, State.MERGING)) {
@@ -645,40 +675,43 @@ public class RegionStates {
           }
         }
       }
+    }
-      for (RegionState state : regionsInTransition.values()) {
-        HRegionInfo hri = state.getRegion();
-        if (assignedRegions.contains(hri)) {
-          // Region is open on this region server, but in transition.
-          // This region must be moving away from this server, or splitting/merging.
-          // SSH will handle it, either skip assigning, or re-assign.
-          LOG.info("Transitioning " + state + " will be handled by ServerCrashProcedure for " + sn);
-        } else if (sn.equals(state.getServerName())) {
-          // Region is in transition on this region server, and this
-          // region is not open on this server. 
So the region must be - // moving to this server from another one (i.e. opening or - // pending open on this server, was open on another one. - // Offline state is also kind of pending open if the region is in - // transition. The region could be in failed_close state too if we have - // tried several times to open it while this region server is not reachable) - if (state.isPendingOpenOrOpening() || state.isFailedClose() || state.isOffline()) { - LOG.info("Found region in " + state + - " to be reassigned by ServerCrashProcedure for " + sn); - rits.add(hri); - } else if(state.isSplittingNew()) { - regionsToCleanIfNoMetaEntry.add(state.getRegion()); - } else { - LOG.warn("THIS SHOULD NOT HAPPEN: unexpected " + state); - } + for (RegionState state : regionsInTransition.values()) { + HRegionInfo hri = state.getRegion(); + if (assignedRegions.contains(hri)) { + // Region is open on this region server, but in transition. + // This region must be moving away from this server, or splitting/merging. + // SSH will handle it, either skip assigning, or re-assign. + LOG.info("Transitioning " + state + " will be handled by ServerCrashProcedure for " + sn); + } else if (sn.equals(state.getServerName())) { + // Region is in transition on this region server, and this + // region is not open on this server. So the region must be + // moving to this server from another one (i.e. opening or + // pending open on this server, was open on another one. + // Offline state is also kind of pending open if the region is in + // transition. 
The region could be in failed_close state too if we have + // tried several times to open it while this region server is not reachable) + if (state.isPendingOpenOrOpening() || state.isFailedClose() || state.isOffline()) { + LOG.info("Found region in " + state + + " to be reassigned by ServerCrashProcedure for " + sn); + rits.add(hri); + } else if(state.isSplittingNew()) { + regionsToCleanIfNoMetaEntry.add(state.getRegion()); + } else { + LOG.warn("THIS SHOULD NOT HAPPEN: unexpected " + state); } } - this.notifyAll(); } for (HRegionInfo hri : regionsToOffline) { regionOffline(hri); } + synchronized (this) { + this.notifyAll(); + } + cleanIfNoMetaEntry(regionsToCleanIfNoMetaEntry); return rits; } @@ -715,7 +748,7 @@ public class RegionStates { * @param tableName * @return Online regions from tableName */ - public synchronized List getRegionsOfTable(TableName tableName) { + public List getRegionsOfTable(TableName tableName) { List tableRegions = new ArrayList(); // boundary needs to have table's name but regionID 0 so that it is sorted // before all table's regions. 
@@ -736,18 +769,23 @@ public class RegionStates { * @param tableName * @return Online regions from tableName */ - public synchronized Map> - getRegionByStateOfTable(TableName tableName) { + public Map> getRegionByStateOfTable(TableName tableName) { Map> tableRegions = new HashMap>(); for (State state : State.values()) { tableRegions.put(state, new ArrayList()); } - Map indexMap = regionStatesTableIndex.get(tableName); - if (indexMap == null) - return tableRegions; - for (RegionState regionState : indexMap.values()) { - tableRegions.get(regionState.getState()).add(regionState.getRegion()); + byte[] startKey = new byte[tableName.getName().length+1]; + System.arraycopy(tableName.getName(), 0, startKey, 0, tableName.getName().length); + startKey[startKey.length-1] = (byte)HConstants.DELIMITER; + + byte[] endKey = new byte[startKey.length+1]; + System.arraycopy(startKey, 0 , endKey, 0, startKey.length); + endKey[endKey.length-1] = (byte)0xFF; + + for (RegionState state : tableSortedRegionStates.subMap( + startKey, endKey).values()) { + tableRegions.get(state.getState()).add(state.getRegion()); } return tableRegions; } @@ -758,7 +796,7 @@ public class RegionStates { * If the region isn't in transition, returns immediately. Otherwise, method * blocks until the region is out of transition. */ - public synchronized void waitOnRegionToClearRegionsInTransition( + public void waitOnRegionToClearRegionsInTransition( final HRegionInfo hri) throws InterruptedException { if (!isRegionInTransition(hri)) return; @@ -779,24 +817,18 @@ public class RegionStates { * We loop through all regions assuming we don't delete tables too much. 
*/ public void tableDeleted(final TableName tableName) { - Set regionsToDelete = new HashSet(); - synchronized (this) { - for (RegionState state: regionStates.values()) { - HRegionInfo region = state.getRegion(); - if (region.getTable().equals(tableName)) { - regionsToDelete.add(region); - } + for (RegionState state: regionStates.values()) { + HRegionInfo region = state.getRegion(); + if (region.getTable().equals(tableName)) { + deleteRegion(region); } } - for (HRegionInfo region: regionsToDelete) { - deleteRegion(region); - } } /** * Get a copy of all regions assigned to a server */ - public synchronized Set getServerRegions(ServerName serverName) { + public Set getServerRegions(ServerName serverName) { Set regions = serverHoldings.get(serverName); if (regions == null) return null; return new HashSet(regions); @@ -806,20 +838,20 @@ public class RegionStates { * Remove a region from all state maps. */ @VisibleForTesting - public synchronized void deleteRegion(final HRegionInfo hri) { + public void deleteRegion(final HRegionInfo hri) { String encodedName = hri.getEncodedName(); - regionsInTransition.remove(encodedName); - regionStates.remove(encodedName); - TableName table = hri.getTable(); - Map indexMap = regionStatesTableIndex.get(table); - indexMap.remove(encodedName); - if (indexMap.size() == 0) - regionStatesTableIndex.remove(table); - lastAssignments.remove(encodedName); - ServerName sn = regionAssignments.remove(hri); - if (sn != null) { - Set regions = serverHoldings.get(sn); - regions.remove(hri); + synchronized (getLock(hri)) { + regionsInTransition.remove(encodedName); + regionStates.remove(encodedName); + tableSortedRegionStates.remove(hri.getRegionName()); + lastAssignments.remove(encodedName); + ServerName sn = regionAssignments.remove(hri); + if (sn != null) { + synchronized (getLock(sn)) { + Set regions = serverHoldings.get(sn); + regions.remove(hri); + } + } } } @@ -833,12 +865,12 @@ public class RegionStates { * think it's online falsely. 
Therefore if a server is online, we still * need to confirm it reachable and having the expected start code. */ - synchronized boolean wasRegionOnDeadServer(final String encodedName) { + boolean wasRegionOnDeadServer(final String encodedName) { ServerName server = lastAssignments.get(encodedName); return isServerDeadAndNotProcessed(server); } - synchronized boolean isServerDeadAndNotProcessed(ServerName server) { + boolean isServerDeadAndNotProcessed(ServerName server) { if (server == null) return false; if (serverManager.isServerOnline(server)) { String hostAndPort = server.getHostAndPort(); @@ -868,18 +900,18 @@ public class RegionStates { * Get the last region server a region was on for purpose of re-assignment, * i.e. should the re-assignment be held back till log split is done? */ - synchronized ServerName getLastRegionServerOfRegion(final String encodedName) { + ServerName getLastRegionServerOfRegion(final String encodedName) { return lastAssignments.get(encodedName); } - synchronized void setLastRegionServerOfRegions( + void setLastRegionServerOfRegions( final ServerName serverName, final List regionInfos) { for (HRegionInfo hri: regionInfos) { setLastRegionServerOfRegion(serverName, hri.getEncodedName()); } } - synchronized void setLastRegionServerOfRegion( + void setLastRegionServerOfRegion( final ServerName serverName, final String encodedName) { lastAssignments.put(encodedName, serverName); } @@ -888,32 +920,44 @@ public class RegionStates { HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException { regionStateStore.splitRegion(p, a, b, sn, getRegionReplication(p)); - synchronized (this) { - // After PONR, split is considered to be done. - // Update server holdings to be aligned with the meta. 
- Set regions = serverHoldings.get(sn); - if (regions == null) { - throw new IllegalStateException(sn + " should host some regions"); + synchronized (getLock(p)) { + synchronized (getLock(a)) { + synchronized (getLock(b)) { + synchronized (getLock(sn)) { + // After PONR, split is considered to be done. + // Update server holdings to be aligned with the meta. + Set regions = serverHoldings.get(sn); + if (regions == null) { + throw new IllegalStateException(sn + " should host some regions"); + } + regions.remove(p); + regions.add(a); + regions.add(b); + } + } } - regions.remove(p); - regions.add(a); - regions.add(b); } } void mergeRegions(HRegionInfo p, HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException { regionStateStore.mergeRegions(p, a, b, sn, getRegionReplication(a)); - synchronized (this) { - // After PONR, merge is considered to be done. - // Update server holdings to be aligned with the meta. - Set regions = serverHoldings.get(sn); - if (regions == null) { - throw new IllegalStateException(sn + " should host some regions"); + synchronized (getLock(p)) { + synchronized (getLock(a)) { + synchronized (getLock(b)) { + synchronized (getLock(sn)) { + // After PONR, merge is considered to be done. + // Update server holdings to be aligned with the meta. + Set regions = serverHoldings.get(sn); + if (regions == null) { + throw new IllegalStateException(sn + " should host some regions"); + } + regions.remove(a); + regions.remove(b); + regions.add(p); + } + } } - regions.remove(a); - regions.remove(b); - regions.add(p); } } @@ -932,7 +976,7 @@ public class RegionStates { * that are excluded, such as disabled/disabling/enabling tables. All user regions * and their previous locations are returned. 
*/ - synchronized Map closeAllUserRegions(Set excludedTables) { + Map closeAllUserRegions(Set excludedTables) { boolean noExcludeTables = excludedTables == null || excludedTables.isEmpty(); Set toBeClosed = new HashSet(regionStates.size()); for(RegionState state: regionStates.values()) { @@ -961,7 +1005,7 @@ public class RegionStates { * regions being served, ignoring stats about number of requests. * @return the average load */ - protected synchronized double getAverageLoad() { + protected double getAverageLoad() { int numServers = 0, totalLoad = 0; for (Map.Entry> e: serverHoldings.entrySet()) { Set regions = e.getValue(); @@ -998,32 +1042,30 @@ public class RegionStates { getAssignmentsByTable() { Map>> result = new HashMap>>(); - synchronized (this) { - if (!server.getConfiguration().getBoolean( - HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, false)) { - Map> svrToRegions = - new HashMap>(serverHoldings.size()); - for (Map.Entry> e: serverHoldings.entrySet()) { - svrToRegions.put(e.getKey(), new ArrayList(e.getValue())); - } - result.put(TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME), svrToRegions); - } else { - for (Map.Entry> e: serverHoldings.entrySet()) { - for (HRegionInfo hri: e.getValue()) { - if (hri.isMetaRegion()) continue; - TableName tablename = hri.getTable(); - Map> svrToRegions = result.get(tablename); - if (svrToRegions == null) { - svrToRegions = new HashMap>(serverHoldings.size()); - result.put(tablename, svrToRegions); - } - List regions = svrToRegions.get(e.getKey()); - if (regions == null) { - regions = new ArrayList(); - svrToRegions.put(e.getKey(), regions); - } - regions.add(hri); + if (!server.getConfiguration().getBoolean( + HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, false)) { + Map> svrToRegions = + new HashMap>(serverHoldings.size()); + for (Map.Entry> e: serverHoldings.entrySet()) { + svrToRegions.put(e.getKey(), new ArrayList(e.getValue())); + } + result.put(TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME), svrToRegions); + } 
else { + for (Map.Entry> e: serverHoldings.entrySet()) { + for (HRegionInfo hri: e.getValue()) { + if (hri.isMetaRegion()) continue; + TableName tablename = hri.getTable(); + Map> svrToRegions = result.get(tablename); + if (svrToRegions == null) { + svrToRegions = new HashMap>(serverHoldings.size()); + result.put(tablename, svrToRegions); + } + List regions = svrToRegions.get(e.getKey()); + if (regions == null) { + regions = new ArrayList(); + svrToRegions.put(e.getKey(), regions); } + regions.add(hri); } } } @@ -1051,7 +1093,7 @@ public class RegionStates { * Returns a clone of region assignments per server * @return a Map of ServerName to a List of HRegionInfo's */ - protected synchronized Map> getRegionAssignmentsByServer() { + protected Map> getRegionAssignmentsByServer() { Map> regionsByServer = new HashMap>(serverHoldings.size()); for (Map.Entry> e: serverHoldings.entrySet()) { @@ -1060,7 +1102,7 @@ public class RegionStates { return regionsByServer; } - protected synchronized RegionState getRegionState(final String encodedName) { + protected RegionState getRegionState(final String encodedName) { return regionStates.get(encodedName); } @@ -1120,7 +1162,7 @@ public class RegionStates { regionStateStore.updateRegionState(openSeqNum, regionState, oldState); } - synchronized (this) { + synchronized (getLock(hri)) { regionsInTransition.put(encodedName, regionState); putRegionState(regionState); @@ -1149,10 +1191,24 @@ public class RegionStates { } } } + } + synchronized (this) { // notify the change this.notifyAll(); } return regionState; } + + Object getRegionLock(String encodedName) { + return lockCache.getLock("r"+encodedName); + } + + Object getLock(HRegionInfo hri) { + return getRegionLock(hri.getEncodedName()); + } + + Object getLock(ServerName serverName) { + return lockCache.getLock("s"+serverName.getServerName()); + } }