diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
index dfeb9ef..6394d66 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
@@ -561,7 +561,7 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
         break;
       }
     }
-    if(offset == -1) throw new IOException("Invalid regionName format");
+    if(offset == -1) throw new IOException("Invalid regionName format: " + Bytes.toString(regionName));
     byte[] tableName = new byte[offset];
     System.arraycopy(regionName, 0, tableName, 0, offset);
     offset = -1;
@@ -590,7 +590,7 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
         break;
       }
     }
-    if(offset == -1) throw new IOException("Invalid regionName format");
+    if(offset == -1) throw new IOException("Invalid regionName format: " + Bytes.toString(regionName));
     byte [] startKey = HConstants.EMPTY_BYTE_ARRAY;
     if(offset != tableName.length + 1) {
       startKey = new byte[offset - tableName.length - 1];
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
index b5db549..85eacd3 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase;
 import java.util.Collection;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -127,23 +126,45 @@ public class RegionLocations {
    * if nothing is removed
    */
   public RegionLocations remove(HRegionLocation location) {
-    HRegionLocation[] newLocations = null;
-    for (int i = 0; i < locations.length; i++) {
-      // check whether something to remove. HRL.compareTo() compares ONLY the
-      // serverName. We want to compare the HRI's as well.
-      if (locations[i] != null
-          && location.getRegionInfo().equals(locations[i].getRegionInfo())
-          && location.equals(locations[i])) {
-        if (newLocations == null) { //first time
-          newLocations = new HRegionLocation[locations.length];
-          System.arraycopy(locations, 0, newLocations, 0, i);
-        }
-        newLocations[i] = null;
-      } else if (newLocations != null) {
-        newLocations[i] = locations[i];
-      }
+    if (location == null) return this;
+    if (location.getRegionInfo() == null) return this;
+    int replicaId = location.getRegionInfo().getReplicaId();
+    if (replicaId >= locations.length) return this;
+
+    // check whether something to remove. HRL.compareTo() compares ONLY the
+    // serverName. We want to compare the HRI's as well.
+    if (locations[replicaId] == null
+        || !location.getRegionInfo().equals(locations[replicaId].getRegionInfo())
+        || !location.equals(locations[replicaId])) {
+      return this;
     }
-    return newLocations == null ? this : new RegionLocations(newLocations);
+
+    HRegionLocation[] newLocations = new HRegionLocation[locations.length];
+    System.arraycopy(locations, 0, newLocations, 0, locations.length);
+    newLocations[replicaId] = null;
+    return new RegionLocations(newLocations);
+  }
+
+  /**
+   * Removes location of the given replicaId from the list
+   * @param replicaId the replicaId of the location to remove
+   * @return a RegionLocations object with removed locations or the same object
+   *   if nothing is removed
+   */
+  public RegionLocations remove(int replicaId) {
+    if (getRegionLocation(replicaId) == null) {
+      return this;
+    }
+
+    int newLength = replicaId == locations.length-1 ? locations.length-1 : locations.length;
+    HRegionLocation[] newLocations = new HRegionLocation[newLength];
+
+    System.arraycopy(locations, 0, newLocations, 0, newLength);
+    if (replicaId < newLocations.length) {
+      newLocations[replicaId] = null;
+    }
+
+    return new RegionLocations(newLocations);
   }
 
   /**
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java
index fc30cbc..6d3e400 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java
@@ -237,7 +237,7 @@ public class MetaReader {
       parsedInfo = parseRegionInfoFromRegionName(regionName);
       row = getMetaKeyForRegion(parsedInfo);
     } catch (Exception parseEx) {
-      LOG.warn("Received parse exception:" + parseEx);
+      // Ignore. This is used with tableName passed as regionName.
     }
     Get get = new Get(row);
     get.addFamily(HConstants.CATALOG_FAMILY);
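A note on the RegionLocations change above: both remove variants keep the class copy-on-write, so a caller gets back either the identical instance (nothing removed) or a freshly copied array, never an in-place mutation. That lets MetaCache detect no-ops by reference comparison. A minimal, self-contained sketch of that contract in plain Java (ReplicaList is a hypothetical stand-in for RegionLocations, not HBase API):

```java
import java.util.Arrays;

// Hypothetical stand-in for RegionLocations: an immutable, replicaId-indexed list.
final class ReplicaList {
  private final String[] slots; // index == replicaId

  ReplicaList(String... slots) {
    this.slots = slots;
  }

  /** Returns this unchanged instance, or a new copy with the slot nulled out. */
  ReplicaList remove(int replicaId) {
    if (replicaId < 0 || replicaId >= slots.length || slots[replicaId] == null) {
      return this; // nothing to remove: callers compare references to detect no-ops
    }
    String[] copy = Arrays.copyOf(slots, slots.length);
    copy[replicaId] = null;
    return new ReplicaList(copy);
  }

  public static void main(String[] args) {
    ReplicaList rl = new ReplicaList("primary", "replica-1");
    ReplicaList after = rl.remove(1);
    System.out.println(rl != after);        // true: a copy was made
    System.out.println(rl.remove(5) == rl); // true: out-of-range is a no-op
  }
}
```

Reference comparison (updatedLocations != regionLocations) is exactly how the MetaCache hunks further down decide whether a cache entry actually changed.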
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
index f291241..a476e8d 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
 interface ClusterConnection extends HConnection {
 
   /** @return - true if the master server is running */
+  @Override
   boolean isMasterRunning()
       throws MasterNotRunningException, ZooKeeperConnectionException;
 
@@ -53,9 +54,10 @@ interface ClusterConnection extends HConnection {
    * @throws IOException
    *           if a remote or network exception occurs
    */
+  @Override
   boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws IOException;
-  
+
   /**
    * Find the location of the region of <i>tableName</i> that <i>row</i>
    * lives in.
@@ -65,12 +67,14 @@ interface ClusterConnection extends HConnection {
    *   question
    * @throws IOException if a remote or network exception occurs
    */
+  @Override
   public HRegionLocation locateRegion(final TableName tableName,
       final byte [] row) throws IOException;
 
   /**
    * Allows flushing the region cache.
    */
+  @Override
   void clearRegionCache();
 
   /**
@@ -79,12 +83,14 @@ interface ClusterConnection extends HConnection {
    * @param tableName Name of the table whose regions we are to remove from
    *   cache.
    */
+  @Override
   void clearRegionCache(final TableName tableName);
 
   /**
    * Deletes cached locations for the specific region.
    * @param location The location object for the region, to be purged from cache.
    */
+  @Override
   void deleteCachedRegionLocation(final HRegionLocation location);
 
   /**
@@ -96,10 +102,24 @@ interface ClusterConnection extends HConnection {
    *   question
    * @throws IOException if a remote or network exception occurs
    */
+  @Override
   HRegionLocation relocateRegion(final TableName tableName,
       final byte [] row) throws IOException;
 
   /**
+   * Find the location of the region of <i>tableName</i> that <i>row</i>
+   * lives in, ignoring any value that might be in the cache.
+   * @param tableName name of the table <i>row</i> is in
+   * @param row row key you're trying to find the region of
+   * @param replicaId the replicaId of the region
+   * @return HRegionLocation that describes where to find the region in
+   *   question
+   * @throws IOException if a remote or network exception occurs
+   */
+  HRegionLocation relocateRegion(final TableName tableName,
+      final byte [] row, int replicaId) throws IOException;
+
+  /**
    * Update the location cache. This is used internally by HBase, in most cases it should not be
    * used by the client application.
    * @param tableName the table name
@@ -119,6 +139,7 @@ interface ClusterConnection extends HConnection {
    *   question
    * @throws IOException if a remote or network exception occurs
    */
+  @Override
   HRegionLocation locateRegion(final byte[] regionName) throws IOException;
 
   /**
@@ -128,6 +149,7 @@ interface ClusterConnection extends HConnection {
    * @return list of region locations for all regions of table
    * @throws IOException
    */
+  @Override
   List<HRegionLocation> locateRegions(final TableName tableName) throws IOException;
 
   /**
@@ -139,6 +161,7 @@ interface ClusterConnection extends HConnection {
    * @return list of region locations for all regions of table
    * @throws IOException
    */
+  @Override
   List<HRegionLocation> locateRegions(final TableName tableName,
       final boolean useCache, final boolean offlined) throws IOException;
 
@@ -154,9 +177,24 @@ interface ClusterConnection extends HConnection {
    */
   RegionLocations locateRegion(TableName tableName,
       byte[] row, boolean useCache, boolean retry) throws IOException;
+
+  /**
+   *
+   * @param tableName table to get regions of
+   * @param row the row
+   * @param useCache Should we use the cache to retrieve the region information.
+   * @param retry do we retry
+   * @param replicaId the replicaId for the region
+   * @return region locations for this row.
+   * @throws IOException
+   */
+  RegionLocations locateRegion(TableName tableName,
+      byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException;
+
   /**
    * Returns a {@link MasterKeepAliveConnection} to the active master
    */
+  @Override
   MasterService.BlockingInterface getMaster() throws IOException;
 
   /**
@@ -166,6 +204,7 @@ interface ClusterConnection extends HConnection {
    * @return proxy for HRegionServer
    * @throws IOException if a remote or network exception occurs
    */
+  @Override
   AdminService.BlockingInterface getAdmin(final ServerName serverName) throws IOException;
 
   /**
@@ -177,6 +216,7 @@ interface ClusterConnection extends HConnection {
    * @throws IOException if a remote or network exception occurs
    *
    */
+  @Override
   ClientService.BlockingInterface getClient(final ServerName serverName) throws IOException;
 
   /**
@@ -187,6 +227,7 @@ interface ClusterConnection extends HConnection {
    * @return Location of row.
    * @throws IOException if a remote or network exception occurs
    */
+  @Override
   HRegionLocation getRegionLocation(TableName tableName, byte [] row,
       boolean reload) throws IOException;
 
   /**
@@ -195,6 +236,7 @@ interface ClusterConnection extends HConnection {
    * Clear any caches that pertain to server name <code>sn</code>.
    * @param sn A server name
    */
+  @Override
   void clearCaches(final ServerName sn);
 
   /**
@@ -203,6 +245,7 @@ interface ClusterConnection extends HConnection {
    * @return The shared instance. Never returns null.
    * @throws MasterNotRunningException
    */
+  @Override
   @Deprecated
   MasterKeepAliveConnection getKeepAliveMasterService()
       throws MasterNotRunningException;
 
   /**
@@ -211,12 +254,14 @@ interface ClusterConnection extends HConnection {
    * @param serverName
    * @return true if the server is known as dead, false otherwise.
    * @deprecated internal method, do not use thru HConnection
   */
+  @Override
   @Deprecated
   boolean isDeadServer(ServerName serverName);
 
   /**
    * @return Nonce generator for this HConnection; may be null if disabled in configuration.
    */
+  @Override
   public NonceGenerator getNonceGenerator();
 
   /**
@@ -228,4 +273,5 @@ interface ClusterConnection extends HConnection {
    * @return All locations for a particular region.
    */
   RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException;
+
 }
\ No newline at end of file
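The new replicaId-aware signatures in ClusterConnection are additive: as the ConnectionManager hunks below show, the old signatures simply forward to the new ones with the default (primary) replica id, so existing callers compile and behave unchanged. A sketch of that delegation pattern under assumed names (this uses a Java 8 default method for brevity; the patch itself targets an older Java level and delegates in the implementing class instead):

```java
// Hypothetical illustration, not the HBase interface itself.
interface Locator {
  int DEFAULT_REPLICA_ID = 0; // mirrors RegionReplicaUtil.DEFAULT_REPLICA_ID

  // New replica-aware primitive; all lookups funnel through this method.
  String locate(String table, byte[] row, boolean useCache, int replicaId);

  // Pre-existing signature, kept source-compatible by delegating to the new one.
  default String locate(String table, byte[] row, boolean useCache) {
    return locate(table, row, useCache, DEFAULT_REPLICA_ID);
  }
}
```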
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index f54c650..a7a3301 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -35,9 +35,6 @@ import java.util.NavigableMap;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -48,7 +45,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -79,7 +75,87 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.*;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -354,6 +430,7 @@ class ConnectionManager {
   * @param conf configuration whose identity is used to find {@link HConnection} instance.
   * @deprecated
   */
+  @Deprecated
  public static void deleteConnection(Configuration conf) {
    deleteConnection(new HConnectionKey(conf), false);
  }
@@ -365,6 +442,7 @@ class ConnectionManager {
   * @param connection
   * @deprecated
   */
+  @Deprecated
  public static void deleteStaleConnection(HConnection connection) {
    deleteConnection(connection, true);
  }
@@ -375,6 +453,7 @@ class ConnectionManager {
   *   staleConnection to true.
   * @deprecated
   */
+  @Deprecated
  public static void deleteAllConnections(boolean staleConnection) {
    synchronized (CONNECTION_INSTANCES) {
      Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
@@ -994,6 +1073,12 @@ class ConnectionManager {
    @Override
    public HRegionLocation relocateRegion(final TableName tableName,
        final byte [] row) throws IOException{
+      return relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
+    }
+
+    @Override
+    public HRegionLocation relocateRegion(final TableName tableName,
+        final byte [] row, int replicaId) throws IOException{
      // Since this is an explicit request not to use any caching, finding
      // disabled tables should not be desirable.  This will ensure that an exception is thrown when
      // the first time a disabled table is interacted with.
@@ -1001,8 +1086,8 @@ class ConnectionManager {
        throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
      }

-      RegionLocations locations = locateRegion(tableName, row, false, true);
-      return locations == null ? null : locations.getRegionLocation();
+      RegionLocations locations = locateRegion(tableName, row, false, true, replicaId);
+      return locations == null ? null : locations.getRegionLocation(replicaId);
    }

    @Override
@@ -1011,11 +1096,17 @@ class ConnectionManager {
      return relocateRegion(TableName.valueOf(tableName), row);
    }

-    @Override
    public RegionLocations locateRegion(final TableName tableName,
        final byte [] row, boolean useCache, boolean retry)
        throws IOException {
+      return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
+    }
+
+    @Override
+    public RegionLocations locateRegion(final TableName tableName,
+        final byte [] row, boolean useCache, boolean retry, int replicaId)
+        throws IOException {
      if (this.closed) throw new IOException(toString() + " closed");
      if (tableName== null || tableName.getName().length == 0) {
        throw new IllegalArgumentException(
      }
@@ -1027,7 +1118,7 @@ class ConnectionManager {
      } else {
        // Region not in the cache - have to go to the meta RS
        return locateRegionInMeta(TableName.META_TABLE_NAME, tableName, row,
-          useCache, userRegionLock, retry);
+          useCache, userRegionLock, retry, replicaId);
      }
    }

@@ -1091,15 +1182,15 @@ class ConnectionManager {
     */
    private RegionLocations locateRegionInMeta(final TableName parentTable,
        final TableName tableName, final byte [] row, boolean useCache,
-        Object regionLockObject, boolean retry)
+        Object regionLockObject, boolean retry, int replicaId)
    throws IOException {
-      RegionLocations location;
+      RegionLocations locations;
      // If we are supposed to be using the cache, look in the cache to see if
      // we already have the region.
      if (useCache) {
-        location = getCachedLocation(tableName, row);
-        if (location != null) {
-          return location;
+        locations = getCachedLocation(tableName, row);
+        if (locations != null) {
+          return locations;
        }
      }
      int localNumRetries = retry ? numTries : 1;
@@ -1118,7 +1209,7 @@ class ConnectionManager {
        try {
          // locate the meta region
          RegionLocations metaLocations = locateRegion(parentTable, metaKey, true, false);
-          metaLocation = metaLocations == null ? null : metaLocations.getRegionLocation();
+          metaLocation = metaLocations == null ? null : metaLocations.getDefaultRegionLocation();
          // If null still, go around again.
          if (metaLocation == null) continue;
          ClientService.BlockingInterface service = getClient(metaLocation.getServerName());
@@ -1133,23 +1224,23 @@ class ConnectionManager {
            synchronized (regionLockObject) {
              // Check the cache again for a hit in case some other thread made the
              // same query while we were waiting on the lock.
-              location = getCachedLocation(tableName, row);
-              if (location != null) {
-                return location;
+              locations = getCachedLocation(tableName, row);
+              if (locations != null) {
+                return locations;
              }
              // If the parent table is META, we may want to pre-fetch some
              // region info into the global region cache for this table.
              prefetchRegionCache(tableName, row);
            }
          }
-          location = getCachedLocation(tableName, row);
-          if (location != null) {
-            return location;
+          locations = getCachedLocation(tableName, row);
+          if (locations != null) {
+            return locations;
          }
        } else {
          // If we are not supposed to be using the cache, delete any existing cached location
          // so it won't interfere.
-          metaCache.clearCache(tableName, row);
+          metaCache.clearCache(tableName, row, replicaId);
        }

        // Query the meta region for the location of the meta region
@@ -1162,12 +1253,12 @@ class ConnectionManager {
        }

        // convert the row result into the HRegionLocation we need!
-        location = MetaReader.getRegionLocations(regionInfoRow);
-        if (location == null || location.getRegionLocation() == null) {
+        locations = MetaReader.getRegionLocations(regionInfoRow);
+        if (locations == null || locations.getRegionLocation(replicaId) == null) {
          throw new IOException("HRegionInfo was null in " +
            parentTable + ", row=" + regionInfoRow);
        }
-        HRegionInfo regionInfo = location.getRegionLocation().getRegionInfo();
+        HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo();
        if (regionInfo == null) {
          throw new IOException("HRegionInfo was null or empty in " +
            parentTable + ", row=" + regionInfoRow);
@@ -1191,7 +1282,7 @@ class ConnectionManager {
            regionInfo.getRegionNameAsString());
        }

-        ServerName serverName = location.getRegionLocation().getServerName();
+        ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
        if (serverName == null) {
          throw new NoServerForRegionException("No server address listed " +
            "in " + parentTable + " for region " +
@@ -1205,8 +1296,8 @@ class ConnectionManager {
            ", but it is dead.");
        }

-        cacheLocation(tableName, location);
-        return location;
+        cacheLocation(tableName, locations);
+        return locations;
      } catch (TableNotFoundException e) {
        // if we got this error, probably means the table just plain doesn't
        // exist. rethrow the error immediately. this should always be coming
@@ -1233,7 +1324,7 @@ class ConnectionManager {
          // Only relocate the parent region if necessary
          if(!(e instanceof RegionOfflineException ||
              e instanceof NoServerForRegionException)) {
-            relocateRegion(parentTable, metaKey);
+            relocateRegion(parentTable, metaKey, replicaId);
          }
        }
        try{
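The locateRegionInMeta flow above follows a check/lock/re-check discipline: consult the cache, take the per-region lock, re-check the cache (another thread may have filled it while we waited), and only then pay for the meta lookup. A self-contained sketch of that discipline in plain Java (LocationCache is hypothetical; the real method layers retries, prefetch, and replica handling on top):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the cache discipline in locateRegionInMeta: concurrent callers
// asking for the same region result in a single expensive remote fetch.
class LocationCache {
  private final Map<String, String> cache = new ConcurrentHashMap<String, String>();
  private final Object regionLock = new Object();

  String locate(String key) {
    String loc = cache.get(key);    // fast path, no lock
    if (loc != null) return loc;
    synchronized (regionLock) {
      loc = cache.get(key);         // re-check: another thread may have won the race
      if (loc != null) return loc;
      loc = fetchFromMeta(key);     // expensive remote call, done at most once
      cache.put(key, loc);
      return loc;
    }
  }

  private String fetchFromMeta(String key) {
    return "server-for-" + key;     // placeholder for the meta-table lookup
  }
}
```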
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
index 465ff6c..eb83ca0 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * A cache implementation for region locations from meta.
@@ -114,6 +115,9 @@ public class MetaCache {
     RegionLocations oldLocations = tableLocations.putIfAbsent(startKey, locations);
     boolean isNewCacheEntry = (oldLocations == null);
     if (isNewCacheEntry) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Cached location: " + location); //TODO: change to trace
+      }
       addToCachedServers(locations);
       return;
     }
@@ -131,7 +135,10 @@ public class MetaCache {
     // an additional counter on top of seqNum would be necessary to handle them all.
     RegionLocations updatedLocations = oldLocations.updateLocation(location, false, force);
     if (oldLocations != updatedLocations) {
-      tableLocations.replace(startKey, oldLocations, updatedLocations);
+      boolean replaced = tableLocations.replace(startKey, oldLocations, updatedLocations);
+      if (replaced && LOG.isDebugEnabled()) {
+        LOG.debug("Changed cached location to: " + location); //TODO: change to trace
+      }
       addToCachedServers(updatedLocations);
     }
   }
@@ -139,24 +146,30 @@ public class MetaCache {
   /**
    * Put a newly discovered HRegionLocation into the cache.
    * @param tableName The table name.
-   * @param location the new location
+   * @param locations the new locations
    */
-  public void cacheLocation(final TableName tableName, final RegionLocations location) {
-    byte [] startKey = location.getRegionLocation().getRegionInfo().getStartKey();
+  public void cacheLocation(final TableName tableName, final RegionLocations locations) {
+    byte [] startKey = locations.getRegionLocation().getRegionInfo().getStartKey();
     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
-    RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, location);
+    RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, locations);
     boolean isNewCacheEntry = (oldLocation == null);
     if (isNewCacheEntry) {
-      addToCachedServers(location);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Cached location: " + locations); //TODO: change to trace
+      }
+      addToCachedServers(locations);
       return;
     }
 
     // merge old and new locations and add it to the cache
     // Meta record might be stale - some (probably the same) server has closed the region
     // with later seqNum and told us about the new location.
-    RegionLocations mergedLocation = oldLocation.mergeLocations(location);
-    tableLocations.replace(startKey, oldLocation, mergedLocation);
-    addToCachedServers(location);
+    RegionLocations mergedLocation = oldLocation.mergeLocations(locations);
+    boolean replaced = tableLocations.replace(startKey, oldLocation, mergedLocation);
+    if (replaced && LOG.isDebugEnabled()) {
+      LOG.debug("Merged cached locations: " + mergedLocation); //TODO: change to trace
+    }
+    addToCachedServers(locations);
   }
 
   private void addToCachedServers(RegionLocations locations) {
@@ -245,12 +258,11 @@ public class MetaCache {
       RegionLocations regionLocations = e.getValue();
       if (regionLocations != null) {
         RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
-        deletedSomething |= regionLocations == updatedLocations;
         if (updatedLocations != regionLocations) {
           if (updatedLocations.isEmpty()) {
-            tableLocations.remove(e.getKey(), regionLocations);
+            deletedSomething = tableLocations.remove(e.getKey(), regionLocations);
           } else {
-            tableLocations.replace(e.getKey(), regionLocations, updatedLocations);
+            deletedSomething = tableLocations.replace(e.getKey(), regionLocations, updatedLocations);
           }
         }
       }
@@ -267,6 +279,9 @@ public class MetaCache {
    * Delete all cached entries of a table.
    */
   public void clearCache(final TableName tableName) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removed all cached region locations for table " + tableName);
+    }
     this.cachedRegionLocations.remove(tableName);
   }
 
@@ -275,6 +290,35 @@
    * @param tableName tableName
    * @param row
    */
+  public void clearCache(final TableName tableName, final byte [] row, int replicaId) {
+    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
+
+    boolean removed = false;
+    RegionLocations regionLocations = getCachedLocation(tableName, row);
+    if (regionLocations != null) {
+      HRegionLocation toBeRemoved = regionLocations.getRegionLocation(replicaId);
+      RegionLocations updatedLocations = regionLocations.remove(replicaId);
+      if (updatedLocations != regionLocations) {
+        byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
+        if (updatedLocations.isEmpty()) {
+          removed = tableLocations.remove(startKey, regionLocations);
+        } else {
+          removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
+        }
+      }
+
+      if (removed && LOG.isDebugEnabled() && toBeRemoved != null) {
+        LOG.debug("Removed " + toBeRemoved + " from cache");
+        LOG.debug(StringUtils.stringifyException(new Exception("debug")));
+      }
+    }
+  }
+
+  /**
+   * Delete a cached location, no matter what it is. Called when we were told to not use cache.
+   * @param tableName tableName
+   * @param row
+   */
   public void clearCache(final TableName tableName, final byte [] row) {
     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
 
@@ -284,6 +328,7 @@ public class MetaCache {
       boolean removed = tableLocations.remove(startKey, regionLocations);
       if (removed && LOG.isDebugEnabled()) {
         LOG.debug("Removed " + regionLocations + " from cache");
+        LOG.debug(StringUtils.stringifyException(new Exception("debug")));
       }
     }
   }
@@ -299,10 +344,15 @@
       RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
       if (updatedLocations != regionLocations) {
         byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
+        boolean removed = false;
         if (updatedLocations.isEmpty()) {
-          tableLocations.remove(startKey, regionLocations);
+          removed = tableLocations.remove(startKey, regionLocations);
         } else {
-          tableLocations.replace(startKey, regionLocations, updatedLocations);
+          removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
+        }
+        if (removed && LOG.isDebugEnabled()) {
+          LOG.debug("Removed locations of table: " + tableName + ", row: " + Bytes.toString(row)
+            + " mapping to server: " + serverName + " from cache");
         }
       }
     }
@@ -317,12 +367,17 @@
     RegionLocations regionLocations = tableLocations.get(hri.getStartKey());
     if (regionLocations != null) {
       HRegionLocation oldLocation = regionLocations.getRegionLocation(hri.getReplicaId());
+      if (oldLocation == null) return;
       RegionLocations updatedLocations = regionLocations.remove(oldLocation);
+      boolean removed = false;
       if (updatedLocations != regionLocations) {
         if (updatedLocations.isEmpty()) {
-          tableLocations.remove(hri.getStartKey(), regionLocations);
+          removed = tableLocations.remove(hri.getStartKey(), regionLocations);
         } else {
-          tableLocations.replace(hri.getStartKey(), regionLocations, updatedLocations);
+          removed = tableLocations.replace(hri.getStartKey(), regionLocations, updatedLocations);
+        }
+        if (removed && LOG.isDebugEnabled()) {
+          LOG.debug("Removed " + oldLocation + " from cache");
         }
       }
     }
@@ -332,22 +387,22 @@
     if (location == null) {
       return;
     }
-
     TableName tableName = location.getRegionInfo().getTable();
     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
-    RegionLocations rll = tableLocations.get(location.getRegionInfo().getStartKey());
-    if (rll == null) {
-      return;
-    }
-    RegionLocations updatedLocations = rll.remove(location);
-    if (updatedLocations.isEmpty()) {
-      tableLocations.remove(location.getRegionInfo().getStartKey(), rll);
-    }
-    if (LOG.isDebugEnabled() && (rll == updatedLocations)) {
-      LOG.debug("Removed " +
-          location.getRegionInfo().getRegionNameAsString() +
-          " for tableName=" + tableName +
-          " from cache");
+    RegionLocations regionLocations = tableLocations.get(location.getRegionInfo().getStartKey());
+    if (regionLocations != null) {
+      RegionLocations updatedLocations = regionLocations.remove(location);
+      boolean removed = false;
+      if (updatedLocations != regionLocations) {
+        if (updatedLocations.isEmpty()) {
+          removed = tableLocations.remove(location.getRegionInfo().getStartKey(), regionLocations);
+        } else {
+          removed = tableLocations.replace(location.getRegionInfo().getStartKey(), regionLocations, updatedLocations);
+        }
+        if (removed && LOG.isDebugEnabled()) {
+          LOG.debug("Removed " + location + " from cache");
+        }
+      }
     }
   }
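The MetaCache update paths above all share one concurrency pattern, which the added removed/replaced booleans now surface for logging: values are immutable, so a mutation builds a new value and publishes it with ConcurrentMap.putIfAbsent or the three-argument replace(key, expected, updated), which fails harmlessly if a racing thread got there first. A minimal sketch of that pattern in plain Java (CasCache is hypothetical):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch of the lock-free cache update discipline: compute a new immutable
// value, then publish it with a compare-and-swap style replace.
class CasCache {
  private final ConcurrentMap<String, String> map = new ConcurrentHashMap<String, String>();

  void update(String key, String addition) {
    String old = map.putIfAbsent(key, addition);
    if (old == null) {
      return; // new entry won the race; nothing to merge
    }
    String merged = old + "," + addition;             // values are immutable: build a new one
    boolean replaced = map.replace(key, old, merged); // fails if another thread replaced first
    if (replaced) {
      System.out.println("Changed cached value to: " + merged);
    }
  }
}
```

A failed replace simply means a concurrent writer already installed a fresher value, so dropping this update is safe; that is also why the patch only logs when the remove/replace actually succeeded.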
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 87d74cf..5e97d9cf 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -21,6 +21,17 @@
 package org.apache.hadoop.hbase.client;
 
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -34,17 +45,6 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.util.BoundedCompletionService;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Caller that goes to replica if the primary region does no answer within a configurable
  * timeout. If the timeout is reached, it calls all the secondary replicas, and returns
@@ -104,7 +104,7 @@ public class RpcRetryingCallerWithReadReplicas {
       }
 
       if (reload || location == null) {
-        RegionLocations rl = getRegionLocations(false);
+        RegionLocations rl = getRegionLocations(false, id);
         location = id < rl.size() ? rl.getRegionLocation(id) : null;
       }
 
@@ -169,7 +169,7 @@ public class RpcRetryingCallerWithReadReplicas {
    */
   public synchronized Result call()
       throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
-    RegionLocations rl = getRegionLocations(true);
+    RegionLocations rl = getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
     BoundedCompletionService<Result> cs = new BoundedCompletionService<Result>(pool, rl.size());
 
     addCallsForReplica(cs, rl, 0, 0); // primary.
@@ -240,19 +240,19 @@ public class RpcRetryingCallerWithReadReplicas {
     }
   }
 
-  private RegionLocations getRegionLocations(boolean useCache)
-      throws RetriesExhaustedException, DoNotRetryIOException {
+  private RegionLocations getRegionLocations(boolean useCache, int replicaId)
+      throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
     RegionLocations rl;
     try {
-      rl = cConnection.locateRegion(tableName, get.getRow(), useCache, true);
+      rl = cConnection.locateRegion(tableName, get.getRow(), useCache, true, replicaId);
+    } catch (DoNotRetryIOException e) {
+      throw e;
+    } catch (RetriesExhaustedException e) {
+      throw e;
+    } catch (InterruptedIOException e) {
+      throw e;
     } catch (IOException e) {
-      if (e instanceof DoNotRetryIOException) {
-        throw (DoNotRetryIOException) e;
-      } else if (e instanceof RetriesExhaustedException) {
-        throw (RetriesExhaustedException) e;
-      } else {
-        throw new RetriesExhaustedException("Can't get the location", e);
-      }
+      throw new RetriesExhaustedException("Can't get the location", e);
     }
     if (rl == null) {
       throw new RetriesExhaustedException("Can't get the locations");
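The getRegionLocations change above replaces an instanceof ladder with dedicated catch clauses: DoNotRetryIOException, RetriesExhaustedException, and InterruptedIOException propagate with their original types, and only the residual IOException is wrapped. A condensed sketch of the idiom (hypothetical lookup() stands in for ClusterConnection.locateRegion):

```java
import java.io.IOException;
import java.io.InterruptedIOException;

class Rethrow {
  static String locate(String row) throws IOException {
    try {
      return lookup(row);
    } catch (InterruptedIOException e) {
      throw e;                                            // preserve the specific type
    } catch (IOException e) {
      throw new IOException("Can't get the location", e); // wrap everything else
    }
  }

  private static String lookup(String row) throws IOException {
    return "location-of-" + row; // placeholder for the real region lookup
  }
}
```

Because catch clauses are matched most-specific first, the rethrowing clauses cost nothing on the common path and remove the unchecked casts the old instanceof chain needed.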
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
index 0b06a4e..f31bce5 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Thread Utility
@@ -160,15 +161,15 @@ public class Threads {
   }
 
   /**
-   * Create a new CachedThreadPool with a bounded number as the maximum 
+   * Create a new CachedThreadPool with a bounded number as the maximum
    * thread size in the pool.
-   * 
+   *
    * @param maxCachedThread the maximum thread could be created in the pool
   * @param timeout the maximum time to wait
   * @param unit the time unit of the timeout argument
   * @param threadFactory the factory to use when creating new threads
-   * @return threadPoolExecutor the cachedThreadPool with a bounded number 
-   * as the maximum thread size in the pool. 
+   * @return threadPoolExecutor the cachedThreadPool with a bounded number
+   * as the maximum thread size in the pool.
   */
  public static ThreadPoolExecutor getBoundedCachedThreadPool(
      int maxCachedThread, long timeout, TimeUnit unit,
@@ -180,8 +181,8 @@ public class Threads {
     boundedCachedThreadPool.allowCoreThreadTimeOut(true);
     return boundedCachedThreadPool;
   }
-  
-  
+
+
   /**
    * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
    * with a common prefix.
@@ -230,6 +231,14 @@
         Thread t = namedFactory.newThread(r);
         if (handler != null) {
           t.setUncaughtExceptionHandler(handler);
+        } else {
+          t.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+              LOG.warn("Thread:" + t + " exited with Exception:"
+                  + StringUtils.stringifyException(e));
+            }
+          });
         }
         if (!t.isDaemon()) {
           t.setDaemon(true);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
index 78441c2..dac472b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
@@ -274,6 +274,12 @@ class CoprocessorHConnection implements ClusterConnection {
   }
 
   @Override
+  public HRegionLocation relocateRegion(TableName tableName, byte[] row, int replicaId)
+      throws IOException {
+    return delegate.relocateRegion(tableName, row, replicaId);
+  }
+
+  @Override
   public HRegionLocation relocateRegion(byte[] tableName, byte[] row) throws IOException {
     return delegate.relocateRegion(tableName, row);
   }
@@ -331,6 +337,12 @@ class CoprocessorHConnection implements ClusterConnection {
   }
 
   @Override
+  public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache,
+      boolean retry, int replicaId) throws IOException {
+    return delegate.locateRegion(tableName, row, useCache, retry, replicaId);
+  }
+
+  @Override
   public List<HRegionLocation> locateRegions(byte[] tableName, boolean useCache, boolean offlined)
       throws IOException {
     return delegate.locateRegions(tableName, useCache, offlined);
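The Threads.java hunk above gives threads a fallback UncaughtExceptionHandler so that exceptions escaping a pooled thread are logged instead of silently swallowed. A self-contained sketch of the same idea in a ThreadFactory (LoggingThreadFactory is hypothetical; the patch installs its handler only when the caller supplied none):

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Without a handler, a Throwable escaping run() kills the thread silently;
// this factory makes every created thread report such failures.
class LoggingThreadFactory implements ThreadFactory {
  private final AtomicInteger count = new AtomicInteger();

  @Override
  public Thread newThread(Runnable r) {
    Thread t = new Thread(r, "worker-" + count.incrementAndGet());
    t.setDaemon(true);
    t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(Thread thread, Throwable e) {
        System.err.println("Thread:" + thread + " exited with Exception:" + e);
      }
    });
    return t;
  }
}
```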