diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/LockCache.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/LockCache.java
new file mode 100644
index 0000000..64ec8d6
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/LockCache.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * Cache providing a mapping of keys to lock objects. Entries stay in the cache
+ * until they are evicted.
+ * @param <K> the type of the key
+ */
+@InterfaceAudience.Private
+public class LockCache<K> {
+  // TODO - configure the concurrency of the cache if required
+  LoadingCache<K, Object> cache = CacheBuilder.newBuilder().softValues().build(new CacheLoader<K, Object>() {
+    @Override
+    public Object load(K key) {
+      return new Object();
+    }
+  });
+
+  /**
+   * Returns the value associated with the given resource, first loading that value if necessary.
+   * @param resource the key in the cache
+   * @return the lock object associated with the given resource
+   */
+  public Object getLock(K resource) {
+    return cache.getUnchecked(resource);
+  }
+
+}
\ No newline at end of file
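Reviewer note: a minimal usage sketch of the class added above (not part of the patch; the sketch class and method names are hypothetical). The pattern, which the AssignmentManager/RegionStates changes below rely on, is that every caller asks the cache for the canonical lock Object of a key and synchronizes on it, so work on the same key is serialized while work on different keys proceeds in parallel. Because the cache uses soft values, a lock object can only be evicted once no thread still holds a strong reference to it, i.e. once nobody is inside, or about to enter, its synchronized block.

// Usage sketch only -- illustrative, not part of the change.
import org.apache.hadoop.hbase.util.LockCache;

public class LockCacheUsageSketch {
  private final LockCache<String> lockCache = new LockCache<String>();

  public void updatePerRegionState(String encodedRegionName) {
    // Every thread passing the same key gets the same Object back, so this
    // block is mutually exclusive per region rather than per whole map.
    synchronized (lockCache.getLock(encodedRegionName)) {
      // ... mutate state keyed by encodedRegionName ...
    }
  }
}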
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestLockCache.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestLockCache.java
new file mode 100644
index 0000000..ab81f59
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestLockCache.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestLockCache {
+  @Test
+  public void testLocker() {
+    LockCache<String> lockCache = new LockCache<String>();
+    Object obj1 = lockCache.getLock("L1");
+    Object obj2 = lockCache.getLock("L2");
+    assertTrue(obj1 != obj2);
+    Object obj3 = lockCache.getLock("L1");
+    assertTrue(obj1 == obj3);
+  }
+
+  @Test
+  public void testSameLock() throws Exception {
+    StringBuffer sb = new StringBuffer("");
+    LockCache<String> lockCache = new LockCache<String>();
+    Locker l1 = new Locker("a", 1, lockCache, "L1", 1000, sb);
+    Locker l2 = new Locker("a", 2, lockCache, "L1", 1000, sb);
+    new Thread(l1).start();
+    // Sleep to make sure l1 gets the lock first
+    Thread.sleep(500);
+    // l2 should not get the lock until l1 releases it
+    new Thread(l2).start();
+    while (!l1.isDone() || !l2.isDone()) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals("a:1-B a:1-L a:2-B a:1-R a:2-L a:2-R", sb.toString().trim());
+  }
+
+  @Test
+  public void testDifferentLocks() throws Exception {
+    StringBuffer sb = new StringBuffer("");
+    LockCache<String> lockCache = new LockCache<String>();
+    Locker l1 = new Locker("a", 1, lockCache, "L1", 1000, sb);
+    Locker l2 = new Locker("a", 2, lockCache, "L2", 1500, sb);
+    new Thread(l1).start();
+    // Sleep for a time shorter than l1's lock hold time
+    Thread.sleep(500);
+    // l2 locks a different resource, so it should get its lock right away
+    new Thread(l2).start();
+    // Wait for both to finish
+    while (!l1.isDone() || !l2.isDone()) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals("a:1-B a:1-L a:2-B a:2-L a:1-R a:2-R", sb.toString().trim());
+  }
+
+  class Locker implements Runnable {
+    protected String name;
+    private String nameIndex;
+    private StringBuffer sb;
+    private LockCache<String> lockCache;
+    private String resource;
+    private long timeout;
+    private volatile boolean done;
+
+    public Locker(String name, int nameIndex, LockCache<String> lockCache, String lockResource,
+        long timeout, StringBuffer buffer) {
+      this.name = name;
+      this.nameIndex = name + ":" + nameIndex;
+      this.sb = buffer;
+      this.lockCache = lockCache;
+      this.resource = lockResource;
+      this.timeout = timeout;
+      done = false;
+    }
+
+    public void run() {
+      try {
+        sb.append(nameIndex + "-B ");
+        synchronized (lockCache.getLock(resource)) {
+          sb.append(nameIndex + "-L ");
+          Thread.sleep(timeout);
+          sb.append(nameIndex + "-R ");
+        }
+        done = true;
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    public boolean isDone() {
+      return done;
+    }
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index fc80d9c..8e62707 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -31,6 +31,7 @@ import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ThreadFactory;
@@ -87,6 +88,7 @@ import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import
org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.KeyLocker; +import org.apache.hadoop.hbase.util.LockCache; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.hadoop.hbase.util.Threads; @@ -152,6 +154,8 @@ public class AssignmentManager extends ZooKeeperListener { final private KeyLocker locker = new KeyLocker(); + final private LockCache lockCache = new LockCache(); + /** * Map of regions to reopen after the schema of a table is changed. Key - * encoded region name, value - HRegionInfo @@ -181,7 +185,7 @@ public class AssignmentManager extends ZooKeeperListener { // shutdown processing -- St.Ack // All access to this Map must be synchronized. final NavigableMap regionPlans = - new TreeMap(); + new ConcurrentSkipListMap(); private final ZKTable zkTable; @@ -392,7 +396,7 @@ public class AssignmentManager extends ZooKeeperListener { * @param plan */ public void addPlan(String encodedName, RegionPlan plan) { - synchronized (regionPlans) { + synchronized (lockCache.getLock(encodedName)) { regionPlans.put(encodedName, plan); } } @@ -401,8 +405,11 @@ public class AssignmentManager extends ZooKeeperListener { * Add a map of region plans. */ public void addPlans(Map plans) { - synchronized (regionPlans) { - regionPlans.putAll(plans); + for (Map.Entry planEntry : plans.entrySet()) { + String encodedRegionName = planEntry.getKey(); + synchronized (lockCache.getLock(encodedRegionName)) { + regionPlans.put(planEntry.getKey(), planEntry.getValue()); + } } } @@ -1310,20 +1317,26 @@ public class AssignmentManager extends ZooKeeperListener { if (serverManager.isServerOnline(serverName)) { if (rs.isOnServer(serverName) && (rs.isOpened() || rs.isSplitting())) { - regionOnline(regionInfo, serverName); - if (disabled) { - // if server is offline, no hurt to unassign again - LOG.info("Opened " + regionNameStr - + "but this table is disabled, triggering close of region"); - unassign(regionInfo); + synchronized (regionStates.getLock(regionInfo.getEncodedName())) { + regionOnline(regionInfo, serverName); + if (disabled) { + // if server is offline, no hurt to unassign again + LOG.info("Opened " + regionNameStr + + "but this table is disabled, triggering close of region"); + unassign(regionInfo); + } } } else if (rs.isMergingNew()) { - synchronized (regionStates) { - String p = regionInfo.getEncodedName(); + String p = regionInfo.getEncodedName(); + synchronized (regionStates.getLock(p)) { PairOfSameType regions = mergingRegions.get(p); - if (regions != null) { - onlineMergingRegion(disabled, regions.getFirst(), serverName); - onlineMergingRegion(disabled, regions.getSecond(), serverName); + synchronized (regionStates.getLock(regions.getFirst().getEncodedName())) { + synchronized (regionStates.getLock(regions.getSecond().getEncodedName())) { + if (regions != null) { + onlineMergingRegion(disabled, regions.getFirst(), serverName); + onlineMergingRegion(disabled, regions.getSecond(), serverName); + } + } } } } @@ -1482,16 +1495,7 @@ public class AssignmentManager extends ZooKeeperListener { Preconditions.checkState(tomActivated); if (sn == null) return; - // This loop could be expensive. - // First make a copy of current regionPlan rather than hold sync while - // looping because holding sync can cause deadlock. 
Its ok in this loop - // if the Map we're going against is a little stale - List> rps; - synchronized(this.regionPlans) { - rps = new ArrayList>(regionPlans.entrySet()); - } - - for (Map.Entry e : rps) { + for (Map.Entry e : regionPlans.entrySet()) { if (e.getValue() != null && e.getKey() != null && sn.equals(e.getValue().getDestination())) { RegionState regionState = regionStates.getRegionTransitionState(e.getKey()); if (regionState != null) { @@ -2367,7 +2371,7 @@ public class AssignmentManager extends ZooKeeperListener { boolean newPlan = false; RegionPlan existingPlan; - synchronized (this.regionPlans) { + synchronized (lockCache.getLock(encodedName)) { existingPlan = this.regionPlans.get(encodedName); if (existingPlan != null && existingPlan.getDestination() != null) { @@ -3138,8 +3142,9 @@ public class AssignmentManager extends ZooKeeperListener { * @param region Region whose plan we are to clear. */ void clearRegionPlan(final HRegionInfo region) { - synchronized (this.regionPlans) { - this.regionPlans.remove(region.getEncodedName()); + String encodedName = region.getEncodedName(); + synchronized (lockCache.getLock(encodedName)) { + this.regionPlans.remove(encodedName); } } @@ -3428,7 +3433,6 @@ public class AssignmentManager extends ZooKeeperListener { */ public List processServerShutdown(final ServerName sn) { // Clean out any existing assignment plans for this server - synchronized (this.regionPlans) { for (Iterator > i = this.regionPlans.entrySet().iterator(); i.hasNext();) { Map.Entry e = i.next(); @@ -3439,7 +3443,6 @@ public class AssignmentManager extends ZooKeeperListener { i.remove(); } } - } List regions = regionStates.serverOffline(watcher, sn); for (Iterator it = regions.iterator(); it.hasNext(); ) { HRegionInfo hri = it.next(); @@ -3501,8 +3504,9 @@ public class AssignmentManager extends ZooKeeperListener { + (state == null ? 
"not in region states" : state)); return; } - synchronized (this.regionPlans) { - this.regionPlans.put(plan.getRegionName(), plan); + String regionName = plan.getRegionName(); + synchronized (lockCache.getLock(regionName)) { + this.regionPlans.put(regionName, plan); } unassign(hri, false, plan.getDestination()); } finally { @@ -3858,19 +3862,23 @@ public class AssignmentManager extends ZooKeeperListener { } } - synchronized (regionStates) { - regionStates.updateRegionState(hri_a, State.MERGING); - regionStates.updateRegionState(hri_b, State.MERGING); - regionStates.updateRegionState(p, State.MERGING_NEW, sn); + synchronized (regionStates.getLock(p.getEncodedName())) { + synchronized (regionStates.getLock(hri_a.getEncodedName())) { + synchronized (regionStates.getLock(hri_b.getEncodedName())) { + regionStates.updateRegionState(hri_a, State.MERGING); + regionStates.updateRegionState(hri_b, State.MERGING); + regionStates.updateRegionState(p, State.MERGING_NEW, sn); - if (et != EventType.RS_ZK_REGION_MERGED) { - this.mergingRegions.put(encodedName, - new PairOfSameType(hri_a, hri_b)); - } else { - this.mergingRegions.remove(encodedName); - regionOffline(hri_a, State.MERGED); - regionOffline(hri_b, State.MERGED); - regionOnline(p, sn); + if (et != EventType.RS_ZK_REGION_MERGED) { + this.mergingRegions.put(encodedName, + new PairOfSameType(hri_a, hri_b)); + } else { + this.mergingRegions.remove(encodedName); + regionOffline(hri_a, State.MERGED); + regionOffline(hri_b, State.MERGED); + regionOnline(p, sn); + } + } } } @@ -3979,22 +3987,26 @@ public class AssignmentManager extends ZooKeeperListener { } } - synchronized (regionStates) { - regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn); - regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn); - regionStates.updateRegionState(rt, State.SPLITTING); + synchronized (regionStates.getLock(rs_p.getRegion().getEncodedName())) { + synchronized (regionStates.getLock(hri_a.getEncodedName())) { + synchronized (regionStates.getLock(hri_b.getEncodedName())) { + regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn); + regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn); + regionStates.updateRegionState(rt, State.SPLITTING); - // The below is for testing ONLY! We can't do fault injection easily, so - // resort to this kinda uglyness -- St.Ack 02/25/2011. - if (TEST_SKIP_SPLIT_HANDLING) { - LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set"); - return true; // return true so that the splitting node stays - } + // The below is for testing ONLY! We can't do fault injection easily, so + // resort to this kinda uglyness -- St.Ack 02/25/2011. 
+ if (TEST_SKIP_SPLIT_HANDLING) { + LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set"); + return true; // return true so that the splitting node stays + } - if (et == EventType.RS_ZK_REGION_SPLIT) { - regionOffline(p, State.SPLIT); - regionOnline(hri_a, sn); - regionOnline(hri_b, sn); + if (et == EventType.RS_ZK_REGION_SPLIT) { + regionOffline(p, State.SPLIT); + regionOnline(hri_a, sn); + regionOnline(hri_b, sn); + } + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java index 6a3d32f..2bc98ec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java @@ -26,7 +26,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -41,7 +42,9 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.master.RegionState.State; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.LockCache; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -59,28 +62,30 @@ import com.google.common.base.Preconditions; public class RegionStates { private static final Log LOG = LogFactory.getLog(RegionStates.class); + // TODO - Configure the concurrency of concurrent data structures + // (possibly a function of handlers and bulk assigner threads) /** * Regions currently in transition. */ - final HashMap regionsInTransition; + final ConcurrentHashMap regionsInTransition = new ConcurrentHashMap(); /** * Region encoded name to state map. * All the regions should be in this map. */ - private final HashMap regionStates; + private final ConcurrentHashMap regionStates = new ConcurrentHashMap(); /** * Server to regions assignment map. * Contains the set of regions currently assigned to a given server. */ - private final Map> serverHoldings; + private final ConcurrentHashMap> serverHoldings = new ConcurrentHashMap>(); /** * Region to server assignment map. * Contains the server a given region is currently assigned to. */ - private final TreeMap regionAssignments; + private final ConcurrentSkipListMap regionAssignments = new ConcurrentSkipListMap(); /** * Encoded region name to server assignment map for re-assignment @@ -92,14 +97,14 @@ public class RegionStates { * is offline while the info in lastAssignments is cleared when * the region is closed or the server is dead and processed. */ - private final HashMap lastAssignments; + private final ConcurrentHashMap lastAssignments = new ConcurrentHashMap(); /** * Map a host port pair string to the latest start code * of a region server which is known to be dead. It is dead * to us, but server manager may not know it yet. */ - private final HashMap deadServers; + private final ConcurrentHashMap deadServers = new ConcurrentHashMap(); /** * Map a dead servers to the time when log split is done. @@ -108,8 +113,10 @@ public class RegionStates { * on a configured time. 
By default, we assume a dead * server should be done with log splitting in two hours. */ - private final HashMap processedServers; - private long lastProcessedServerCleanTime; + private final ConcurrentHashMap processedServers = new ConcurrentHashMap(); + + private final LockCache lockCache = new LockCache(); + private volatile long lastProcessedServerCleanTime; private final RegionStateStore regionStateStore; private final ServerManager serverManager; @@ -121,13 +128,6 @@ public class RegionStates { RegionStates(final Server master, final ServerManager serverManager, final RegionStateStore regionStateStore) { - regionStates = new HashMap(); - regionsInTransition = new HashMap(); - serverHoldings = new HashMap>(); - regionAssignments = new TreeMap(); - lastAssignments = new HashMap(); - processedServers = new HashMap(); - deadServers = new HashMap(); this.regionStateStore = regionStateStore; this.serverManager = serverManager; this.server = master; @@ -136,57 +136,60 @@ public class RegionStates { /** * @return an unmodifiable the region assignment map */ - public synchronized Map getRegionAssignments() { + public Map getRegionAssignments() { return Collections.unmodifiableMap(regionAssignments); } - public synchronized ServerName getRegionServerOfRegion(HRegionInfo hri) { + public ServerName getRegionServerOfRegion(HRegionInfo hri) { return regionAssignments.get(hri); } /** * Get regions in transition and their states */ - @SuppressWarnings("unchecked") - public synchronized Map getRegionsInTransition() { - return (Map)regionsInTransition.clone(); + public Map getRegionsInTransition() { + return Collections.unmodifiableMap(regionsInTransition); } /** * @return True if specified region in transition. */ - public synchronized boolean isRegionInTransition(final HRegionInfo hri) { + public boolean isRegionInTransition(final HRegionInfo hri) { return regionsInTransition.containsKey(hri.getEncodedName()); } /** * @return True if specified region in transition. */ - public synchronized boolean isRegionInTransition(final String encodedName) { + public boolean isRegionInTransition(final String encodedName) { return regionsInTransition.containsKey(encodedName); } /** * @return True if any region in transition. */ - public synchronized boolean isRegionsInTransition() { + public boolean isRegionsInTransition() { return !regionsInTransition.isEmpty(); } /** * @return True if specified region assigned, and not in transition. */ - public synchronized boolean isRegionOnline(final HRegionInfo hri) { - return !isRegionInTransition(hri) && regionAssignments.containsKey(hri); + public boolean isRegionOnline(final HRegionInfo hri) { + synchronized (lockCache.getLock(hri.getEncodedName())) { + return !isRegionInTransition(hri) && regionAssignments.containsKey(hri); + } } /** * @return True if specified region offline/closed, but not in transition. * If the region is not in the map, it is offline to us too. 
*/ - public synchronized boolean isRegionOffline(final HRegionInfo hri) { - return getRegionState(hri) == null || (!isRegionInTransition(hri) - && isRegionInState(hri, State.OFFLINE, State.CLOSED)); + public boolean isRegionOffline(final HRegionInfo hri) { + synchronized (lockCache.getLock(hri.getEncodedName())) { + return getRegionState(hri) == null + || (!isRegionInTransition(hri) && isRegionInState(hri, State.OFFLINE, State.CLOSED)); + } } /** @@ -224,7 +227,7 @@ public class RegionStates { /** * Get region transition state */ - public synchronized RegionState + public RegionState getRegionTransitionState(final String encodedName) { return regionsInTransition.get(encodedName); } @@ -256,37 +259,39 @@ public class RegionStates { * If the region is already in RegionStates, this call has * no effect, and the original state is returned. */ - public synchronized RegionState createRegionState( + public RegionState + createRegionState( final HRegionInfo hri, State newState, ServerName serverName) { - if (newState == null || (newState == State.OPEN && serverName == null)) { - newState = State.OFFLINE; - } - if (hri.isOffline() && hri.isSplit()) { - newState = State.SPLIT; - serverName = null; - } - String encodedName = hri.getEncodedName(); - RegionState regionState = regionStates.get(encodedName); - if (regionState != null) { - LOG.warn("Tried to create a state for a region already in RegionStates, " - + "used existing: " + regionState + ", ignored new: " + newState); - } else { - regionState = new RegionState(hri, newState, serverName); - regionStates.put(encodedName, regionState); - if (newState == State.OPEN) { - regionAssignments.put(hri, serverName); - lastAssignments.put(encodedName, serverName); - Set regions = serverHoldings.get(serverName); - if (regions == null) { - regions = new HashSet(); - serverHoldings.put(serverName, regions); + // TODO - Currently we lock on encodedName as most of DS's are keyed by that. 
+ // At some time, we might need to change to regionName if we start expecting collision + synchronized (lockCache.getLock(hri.getEncodedName())) { + if (newState == null || (newState == State.OPEN && serverName == null)) { + newState = State.OFFLINE; + } + if (hri.isOffline() && hri.isSplit()) { + newState = State.SPLIT; + serverName = null; + } + String encodedName = hri.getEncodedName(); + RegionState regionState = regionStates.get(encodedName); + if (regionState != null) { + LOG.warn("Tried to create a state for a region already in RegionStates, " + + "used existing: " + regionState + ", ignored new: " + newState); + } else { + regionState = new RegionState(hri, newState, serverName); + regionStates.put(encodedName, regionState); + if (newState == State.OPEN) { + regionAssignments.put(hri, serverName); + lastAssignments.put(encodedName, serverName); + synchronized (lockCache.getLock(serverName.getServerName())) { + addToServerHoldings(serverName, hri); + } + } else if (!regionState.isUnassignable()) { + regionsInTransition.put(encodedName, regionState); } - regions.add(hri); - } else if (!regionState.isUnassignable()) { - regionsInTransition.put(encodedName, regionState); } + return regionState; } - return regionState; } /** @@ -361,47 +366,74 @@ public class RegionStates { + " was opened on a dead server: " + serverName); return; } - updateRegionState(hri, State.OPEN, serverName, openSeqNum); - - synchronized (this) { + + synchronized (lockCache.getLock(hri.getEncodedName())) { + updateRegionState(hri, State.OPEN, serverName, openSeqNum); regionsInTransition.remove(hri.getEncodedName()); ServerName oldServerName = regionAssignments.put(hri, serverName); if (!serverName.equals(oldServerName)) { LOG.info("Onlined " + hri.getShortNameToLog() + " on " + serverName); - Set regions = serverHoldings.get(serverName); - if (regions == null) { - regions = new HashSet(); - serverHoldings.put(serverName, regions); + if (oldServerName == null) { + synchronized (lockCache.getLock(serverName.getServerName())) { + addToServerHoldings(serverName, hri); + } + return; } - regions.add(hri); - if (oldServerName != null) { - LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName); - Set oldRegions = serverHoldings.get(oldServerName); - oldRegions.remove(hri); - if (oldRegions.isEmpty()) { - serverHoldings.remove(oldServerName); + ServerName first = serverName; + ServerName second = oldServerName; + // Order the locks + if (first.compareTo(second) < 0) { + first = oldServerName; + second = serverName; + } + synchronized (lockCache.getLock(first.getServerName())) { + synchronized (lockCache.getLock(second.getServerName())) { + addToServerHoldings(serverName, hri); + Set oldRegions = serverHoldings.get(oldServerName); + LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName); + if (oldRegions != null) { + oldRegions.remove(hri); + if (oldRegions.isEmpty()) { + serverHoldings.remove(oldServerName); + } + } } } } } + + } + + private void addToServerHoldings(ServerName serverName, HRegionInfo hri) { + // Caller should lock this + Set regions = serverHoldings.get(serverName); + if (regions == null) { + regions = Collections.newSetFromMap(new ConcurrentHashMap()); + serverHoldings.put(serverName, regions); + } + regions.add(hri); } /** * A dead server's hlogs have been split so that all the regions * used to be open on it can be safely assigned now. Mark them assignable. 
*/ - public synchronized void logSplit(final ServerName serverName) { + public void logSplit(final ServerName serverName) { for (Iterator> it = lastAssignments.entrySet().iterator(); it.hasNext();) { Map.Entry e = it.next(); if (e.getValue().equals(serverName)) { - it.remove(); + synchronized(lockCache.getLock(e.getKey())) { + it.remove(); + } } } long now = System.currentTimeMillis(); if (LOG.isDebugEnabled()) { LOG.debug("Adding to processed servers " + serverName); } + // This method is called once per server, it should be ok not to use + // putIfAbsent as entries from the map are lazily removed processedServers.put(serverName, Long.valueOf(now)); Configuration conf = server.getConfiguration(); long obsoleteTime = conf.getLong(LOG_SPLIT_TIME, DEFAULT_LOG_SPLIT_TIME); @@ -429,8 +461,10 @@ public class RegionStates { clearLastAssignment(region); } - public synchronized void clearLastAssignment(final HRegionInfo region) { - lastAssignments.remove(region.getEncodedName()); + public void clearLastAssignment(final HRegionInfo region) { + synchronized (lockCache.getLock(region.getEncodedName())) { + lastAssignments.remove(region.getEncodedName()); + } } /** @@ -457,17 +491,26 @@ public class RegionStates { } State newState = expectedState == null ? State.OFFLINE : expectedState; - updateRegionState(hri, newState); - - synchronized (this) { + String encodedName = hri.getEncodedName(); + synchronized (lockCache.getLock(encodedName)) { + updateRegionState(hri, newState); regionsInTransition.remove(hri.getEncodedName()); ServerName oldServerName = regionAssignments.remove(hri); - if (oldServerName != null && serverHoldings.containsKey(oldServerName)) { - LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName); - Set oldRegions = serverHoldings.get(oldServerName); - oldRegions.remove(hri); - if (oldRegions.isEmpty()) { - serverHoldings.remove(oldServerName); + if (oldServerName != null) { + // Lock is needed as isEmpty() and remove should be atomic + synchronized (lockCache.getLock(oldServerName.getServerName())) { + if (serverHoldings.containsKey(oldServerName)) { + // Offline the region only if it's merged/split, or the table is disabled/disabling. + // Otherwise, offline it from this server only when it is online on a different server. + LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName); + Set oldRegions = serverHoldings.get(oldServerName); + if (oldRegions != null) { + oldRegions.remove(hri); + if (oldRegions.isEmpty()) { + serverHoldings.remove(oldServerName); + } + } + } } } } @@ -476,37 +519,37 @@ public class RegionStates { /** * A server is offline, all regions on it are dead. */ - public synchronized List serverOffline( + public List serverOffline( final ZooKeeperWatcher watcher, final ServerName sn) { // Offline all regions on this server not already in transition. 
List rits = new ArrayList(); Set assignedRegions = serverHoldings.get(sn); if (assignedRegions == null) { - assignedRegions = new HashSet(); + assignedRegions = Collections.newSetFromMap(new ConcurrentHashMap()); } // Offline regions outside the loop to avoid ConcurrentModificationException - Set regionsToOffline = new HashSet(); for (HRegionInfo region : assignedRegions) { // Offline open regions, no need to offline if SPLIT/MERGED/OFFLINE - if (isRegionOnline(region)) { - regionsToOffline.add(region); - } else { - if (isRegionInState(region, State.SPLITTING, State.MERGING)) { + synchronized (region.getEncodedName()) { + if (isRegionOnline(region)) { + regionOffline(region); + } else if (isRegionInState(region, State.SPLITTING, State.MERGING)) { LOG.debug("Offline splitting/merging region " + getRegionState(region)); try { // Delete the ZNode if exists ZKAssign.deleteNodeFailSilent(watcher, region); - regionsToOffline.add(region); + regionOffline(region); } catch (KeeperException ke) { server.abort("Unexpected ZK exception deleting node " + region, ke); } } } } - - for (HRegionInfo hri : regionsToOffline) { - regionOffline(hri); + + Set regionsNotOffline = serverHoldings.get(sn); + if (regionsNotOffline != null) { + LOG.warn("Regions " + regionsNotOffline + " are still not yet offline on " + sn); } for (RegionState state : regionsInTransition.values()) { @@ -533,7 +576,9 @@ public class RegionStates { } } - this.notifyAll(); + synchronized (this) { + this.notifyAll(); + } return rits; } @@ -547,7 +592,7 @@ public class RegionStates { * @param tableName * @return Online regions from tableName */ - public synchronized List getRegionsOfTable(TableName tableName) { + public List getRegionsOfTable(TableName tableName) { List tableRegions = new ArrayList(); // boundary needs to have table's name but regionID 0 so that it is sorted // before all table's regions. @@ -566,7 +611,7 @@ public class RegionStates { * If the region isn't in transition, returns immediately. Otherwise, method * blocks until the region is out of transition. */ - public synchronized void waitOnRegionToClearRegionsInTransition( + public void waitOnRegionToClearRegionsInTransition( final HRegionInfo hri) throws InterruptedException { if (!isRegionInTransition(hri)) return; @@ -588,14 +633,12 @@ public class RegionStates { */ public void tableDeleted(final TableName tableName) { Set regionsToDelete = new HashSet(); - synchronized (this) { for (RegionState state: regionStates.values()) { HRegionInfo region = state.getRegion(); if (region.getTable().equals(tableName)) { regionsToDelete.add(region); } } - } for (HRegionInfo region: regionsToDelete) { deleteRegion(region); } @@ -611,12 +654,12 @@ public class RegionStates { * think it's online falsely. Therefore if a server is online, we still * need to confirm it reachable and having the expected start code. */ - synchronized boolean wasRegionOnDeadServer(final String encodedName) { + boolean wasRegionOnDeadServer(final String encodedName) { ServerName server = lastAssignments.get(encodedName); return isServerDeadAndNotProcessed(server); } - synchronized boolean isServerDeadAndNotProcessed(ServerName server) { + boolean isServerDeadAndNotProcessed(ServerName server) { if (server == null) return false; if (serverManager.isServerOnline(server)) { String hostAndPort = server.getHostAndPort(); @@ -627,6 +670,8 @@ public class RegionStates { return false; } // The size of deadServers won't grow unbounded. 
+ // Avoid putIfAbsent as it should be ok for the startCode to be overwritten for + // a given hostAndPort deadServers.put(hostAndPort, Long.valueOf(startCode)); } // Watch out! If the server is not dead, the region could @@ -646,35 +691,36 @@ public class RegionStates { * Get the last region server a region was on for purpose of re-assignment, * i.e. should the re-assignment be held back till log split is done? */ - synchronized ServerName getLastRegionServerOfRegion(final String encodedName) { + ServerName getLastRegionServerOfRegion(final String encodedName) { return lastAssignments.get(encodedName); } - synchronized void setLastRegionServerOfRegions( - final ServerName serverName, final List regionInfos) { - for (HRegionInfo hri: regionInfos) { - setLastRegionServerOfRegion(serverName, hri.getEncodedName()); - } - } + void setLastRegionServerOfRegions(final ServerName serverName, + final List regionInfos) { + for (HRegionInfo hri : regionInfos) { + setLastRegionServerOfRegion(serverName, hri.getEncodedName()); + } + } - synchronized void setLastRegionServerOfRegion( + // Called during startup (HMaster.joinCluster()) + void setLastRegionServerOfRegion( final ServerName serverName, final String encodedName) { - lastAssignments.put(encodedName, serverName); + synchronized (lockCache.getLock(encodedName)) { + lastAssignments.put(encodedName, serverName); + } } - synchronized void closeAllUserRegions(Set excludedTables) { - Set toBeClosed = new HashSet(regionStates.size()); - for(RegionState state: regionStates.values()) { + void closeAllUserRegions(Set excludedTables) { + for (RegionState state : regionStates.values()) { HRegionInfo hri = state.getRegion(); - TableName tableName = hri.getTable(); - if (!hri.isSplit() && !hri.isMetaRegion() - && !excludedTables.contains(tableName)) { - toBeClosed.add(hri); + synchronized (hri.getEncodedName()) { + TableName tableName = hri.getTable(); + if (!hri.isSplit() && !hri.isMetaRegion() + && !excludedTables.contains(tableName)) { + updateRegionState(hri, State.CLOSED); + } } } - for (HRegionInfo hri: toBeClosed) { - updateRegionState(hri, State.CLOSED); - } } /** @@ -683,7 +729,7 @@ public class RegionStates { * regions being served, ignoring stats about number of requests. 
* @return the average load */ - protected synchronized double getAverageLoad() { + protected double getAverageLoad() { int numServers = 0, totalLoad = 0; for (Map.Entry> e: serverHoldings.entrySet()) { Set regions = e.getValue(); @@ -710,7 +756,6 @@ public class RegionStates { getAssignmentsByTable() { Map>> result = new HashMap>>(); - synchronized (this) { if (!server.getConfiguration().getBoolean("hbase.master.loadbalance.bytable", false)) { Map> svrToRegions = new HashMap>(serverHoldings.size()); @@ -737,7 +782,7 @@ public class RegionStates { } } } - } + Map onlineSvrs = serverManager.getOnlineServers(); @@ -756,7 +801,7 @@ public class RegionStates { return getRegionState(hri.getEncodedName()); } - protected synchronized RegionState getRegionState(final String encodedName) { + protected RegionState getRegionState(final String encodedName) { return regionStates.get(encodedName); } @@ -808,14 +853,13 @@ public class RegionStates { String encodedName = hri.getEncodedName(); RegionState regionState = new RegionState( hri, state, System.currentTimeMillis(), serverName); - RegionState oldState = getRegionState(encodedName); - if (!regionState.equals(oldState)) { - LOG.info("Transition " + oldState + " to " + regionState); - // Persist region state before updating in-memory info, if needed - regionStateStore.updateRegionState(openSeqNum, regionState, oldState); - } - - synchronized (this) { + synchronized (lockCache.getLock(encodedName)) { + RegionState oldState = getRegionState(encodedName); + if (!regionState.equals(oldState)) { + LOG.debug("Transition " + oldState + " to " + regionState); + // Persist region state before updating in-memory info, if needed + regionStateStore.updateRegionState(openSeqNum, regionState, oldState); + } regionsInTransition.put(encodedName, regionState); regionStates.put(encodedName, regionState); @@ -824,8 +868,8 @@ public class RegionStates { if ((state == State.CLOSED || state == State.MERGED || state == State.SPLIT) && lastAssignments.containsKey(encodedName)) { ServerName last = lastAssignments.get(encodedName); - if (last.equals(serverName)) { - lastAssignments.remove(encodedName); + if (last != null && last.equals(serverName)) { + lastAssignments.remove(encodedName, last); } else { LOG.warn(encodedName + " moved to " + state + " on " + serverName + ", expected " + last); @@ -844,25 +888,34 @@ public class RegionStates { } } } - - // notify the change + } + synchronized (this) { this.notifyAll(); } + return regionState; } /** * Remove a region from all state maps. 
*/ - private synchronized void deleteRegion(final HRegionInfo hri) { + private void deleteRegion(final HRegionInfo hri) { String encodedName = hri.getEncodedName(); - regionsInTransition.remove(encodedName); - regionStates.remove(encodedName); - lastAssignments.remove(encodedName); - ServerName sn = regionAssignments.remove(hri); - if (sn != null) { - Set regions = serverHoldings.get(sn); - regions.remove(hri); + synchronized (lockCache.getLock(encodedName)) { + regionsInTransition.remove(encodedName); + regionStates.remove(encodedName); + lastAssignments.remove(encodedName); + ServerName sn = regionAssignments.remove(hri); + if (sn != null) { + Set regions = serverHoldings.get(sn); + if (regions != null) { + regions.remove(hri); + } + } } } + + Object getLock(String name) { + return lockCache.getLock(name); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java index 298feb5..2a867f2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java @@ -26,8 +26,17 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Random; import java.util.Set; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; @@ -65,6 +74,8 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConfigUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.RegionSplitter; +import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -1010,7 +1021,74 @@ public class TestAssignmentManagerOnCluster { assertTrue(count == 100); rss.stop(); } + + @Test + public void testConcurrentUpdatesRegionStates() throws Exception { + String table = "testConcurrentUpdatesRegionStates"; + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(table)); + desc.addFamily(new HColumnDescriptor(FAMILY)); + SplitAlgorithm algo = new RegionSplitter.HexStringSplit(); + byte[][] splits = algo.split(30); + admin.createTable(desc, splits); + RegionStates regionStates = + TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); + List serverNames = + new ArrayList(TEST_UTIL.getHBaseCluster().getClusterStatus().getServers()); + int serverListSize = serverNames.size(); + + List regionInfoList = regionStates.getRegionsOfTable(TableName.valueOf(table)); + ExecutorService pool = Executors.newFixedThreadPool(100); + Set> set = new HashSet>(); + Random r = new Random(); + for (int i = 0; i < 100; i++) { + RegionStateUpdater regionStateUpdater = + new RegionStateUpdater(regionInfoList.get(r.nextInt(30)), serverNames.get(r + .nextInt(serverListSize))); + Future future = pool.submit(regionStateUpdater); + set.add(future); + } + for (Future future : set) { + try { + future.get(); + } catch 
(ExecutionException e) { + fail("Didn't expect exception but got " + e); + } + } + } + + private class RegionStateUpdater implements Callable { + + private HRegionInfo regionInfo; + private RegionStates regionStates; + private ServerName serverName; + RegionStateUpdater(HRegionInfo regionInfo, ServerName serverName) { + this.regionInfo = regionInfo; + this.regionStates = + TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); + this.serverName = serverName; + } + + @Override + public Void call() { + regionStates.createRegionState(regionInfo, State.OFFLINE, serverName); + regionStates.clearLastAssignment(regionInfo); + regionStates.updateRegionState(regionInfo, State.OPEN); + regionStates.regionOffline(regionInfo); + regionStates.getAssignmentsByTable(); + regionStates.logSplit(serverName); + regionStates.regionOnline(regionInfo, serverName); + regionStates.getAverageLoad(); + regionStates.getRegionState(regionInfo); + regionStates.getRegionTransitionState(regionInfo); + regionStates.getRegionServerOfRegion(regionInfo); + regionStates.isServerDeadAndNotProcessed(serverName); + regionStates.setLastRegionServerOfRegion(serverName, regionInfo.getEncodedName()); + return null; + } + } + + static class MyLoadBalancer extends StochasticLoadBalancer { // For this region, if specified, always assign to nowhere static volatile String controledRegion = null; @@ -1072,8 +1150,7 @@ public class TestAssignmentManagerOnCluster { return getServerName().equals(abortedServer) || super.isAborted(); } } - - + public static class MyRegionObserver extends BaseRegionObserver { // If enabled, fail all preClose calls static AtomicBoolean preCloseEnabled = new AtomicBoolean(false);
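Closing reviewer note on the nested locking introduced in RegionStates.regionOnline() earlier in this patch: when a region moves between two servers, the change takes both server-level locks and orders them by ServerName.compareTo() before nesting the synchronized blocks, so the two locks are always acquired in a consistent order. The sketch below restates that deadlock-avoidance idiom in isolation; it is illustrative only, the class and method names are not from the patch, and the ordering direction is arbitrary as long as it is consistent.

// Deadlock-avoidance sketch only -- illustrative, not part of the change.
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.LockCache;

public class OrderedServerLockSketch {
  private final LockCache<String> lockCache = new LockCache<String>();

  public void moveRegionHoldings(ServerName from, ServerName to) {
    ServerName first = from;
    ServerName second = to;
    // Always acquire the two server locks in the same (here: ascending) order so two
    // threads moving regions between the same pair of servers cannot deadlock.
    // If from equals to, both locks are the same object and synchronized is reentrant.
    if (first.compareTo(second) > 0) {
      first = to;
      second = from;
    }
    synchronized (lockCache.getLock(first.getServerName())) {
      synchronized (lockCache.getLock(second.getServerName())) {
        // ... remove the region from 'from' and add it to 'to' atomically ...
      }
    }
  }
}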