From f11ba8853c34dd87f9d1f08e32821a4b33edfdc2 Mon Sep 17 00:00:00 2001 From: Mikhail Antonov Date: Tue, 19 Apr 2016 01:35:15 -0700 Subject: [PATCH] Reduce number of concurrent region location lookups when MetaCache entry is cleared --- .../hadoop/hbase/client/ConnectionManager.java | 211 +++++++++++---------- .../org/apache/hadoop/hbase/client/MetaCache.java | 28 +++ .../org/apache/hadoop/hbase/client/MultiLock.java | 130 +++++++++++++ 3 files changed, 272 insertions(+), 97 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiLock.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 15e0a39..eb05458 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -27,6 +27,7 @@ import java.lang.reflect.UndeclaredThrowableException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.HashSet; import java.util.LinkedHashMap; @@ -1221,6 +1222,14 @@ class ConnectionManager { /* * Search the hbase:meta table for the HRegionLocation * info that contains the table and row we're seeking. + * + * To prevent double work, only one client thread will be looking up a given region + * at a time, using locking schema very similar to what HFileReaderV2#offsetLock use. + * + * - first try to get cached location; if found, return + * - grab lock for the regionName + * - try cached location again in case someone who held the lock retrieved it for us + * - if still no luck, do a meta scan */ private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException { @@ -1250,121 +1259,129 @@ class ConnectionManager { int localNumRetries = (retry ? numTries : 1); - for (int tries = 0; true; tries++) { - if (tries >= localNumRetries) { - throw new NoServerForRegionException("Unable to find region for " + MultiLock.Lock> rlLock = metaCache.getRegionLookupLock( + Arrays.asList(org.apache.commons.lang.ArrayUtils.toObject(metaKey))); + try { + for (int tries = 0; true; tries++) { + if (tries >= localNumRetries) { + throw new NoServerForRegionException("Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName + " after " + localNumRetries + " tries."); - } - if (useCache) { - RegionLocations locations = getCachedLocation(tableName, row); - if (locations != null && locations.getRegionLocation(replicaId) != null) { - return locations; } - } else { - // If we are not supposed to be using the cache, delete any existing cached location - // so it won't interfere. - metaCache.clearCache(tableName, row); - } - // Query the meta region - try { - Result regionInfoRow = null; - ReversedClientScanner rcs = null; - try { - rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this, - rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0); - regionInfoRow = rcs.next(); - } finally { - if (rcs != null) { - rcs.close(); + if (useCache) { + RegionLocations locations = getCachedLocation(tableName, row); + if (locations != null && locations.getRegionLocation(replicaId) != null) { + return locations; } + } else { + // If we are not supposed to be using the cache, delete any existing cached location + // so it won't interfere. 
+ metaCache.clearCache(tableName, row); } - if (regionInfoRow == null) { - throw new TableNotFoundException(tableName); - } + // Query the meta region + try { + Result regionInfoRow = null; + ReversedClientScanner rcs = null; + try { + rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this, + rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0); + regionInfoRow = rcs.next(); + } finally { + if (rcs != null) { + rcs.close(); + } + } - // convert the row result into the HRegionLocation we need! - RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow); - if (locations == null || locations.getRegionLocation(replicaId) == null) { - throw new IOException("HRegionInfo was null in " + - tableName + ", row=" + regionInfoRow); - } - HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo(); - if (regionInfo == null) { - throw new IOException("HRegionInfo was null or empty in " + - TableName.META_TABLE_NAME + ", row=" + regionInfoRow); - } + if (regionInfoRow == null) { + throw new TableNotFoundException(tableName); + } - // possible we got a region of a different table... - if (!regionInfo.getTable().equals(tableName)) { - throw new TableNotFoundException( - "Table '" + tableName + "' was not found, got: " + + // convert the row result into the HRegionLocation we need! + RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow); + if (locations == null || locations.getRegionLocation(replicaId) == null) { + throw new IOException("HRegionInfo was null in " + + tableName + ", row=" + regionInfoRow); + } + HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo(); + if (regionInfo == null) { + throw new IOException("HRegionInfo was null or empty in " + + TableName.META_TABLE_NAME + ", row=" + regionInfoRow); + } + + // possible we got a region of a different table... 
+ if (!regionInfo.getTable().equals(tableName)) { + throw new TableNotFoundException( + "Table '" + tableName + "' was not found, got: " + regionInfo.getTable() + "."); - } - if (regionInfo.isSplit()) { - throw new RegionOfflineException("the only available region for" + - " the required row is a split parent," + - " the daughters should be online soon: " + - regionInfo.getRegionNameAsString()); - } - if (regionInfo.isOffline()) { - throw new RegionOfflineException("the region is offline, could" + - " be caused by a disable table call: " + - regionInfo.getRegionNameAsString()); - } + } + if (regionInfo.isSplit()) { + throw new RegionOfflineException("the only available region for" + + " the required row is a split parent," + + " the daughters should be online soon: " + + regionInfo.getRegionNameAsString()); + } + if (regionInfo.isOffline()) { + throw new RegionOfflineException("the region is offline, could" + + " be caused by a disable table call: " + + regionInfo.getRegionNameAsString()); + } - ServerName serverName = locations.getRegionLocation(replicaId).getServerName(); - if (serverName == null) { - throw new NoServerForRegionException("No server address listed " + - "in " + TableName.META_TABLE_NAME + " for region " + - regionInfo.getRegionNameAsString() + " containing row " + - Bytes.toStringBinary(row)); - } + ServerName serverName = locations.getRegionLocation(replicaId).getServerName(); + if (serverName == null) { + throw new NoServerForRegionException("No server address listed " + + "in " + TableName.META_TABLE_NAME + " for region " + + regionInfo.getRegionNameAsString() + " containing row " + + Bytes.toStringBinary(row)); + } - if (isDeadServer(serverName)){ - throw new RegionServerStoppedException("hbase:meta says the region "+ - regionInfo.getRegionNameAsString()+" is managed by the server " + serverName + + if (isDeadServer(serverName)) { + throw new RegionServerStoppedException("hbase:meta says the region " + + regionInfo.getRegionNameAsString() + " is managed by the server " + serverName + ", but it is dead."); - } - // Instantiate the location - cacheLocation(tableName, locations); - return locations; - } catch (TableNotFoundException e) { - // if we got this error, probably means the table just plain doesn't - // exist. rethrow the error immediately. this should always be coming - // from the HTable constructor. - throw e; - } catch (IOException e) { - ExceptionUtil.rethrowIfInterrupt(e); + } + // Instantiate the location + cacheLocation(tableName, locations); + return locations; + } catch (TableNotFoundException e) { + // if we got this error, probably means the table just plain doesn't + // exist. rethrow the error immediately. this should always be coming + // from the HTable constructor. 
+ throw e; + } catch (IOException e) { + ExceptionUtil.rethrowIfInterrupt(e); - if (e instanceof RemoteException) { - e = ((RemoteException)e).unwrapRemoteException(); - } - if (tries < localNumRetries - 1) { - if (LOG.isDebugEnabled()) { - LOG.debug("locateRegionInMeta parentTable=" + + if (e instanceof RemoteException) { + e = ((RemoteException) e).unwrapRemoteException(); + } + if (tries < localNumRetries - 1) { + if (LOG.isDebugEnabled()) { + LOG.debug("locateRegionInMeta parentTable=" + TableName.META_TABLE_NAME + ", metaLocation=" + - ", attempt=" + tries + " of " + - localNumRetries + " failed; retrying after sleep of " + - ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage()); + ", attempt=" + tries + " of " + + localNumRetries + " failed; retrying after sleep of " + + ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage()); + } + } else { + throw e; } - } else { - throw e; - } - // Only relocate the parent region if necessary - if(!(e instanceof RegionOfflineException || + // Only relocate the parent region if necessary + if (!(e instanceof RegionOfflineException || e instanceof NoServerForRegionException)) { - relocateRegion(TableName.META_TABLE_NAME, metaKey, replicaId); + relocateRegion(TableName.META_TABLE_NAME, metaKey, replicaId); + } + } + + try { + Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries)); + } catch (InterruptedException e) { + throw new InterruptedIOException("Giving up trying to location region in " + + "meta: thread is interrupted."); } } - try{ - Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries)); - } catch (InterruptedException e) { - throw new InterruptedIOException("Giving up trying to location region in " + - "meta: thread is interrupted."); - } + } finally { + metaCache.releaseRegionLookupLock(rlLock); } } @@ -2649,7 +2666,7 @@ class ConnectionManager { public boolean hasCellBlockSupport() { return this.rpcClient.hasCellBlockSupport(); } - + @Override public ConnectionConfiguration getConnectionConfiguration() { return this.connectionConfig; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java index a1ff3d3..bc26647 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.client; +import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -60,6 +62,9 @@ public class MetaCache { // The access to this attribute must be protected by a lock on cachedRegionLocations private final Set cachedServers = new CopyOnWriteArraySet<>(); + // List rather than array so we can use it as a key in a map + private final MultiLock> regionLookupsLock= new MultiLock<>(); + private final MetricsConnection metrics; public MetaCache(MetricsConnection metrics) { @@ -405,4 +410,27 @@ public class MetaCache { } } } + + /** + * Acquires named lock for a region. Used to prevent simultaneous lookups + * of the same regions by multiple threads at the same time. 
+   * @param regionName region name to get lock on
+   *
+   * @return Lock object which the caller will need later to release the lock
+   * @throws IOException if interrupted while waiting for the lock
+   */
+  public MultiLock.Lock<List<Byte>> getRegionLookupLock(List<Byte> regionName)
+      throws IOException {
+    return regionLookupsLock.lock(regionName);
+
+  }
+
+  /**
+   *
+   * Release named lock for a region.
+   * @param regionLock lock you received from the getRegionLookupLock() method.
+   */
+  public void releaseRegionLookupLock(MultiLock.Lock<List<Byte>> regionLock) {
+    regionLookupsLock.unlock(regionLock);
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiLock.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiLock.java
new file mode 100644
index 0000000..6c0b5dd
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiLock.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Very similar to org.apache.hadoop.hbase.util.IdLock used by
+ * HFileReaderV2#offsetLock, but:
+ *
+ * - allows locking on arbitrary objects (objects are used as keys in a map,
+ *   so they should support proper equality semantics)
+ * - re-entrant
+ *
+ * TODO: refactor both into a single class?
+ */
+@InterfaceAudience.Private
+public class MultiLock<E> {
+
+  /**
+   * Lock returned to the client (use this instance to release the acquired lock).
+   */
+  public static class Lock<E> {
+    private final E lockObject;
+    private int numWaiters;
+    private volatile boolean isLocked = true;
+    private volatile long lockedByThreadId;
+    private ReentrantLock entryOperationsLock;
+    private Condition isLockedCondition;
+
+    private Lock(E lockObject) {
+      this.lockObject = lockObject;
+      this.entryOperationsLock = new ReentrantLock();
+      this.isLockedCondition = entryOperationsLock.newCondition();
+      this.lockedByThreadId = Thread.currentThread().getId();
+    }
+
+    public String toString() {
+      return "lockObject=" + lockObject + ", numWaiters=" + numWaiters + ", isLocked=" +
+          isLocked;
+    }
+  }
+
+  private ConcurrentMap<E, Lock<E>> map = new ConcurrentHashMap<>();
+
+  /**
+   * Blocks until the Lock corresponding to the given lockObject is acquired. Re-entrant.
+ * + * @param lockObject an arbitrary object to lock on + * @return Lock object for client to pass to {@link #unlock(Lock)} to release the lockObject + * @throws IOException if interrupted + */ + public Lock lock(E lockObject) throws IOException { + Lock entry = new Lock<>(lockObject); + Lock existing; + while ((existing = map.putIfAbsent(entry.lockObject, entry)) != null) { + existing.entryOperationsLock.lock(); + try { + if (existing.isLocked && existing.lockedByThreadId != Thread.currentThread().getId()) { + ++existing.numWaiters; // Add ourselves to waiters. + while (existing.isLocked && + existing.lockedByThreadId != Thread.currentThread().getId()) { + try { + existing.isLockedCondition.await(); + } catch (InterruptedException e) { + --existing.numWaiters; // Remove ourselves from waiters. + throw new InterruptedIOException( + "Interrupted waiting to acquire sparse lockObject"); + } + } + + --existing.numWaiters; // Remove ourselves from waiters. + existing.isLocked = true; + existing.lockedByThreadId = Thread.currentThread().getId(); + return existing; + } + // If the entry is not locked, it might already be deleted from the + // map, so we cannot return it. We need to get our entry into the map + // or get someone else's locked entry. + } finally { + existing.entryOperationsLock.unlock(); + } + } + return entry; + } + + /** + * Must be called in a finally block to decrease the internal counter and + * remove the monitor object for the given id if the caller is the last + * client. + * + * @param lock the return value of {@link #lock(E)} + */ + public void unlock(Lock lock) { + lock.entryOperationsLock.lock(); + try { + lock.isLocked = false; + if (lock.numWaiters > 0) { + lock.isLockedCondition.signalAll(); + } else { + map.remove(lock.lockObject); + } + } finally { + lock.entryOperationsLock.unlock(); + } + } +} -- 2.8.0-rc2