/**
 * A thread-safe registry that hands out exactly one canonical object per key.
 *
 * <p>The first request for a key creates a fresh {@code Object} and publishes it; every
 * later request for an equal key (as defined by {@code equals}/{@code hashCode}) returns
 * that same instance, even when several threads race on the first request. Callers
 * typically {@code synchronize} on the returned object to obtain a per-key mutex without
 * taking a global lock to look it up.
 *
 * <p>Entries are never removed by this class, so the map grows with the number of distinct
 * keys ever requested. Use it only where the key space is bounded.
 *
 * @param <K> the key type; must have a stable {@code equals}/{@code hashCode}
 */
public class LockFreeCache<K> {

  /** Key to canonical object. Written only through {@code putIfAbsent}. */
  private final ConcurrentHashMap<K, Object> lockFreeCache = new ConcurrentHashMap<K, Object>();

  /**
   * Returns the canonical object associated with {@code resource}, creating it on first use.
   *
   * @param resource the key to look up; must not be {@code null}
   * @return the single object shared by all callers passing an equal key; never {@code null}
   * @throws NullPointerException if {@code resource} is {@code null}
   *         ({@link ConcurrentHashMap} rejects null keys)
   */
  public Object getData(K resource) {
    Object lockObject = lockFreeCache.get(resource);
    if (lockObject == null) {
      final Object candidate = new Object();
      final Object previousObject = lockFreeCache.putIfAbsent(resource, candidate);
      // A non-null previousObject means another thread published its object first;
      // adopt that one so every caller ends up with the same instance.
      lockObject = previousObject == null ? candidate : previousObject;
    }
    return lockObject;
  }

}
+ */ + +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertTrue; +import org.junit.Assert; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestLockFreeCache { + @Test + public void testLocker() { + LockFreeCache lockCache = new LockFreeCache(); + Object obj1 = lockCache.getData("L1"); + Object obj2 = lockCache.getData("L2"); + assertTrue(obj1 != obj2); + Object obj3 = lockCache.getData("L1"); + assertTrue(obj1 == obj3); + } + + @Test + public void testSameLock() throws Exception { + StringBuffer sb = new StringBuffer(""); + LockFreeCache lockCache = new LockFreeCache(); + Locker l1 = new Locker("a", 1, lockCache, "L1", 100, sb); + Locker l2 = new Locker("a", 2, lockCache, "L1", 100, sb); + new Thread(l1).start(); + // sleep to make sure l1 gets the lock first + Thread.sleep(500); + // L2 should not get lock till l1 releases + new Thread(l2).start(); + Thread.sleep(500); + Assert.assertEquals("a:1-L a:1-R a:2-L a:2-R", sb.toString().trim()); + } + + @Test + public void testDifferentLocks() throws Exception { + StringBuffer sb = new StringBuffer(""); + LockFreeCache lockCache = new LockFreeCache(); + Locker l1 = new Locker("a", 1, lockCache, "L1", 1000, sb); + Locker l2 = new Locker("a", 2, lockCache, "L2", 1500, sb); + new Thread(l1).start(); + // Sleep for time less than l1's lock timeout + Thread.sleep(500); + // L2 should also get lock + new Thread(l2).start(); + // Wait for this to finish + Thread.sleep(2000); + Assert.assertEquals("a:1-L a:2-L a:1-R a:2-R", sb.toString().trim()); + + } + + class Locker implements Runnable { + protected String name; + private String nameIndex; + private StringBuffer sb; + private LockFreeCache lockCache; + private String resource; + private long timeout; + + public Locker(String name, int nameIndex, LockFreeCache lockCache, String lockResource, + long timeout, StringBuffer buffer) { + 
this.name = name; + this.nameIndex = name + ":" + nameIndex; + this.sb = buffer; + this.lockCache = lockCache; + this.resource = lockResource; + this.timeout = timeout; + } + + public void run() { + try { + synchronized (lockCache.getData(resource)) { + sb.append(nameIndex + "-L "); + Thread.sleep(timeout); + sb.append(nameIndex + "-R "); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index b657ed4..23a0391 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -31,6 +31,7 @@ import java.util.NavigableMap; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadFactory; @@ -89,6 +90,7 @@ import org.apache.hadoop.hbase.util.ConfigUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.KeyLocker; +import org.apache.hadoop.hbase.util.LockFreeCache; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.hadoop.hbase.util.Threads; @@ -154,6 +156,8 @@ public class AssignmentManager extends ZooKeeperListener { final private KeyLocker locker = new KeyLocker(); + final private LockFreeCache lockCache = new LockFreeCache(); + /** * Map of regions to reopen after the schema of a table is changed. 
Key - * encoded region name, value - HRegionInfo @@ -183,7 +187,7 @@ public class AssignmentManager extends ZooKeeperListener { // shutdown processing -- St.Ack // All access to this Map must be synchronized. final NavigableMap regionPlans = - new TreeMap(); + new ConcurrentSkipListMap(); private final ZKTable zkTable; @@ -394,7 +398,7 @@ public class AssignmentManager extends ZooKeeperListener { * @param plan */ public void addPlan(String encodedName, RegionPlan plan) { - synchronized (regionPlans) { + synchronized (lockCache.getData(encodedName)) { regionPlans.put(encodedName, plan); } } @@ -403,9 +407,7 @@ public class AssignmentManager extends ZooKeeperListener { * Add a map of region plans. */ public void addPlans(Map plans) { - synchronized (regionPlans) { - regionPlans.putAll(plans); - } + regionPlans.putAll(plans); } /** @@ -1483,16 +1485,7 @@ public class AssignmentManager extends ZooKeeperListener { Preconditions.checkState(tomActivated); if (sn == null) return; - // This loop could be expensive. - // First make a copy of current regionPlan rather than hold sync while - // looping because holding sync can cause deadlock. 
Its ok in this loop - // if the Map we're going against is a little stale - List> rps; - synchronized(this.regionPlans) { - rps = new ArrayList>(regionPlans.entrySet()); - } - - for (Map.Entry e : rps) { + for (Map.Entry e : regionPlans.entrySet()) { if (e.getValue() != null && e.getKey() != null && sn.equals(e.getValue().getDestination())) { RegionState regionState = regionStates.getRegionTransitionState(e.getKey()); if (regionState != null) { @@ -2335,7 +2328,7 @@ public class AssignmentManager extends ZooKeeperListener { boolean newPlan = false; RegionPlan existingPlan; - synchronized (this.regionPlans) { + synchronized (lockCache.getData(encodedName)) { existingPlan = this.regionPlans.get(encodedName); if (existingPlan != null && existingPlan.getDestination() != null) { @@ -3099,8 +3092,9 @@ public class AssignmentManager extends ZooKeeperListener { * @param region Region whose plan we are to clear. */ void clearRegionPlan(final HRegionInfo region) { - synchronized (this.regionPlans) { - this.regionPlans.remove(region.getEncodedName()); + String encodedName = region.getEncodedName(); + synchronized (lockCache.getData(encodedName)) { + this.regionPlans.remove(encodedName); } } @@ -3389,7 +3383,6 @@ public class AssignmentManager extends ZooKeeperListener { */ public List processServerShutdown(final ServerName sn) { // Clean out any existing assignment plans for this server - synchronized (this.regionPlans) { for (Iterator > i = this.regionPlans.entrySet().iterator(); i.hasNext();) { Map.Entry e = i.next(); @@ -3400,7 +3393,6 @@ public class AssignmentManager extends ZooKeeperListener { i.remove(); } } - } List regions = regionStates.serverOffline(watcher, sn); for (Iterator it = regions.iterator(); it.hasNext(); ) { HRegionInfo hri = it.next(); @@ -3462,8 +3454,9 @@ public class AssignmentManager extends ZooKeeperListener { + (state == null ? 
"not in region states" : state)); return; } - synchronized (this.regionPlans) { - this.regionPlans.put(plan.getRegionName(), plan); + String regionName = plan.getRegionName(); + synchronized (lockCache.getData(regionName)) { + this.regionPlans.put(regionName, plan); } unassign(hri, false, plan.getDestination()); } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java index 749784e..a4c84a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java @@ -26,7 +26,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,8 +43,10 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.LockFreeCache; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.hbase.zookeeper.ZKTable; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -62,25 +65,25 @@ public class RegionStates { /** * Regions currently in transition. */ - final HashMap regionsInTransition; + final ConcurrentHashMap regionsInTransition = new ConcurrentHashMap(); /** * Region encoded name to state map. * All the regions should be in this map. */ - private final HashMap regionStates; + private final ConcurrentHashMap regionStates = new ConcurrentHashMap(); /** * Server to regions assignment map. 
* Contains the set of regions currently assigned to a given server. */ - private final Map> serverHoldings; + private final ConcurrentHashMap> serverHoldings = new ConcurrentHashMap>(); /** * Region to server assignment map. * Contains the server a given region is currently assigned to. */ - private final TreeMap regionAssignments; + private final ConcurrentSkipListMap regionAssignments = new ConcurrentSkipListMap(); /** * Encoded region name to server assignment map for re-assignment @@ -92,14 +95,14 @@ public class RegionStates { * is offline while the info in lastAssignments is cleared when * the region is closed or the server is dead and processed. */ - private final HashMap lastAssignments; + private final ConcurrentHashMap lastAssignments = new ConcurrentHashMap(); /** * Map a host port pair string to the latest start code * of a region server which is known to be dead. It is dead * to us, but server manager may not know it yet. */ - private final HashMap deadServers; + private final ConcurrentHashMap deadServers = new ConcurrentHashMap(); /** * Map a dead servers to the time when log split is done. @@ -108,8 +111,10 @@ public class RegionStates { * on a configured time. By default, we assume a dead * server should be done with log splitting in two hours. 
*/ - private final HashMap processedServers; - private long lastProcessedServerCleanTime; + private final ConcurrentHashMap processedServers = new ConcurrentHashMap(); + + private final LockFreeCache lockCache = new LockFreeCache(); + private volatile long lastProcessedServerCleanTime; private final RegionStateStore regionStateStore; private final ServerManager serverManager; @@ -121,13 +126,6 @@ public class RegionStates { RegionStates(final Server master, final ServerManager serverManager, final RegionStateStore regionStateStore) { - regionStates = new HashMap(); - regionsInTransition = new HashMap(); - serverHoldings = new HashMap>(); - regionAssignments = new TreeMap(); - lastAssignments = new HashMap(); - processedServers = new HashMap(); - deadServers = new HashMap(); this.regionStateStore = regionStateStore; this.serverManager = serverManager; this.server = master; @@ -136,57 +134,60 @@ public class RegionStates { /** * @return an unmodifiable the region assignment map */ - public synchronized Map getRegionAssignments() { + public Map getRegionAssignments() { return Collections.unmodifiableMap(regionAssignments); } - public synchronized ServerName getRegionServerOfRegion(HRegionInfo hri) { + public ServerName getRegionServerOfRegion(HRegionInfo hri) { return regionAssignments.get(hri); } /** * Get regions in transition and their states */ - @SuppressWarnings("unchecked") - public synchronized Map getRegionsInTransition() { - return (Map)regionsInTransition.clone(); + public Map getRegionsInTransition() { + return Collections.unmodifiableMap(regionsInTransition); } /** * @return True if specified region in transition. */ - public synchronized boolean isRegionInTransition(final HRegionInfo hri) { + public boolean isRegionInTransition(final HRegionInfo hri) { return regionsInTransition.containsKey(hri.getEncodedName()); } /** * @return True if specified region in transition. 
*/ - public synchronized boolean isRegionInTransition(final String encodedName) { + public boolean isRegionInTransition(final String encodedName) { return regionsInTransition.containsKey(encodedName); } /** * @return True if any region in transition. */ - public synchronized boolean isRegionsInTransition() { + public boolean isRegionsInTransition() { return !regionsInTransition.isEmpty(); } /** * @return True if specified region assigned, and not in transition. */ - public synchronized boolean isRegionOnline(final HRegionInfo hri) { - return !isRegionInTransition(hri) && regionAssignments.containsKey(hri); + public boolean isRegionOnline(final HRegionInfo hri) { + synchronized (lockCache.getData(hri.getEncodedName())) { + return !isRegionInTransition(hri) && regionAssignments.containsKey(hri); + } } /** * @return True if specified region offline/closed, but not in transition. * If the region is not in the map, it is offline to us too. */ - public synchronized boolean isRegionOffline(final HRegionInfo hri) { - return getRegionState(hri) == null || (!isRegionInTransition(hri) - && isRegionInState(hri, State.OFFLINE, State.CLOSED)); + public boolean isRegionOffline(final HRegionInfo hri) { + synchronized (lockCache.getData(hri.getEncodedName())) { + return getRegionState(hri) == null + || (!isRegionInTransition(hri) && isRegionInState(hri, State.OFFLINE, State.CLOSED)); + } } /** @@ -224,7 +225,7 @@ public class RegionStates { /** * Get region transition state */ - public synchronized RegionState + public RegionState getRegionTransitionState(final String encodedName) { return regionsInTransition.get(encodedName); } @@ -256,37 +257,42 @@ public class RegionStates { * If the region is already in RegionStates, this call has * no effect, and the original state is returned. 
*/ - public synchronized RegionState createRegionState( + public RegionState + createRegionState( final HRegionInfo hri, State newState, ServerName serverName) { - if (newState == null || (newState == State.OPEN && serverName == null)) { - newState = State.OFFLINE; - } - if (hri.isOffline() && hri.isSplit()) { - newState = State.SPLIT; - serverName = null; - } - String encodedName = hri.getEncodedName(); - RegionState regionState = regionStates.get(encodedName); - if (regionState != null) { - LOG.warn("Tried to create a state for a region already in RegionStates, " - + "used existing: " + regionState + ", ignored new: " + newState); - } else { - regionState = new RegionState(hri, newState, serverName); - regionStates.put(encodedName, regionState); - if (newState == State.OPEN) { - regionAssignments.put(hri, serverName); - lastAssignments.put(encodedName, serverName); - Set regions = serverHoldings.get(serverName); - if (regions == null) { - regions = new HashSet(); - serverHoldings.put(serverName, regions); + synchronized (lockCache.getData(hri.getEncodedName())) { + if (newState == null || (newState == State.OPEN && serverName == null)) { + newState = State.OFFLINE; + } + if (hri.isOffline() && hri.isSplit()) { + newState = State.SPLIT; + serverName = null; + } + String encodedName = hri.getEncodedName(); + RegionState regionState = regionStates.get(encodedName); + if (regionState != null) { + LOG.warn("Tried to create a state for a region already in RegionStates, " + + "used existing: " + regionState + ", ignored new: " + newState); + } else { + regionState = new RegionState(hri, newState, serverName); + regionStates.put(encodedName, regionState); + if (newState == State.OPEN) { + regionAssignments.put(hri, serverName); + lastAssignments.put(encodedName, serverName); + Set regions = serverHoldings.get(serverName); + if (regions == null) { + regions = Collections.newSetFromMap(new ConcurrentHashMap()); + final Set previousRegions = 
serverHoldings.putIfAbsent(serverName, regions); + // Check if some one else intervened + regions = previousRegions != null? previousRegions: regions; + } + regions.add(hri); + } else if (!regionState.isUnassignable()) { + regionsInTransition.put(encodedName, regionState); } - regions.add(hri); - } else if (!regionState.isUnassignable()) { - regionsInTransition.put(encodedName, regionState); } + return regionState; } - return regionState; } /** @@ -350,36 +356,40 @@ public class RegionStates { + " was opened on a dead server: " + serverName); return; } - updateRegionState(hri, State.OPEN, serverName, openSeqNum); - - synchronized (this) { + synchronized (lockCache.getData(hri.getEncodedName())) { + updateRegionState(hri, State.OPEN, serverName, openSeqNum); regionsInTransition.remove(hri.getEncodedName()); ServerName oldServerName = regionAssignments.put(hri, serverName); if (!serverName.equals(oldServerName)) { LOG.info("Onlined " + hri.getShortNameToLog() + " on " + serverName); Set regions = serverHoldings.get(serverName); if (regions == null) { - regions = new HashSet(); - serverHoldings.put(serverName, regions); + regions = Collections.newSetFromMap(new ConcurrentHashMap()); + final Set previousRegions = serverHoldings.putIfAbsent(serverName, regions); + // Check if some other thread has intervened + regions = previousRegions != null? 
previousRegions: regions; } regions.add(hri); if (oldServerName != null) { - LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName); Set oldRegions = serverHoldings.get(oldServerName); - oldRegions.remove(hri); - if (oldRegions.isEmpty()) { - serverHoldings.remove(oldServerName); + LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName); + if (oldRegions != null) { + oldRegions.remove(hri); + if (oldRegions.isEmpty()) { + serverHoldings.remove(oldServerName, oldRegions); + } } } } } + } /** * A dead server's hlogs have been split so that all the regions * used to be open on it can be safely assigned now. Mark them assignable. */ - public synchronized void logSplit(final ServerName serverName) { + public void logSplit(final ServerName serverName) { for (Iterator> it = lastAssignments.entrySet().iterator(); it.hasNext();) { Map.Entry e = it.next(); @@ -391,6 +401,8 @@ public class RegionStates { if (LOG.isDebugEnabled()) { LOG.debug("Adding to processed servers " + serverName); } + // This method is called once per server, it should be ok not to use + // putIfAbsent as entries from the map are lazily removed processedServers.put(serverName, Long.valueOf(now)); Configuration conf = server.getConfiguration(); long obsoleteTime = conf.getLong(LOG_SPLIT_TIME, DEFAULT_LOG_SPLIT_TIME); @@ -418,8 +430,10 @@ public class RegionStates { clearLastAssignment(region); } - public synchronized void clearLastAssignment(final HRegionInfo region) { - lastAssignments.remove(region.getEncodedName()); + public void clearLastAssignment(final HRegionInfo region) { + synchronized (lockCache.getData(region.getEncodedName())) { + lastAssignments.remove(region.getEncodedName()); + } } /** @@ -446,17 +460,22 @@ public class RegionStates { } State newState = expectedState == null ? 
State.OFFLINE : expectedState; - updateRegionState(hri, newState); - - synchronized (this) { + String encodedName = hri.getEncodedName(); + synchronized (lockCache.getData(encodedName)) { + updateRegionState(hri, newState); regionsInTransition.remove(hri.getEncodedName()); ServerName oldServerName = regionAssignments.remove(hri); - if (oldServerName != null && serverHoldings.containsKey(oldServerName)) { + if (oldServerName != null + && serverHoldings.containsKey(oldServerName)) { + // Offline the region only if it's merged/split, or the table is disabled/disabling. + // Otherwise, offline it from this server only when it is online on a different server. LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName); Set oldRegions = serverHoldings.get(oldServerName); - oldRegions.remove(hri); - if (oldRegions.isEmpty()) { - serverHoldings.remove(oldServerName); + if (oldRegions != null) { + oldRegions.remove(hri); + if (oldRegions.isEmpty()) { + serverHoldings.remove(oldServerName, oldRegions); + } } } } @@ -465,13 +484,13 @@ public class RegionStates { /** * A server is offline, all regions on it are dead. */ - public synchronized List serverOffline( + public List serverOffline( final ZooKeeperWatcher watcher, final ServerName sn) { // Offline all regions on this server not already in transition. 
List rits = new ArrayList(); Set assignedRegions = serverHoldings.get(sn); if (assignedRegions == null) { - assignedRegions = new HashSet(); + assignedRegions = Collections.newSetFromMap(new ConcurrentHashMap()); } // Offline regions outside the loop to avoid ConcurrentModificationException @@ -480,16 +499,14 @@ public class RegionStates { // Offline open regions, no need to offline if SPLIT/MERGED/OFFLINE if (isRegionOnline(region)) { regionsToOffline.add(region); - } else { - if (isRegionInState(region, State.SPLITTING, State.MERGING)) { - LOG.debug("Offline splitting/merging region " + getRegionState(region)); - try { - // Delete the ZNode if exists - ZKAssign.deleteNodeFailSilent(watcher, region); - regionsToOffline.add(region); - } catch (KeeperException ke) { - server.abort("Unexpected ZK exception deleting node " + region, ke); - } + } else if (isRegionInState(region, State.SPLITTING, State.MERGING)) { + LOG.debug("Offline splitting/merging region " + getRegionState(region)); + try { + // Delete the ZNode if exists + ZKAssign.deleteNodeFailSilent(watcher, region); + regionsToOffline.add(region); + } catch (KeeperException ke) { + server.abort("Unexpected ZK exception deleting node " + region, ke); } } } @@ -522,7 +539,9 @@ public class RegionStates { } } - this.notifyAll(); + synchronized (this) { + this.notifyAll(); + } return rits; } @@ -536,7 +555,7 @@ public class RegionStates { * @param tableName * @return Online regions from tableName */ - public synchronized List getRegionsOfTable(TableName tableName) { + public List getRegionsOfTable(TableName tableName) { List tableRegions = new ArrayList(); // boundary needs to have table's name but regionID 0 so that it is sorted // before all table's regions. @@ -555,7 +574,7 @@ public class RegionStates { * If the region isn't in transition, returns immediately. Otherwise, method * blocks until the region is out of transition. 
*/ - public synchronized void waitOnRegionToClearRegionsInTransition( + public void waitOnRegionToClearRegionsInTransition( final HRegionInfo hri) throws InterruptedException { if (!isRegionInTransition(hri)) return; @@ -577,14 +596,12 @@ public class RegionStates { */ public void tableDeleted(final TableName tableName) { Set regionsToDelete = new HashSet(); - synchronized (this) { for (RegionState state: regionStates.values()) { HRegionInfo region = state.getRegion(); if (region.getTable().equals(tableName)) { regionsToDelete.add(region); } } - } for (HRegionInfo region: regionsToDelete) { deleteRegion(region); } @@ -600,12 +617,12 @@ public class RegionStates { * think it's online falsely. Therefore if a server is online, we still * need to confirm it reachable and having the expected start code. */ - synchronized boolean wasRegionOnDeadServer(final String encodedName) { + boolean wasRegionOnDeadServer(final String encodedName) { ServerName server = lastAssignments.get(encodedName); return isServerDeadAndNotProcessed(server); } - synchronized boolean isServerDeadAndNotProcessed(ServerName server) { + boolean isServerDeadAndNotProcessed(ServerName server) { if (server == null) return false; if (serverManager.isServerOnline(server)) { String hostAndPort = server.getHostAndPort(); @@ -616,6 +633,8 @@ public class RegionStates { return false; } // The size of deadServers won't grow unbounded. + // Avoid putIfAbsent as it should be ok for the startCode to be overwritten for + // a given hostAndPort deadServers.put(hostAndPort, Long.valueOf(startCode)); } // Watch out! If the server is not dead, the region could @@ -635,23 +654,26 @@ public class RegionStates { * Get the last region server a region was on for purpose of re-assignment, * i.e. should the re-assignment be held back till log split is done? 
*/ - synchronized ServerName getLastRegionServerOfRegion(final String encodedName) { + ServerName getLastRegionServerOfRegion(final String encodedName) { return lastAssignments.get(encodedName); } - synchronized void setLastRegionServerOfRegions( - final ServerName serverName, final List regionInfos) { - for (HRegionInfo hri: regionInfos) { - setLastRegionServerOfRegion(serverName, hri.getEncodedName()); - } - } + void setLastRegionServerOfRegions(final ServerName serverName, + final List regionInfos) { + for (HRegionInfo hri : regionInfos) { + setLastRegionServerOfRegion(serverName, hri.getEncodedName()); + } + } - synchronized void setLastRegionServerOfRegion( + // Called during startup (HMaster.joinCluster()) + void setLastRegionServerOfRegion( final ServerName serverName, final String encodedName) { - lastAssignments.put(encodedName, serverName); + synchronized (lockCache.getData(encodedName)) { + lastAssignments.put(encodedName, serverName); + } } - synchronized void closeAllUserRegions(Set excludedTables) { + void closeAllUserRegions(Set excludedTables) { Set toBeClosed = new HashSet(regionStates.size()); for(RegionState state: regionStates.values()) { HRegionInfo hri = state.getRegion(); @@ -672,7 +694,7 @@ public class RegionStates { * regions being served, ignoring stats about number of requests. 
* @return the average load */ - protected synchronized double getAverageLoad() { + protected double getAverageLoad() { int numServers = 0, totalLoad = 0; for (Map.Entry> e: serverHoldings.entrySet()) { Set regions = e.getValue(); @@ -699,7 +721,6 @@ public class RegionStates { getAssignmentsByTable() { Map>> result = new HashMap>>(); - synchronized (this) { if (!server.getConfiguration().getBoolean("hbase.master.loadbalance.bytable", false)) { Map> svrToRegions = new HashMap>(serverHoldings.size()); @@ -726,7 +747,7 @@ public class RegionStates { } } } - } + Map onlineSvrs = serverManager.getOnlineServers(); @@ -745,7 +766,7 @@ public class RegionStates { return getRegionState(hri.getEncodedName()); } - protected synchronized RegionState getRegionState(final String encodedName) { + protected RegionState getRegionState(final String encodedName) { return regionStates.get(encodedName); } @@ -797,14 +818,13 @@ public class RegionStates { String encodedName = hri.getEncodedName(); RegionState regionState = new RegionState( hri, state, System.currentTimeMillis(), serverName); - RegionState oldState = getRegionState(encodedName); - if (!regionState.equals(oldState)) { - LOG.info("Transition " + oldState + " to " + regionState); - // Persist region state before updating in-memory info, if needed - regionStateStore.updateRegionState(openSeqNum, regionState, oldState); - } - - synchronized (this) { + synchronized (lockCache.getData(encodedName)) { + RegionState oldState = getRegionState(encodedName); + if (!regionState.equals(oldState)) { + LOG.debug("Transition " + oldState + " to " + regionState); + // Persist region state before updating in-memory info, if needed + regionStateStore.updateRegionState(openSeqNum, regionState, oldState); + } regionsInTransition.put(encodedName, regionState); regionStates.put(encodedName, regionState); @@ -813,8 +833,8 @@ public class RegionStates { if ((state == State.CLOSED || state == State.MERGED || state == State.SPLIT) && 
lastAssignments.containsKey(encodedName)) { ServerName last = lastAssignments.get(encodedName); - if (last.equals(serverName)) { - lastAssignments.remove(encodedName); + if (last != null && last.equals(serverName)) { + lastAssignments.remove(encodedName, last); } else { LOG.warn(encodedName + " moved to " + state + " on " + serverName + ", expected " + last); @@ -833,25 +853,30 @@ public class RegionStates { } } } - - // notify the change + } + synchronized (this) { this.notifyAll(); } + return regionState; } /** * Remove a region from all state maps. */ - private synchronized void deleteRegion(final HRegionInfo hri) { + private void deleteRegion(final HRegionInfo hri) { String encodedName = hri.getEncodedName(); - regionsInTransition.remove(encodedName); - regionStates.remove(encodedName); - lastAssignments.remove(encodedName); - ServerName sn = regionAssignments.remove(hri); - if (sn != null) { - Set regions = serverHoldings.get(sn); - regions.remove(hri); + synchronized (lockCache.getData(encodedName)) { + regionsInTransition.remove(encodedName); + regionStates.remove(encodedName); + lastAssignments.remove(encodedName); + ServerName sn = regionAssignments.remove(hri); + if (sn != null) { + Set regions = serverHoldings.get(sn); + if (regions != null) { + regions.remove(hri); + } + } } } }