From ad05de172f4df735c56f83b0d590724603b3c2e9 Mon Sep 17 00:00:00 2001 From: Enis Soztutar Date: Tue, 8 Apr 2014 15:48:17 +0000 Subject: [PATCH 29/45] HBASE-10701 Cache invalidation improvements from client side git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1585766 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/hadoop/hbase/HRegionInfo.java | 8 +- .../org/apache/hadoop/hbase/RegionLocations.java | 100 +++++++++++------ .../apache/hadoop/hbase/catalog/MetaReader.java | 2 +- .../hadoop/hbase/client/ClusterConnection.java | 28 +++++ .../hadoop/hbase/client/ConnectionAdapter.java | 12 ++ .../hadoop/hbase/client/ConnectionManager.java | 124 ++++++++++++++++++--- .../org/apache/hadoop/hbase/client/MetaCache.java | 120 ++++++++++++++------ .../client/RpcRetryingCallerWithReadReplicas.java | 104 +++++++++++------ .../apache/hadoop/hbase/TestRegionLocations.java | 52 +++++++-- .../hbase/util/BoundedCompletionService.java | 12 +- .../java/org/apache/hadoop/hbase/util/Threads.java | 32 +++++- .../hbase/client/CoprocessorHConnection.java | 12 ++ .../hbase/client/TestReplicaWithCluster.java | 4 +- .../hadoop/hbase/util/MultiThreadedReader.java | 4 +- .../hadoop/hbase/util/MultiThreadedWriter.java | 2 + .../hadoop/hbase/util/MultiThreadedWriterBase.java | 4 +- 16 files changed, 475 insertions(+), 145 deletions(-) diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java index 59a3248..3e5224b 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java @@ -561,7 +561,9 @@ public class HRegionInfo implements Comparable { break; } } - if(offset == -1) throw new IOException("Invalid regionName format"); + if (offset == -1) { + throw new IOException("Invalid regionName format: " + Bytes.toStringBinary(regionName)); + } byte[] tableName = new byte[offset]; System.arraycopy(regionName, 0, tableName, 0, offset); offset = -1; @@ -590,7 +592,9 @@ public class HRegionInfo implements Comparable { break; } } - if(offset == -1) throw new IOException("Invalid regionName format"); + if (offset == -1) { + throw new IOException("Invalid regionName format: " + Bytes.toStringBinary(regionName)); + } byte [] startKey = HConstants.EMPTY_BYTE_ARRAY; if(offset != tableName.length + 1) { startKey = new byte[offset - tableName.length - 1]; diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java index b5db549..8b77af1 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase; import java.util.Collection; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.util.Bytes; /** @@ -34,27 +33,42 @@ import org.apache.hadoop.hbase.util.Bytes; public class RegionLocations { private final int numNonNullElements; + + // locations array contains the HRL objects for known region replicas indexes by the replicaId. + // elements can be null if the region replica is not known at all. A null value indicates + // that there is a region replica with the index as replicaId, but the location is not known + // in the cache. private final HRegionLocation[] locations; // replicaId -> HRegionLocation. 
/** * Constructs the region location list. The locations array should * contain all the locations for known replicas for the region, and should be - * sorted in replicaId ascending order. + * sorted in replicaId ascending order, although it can contain nulls indicating replicaIds + * that the locations of which are not known. * @param locations an array of HRegionLocations for the same region range */ public RegionLocations(HRegionLocation... locations) { int numNonNullElements = 0; int maxReplicaId = -1; + int maxReplicaIdIndex = -1; + int index = 0; for (HRegionLocation loc : locations) { if (loc != null) { - numNonNullElements++; - if (loc.getRegionInfo().getReplicaId() > maxReplicaId) { + if (loc.getServerName() != null) { + numNonNullElements++; + } + if (loc.getRegionInfo().getReplicaId() >= maxReplicaId) { maxReplicaId = loc.getRegionInfo().getReplicaId(); + maxReplicaIdIndex = index; } } + index++; } this.numNonNullElements = numNonNullElements; + // account for the null elements in the array after maxReplicaIdIndex + maxReplicaId = maxReplicaId + (locations.length - (maxReplicaIdIndex + 1) ); + if (maxReplicaId + 1 == locations.length) { this.locations = locations; } else { @@ -97,10 +111,10 @@ public class RegionLocations { } /** - * Returns a new HRegionLocationList with the locations removed (set to null) + * Returns a new RegionLocations with the locations removed (set to null) * which have the destination server as given. * @param serverName the serverName to remove locations of - * @return an HRegionLocationList object with removed locations or the same object + * @return an RegionLocations object with removed locations or the same object * if nothing is removed */ public RegionLocations removeByServer(ServerName serverName) { @@ -123,36 +137,58 @@ public class RegionLocations { /** * Removes the given location from the list * @param location the location to remove - * @return an HRegionLocationList object with removed locations or the same object + * @return an RegionLocations object with removed locations or the same object * if nothing is removed */ public RegionLocations remove(HRegionLocation location) { - HRegionLocation[] newLocations = null; - for (int i = 0; i < locations.length; i++) { - // check whether something to remove. HRL.compareTo() compares ONLY the - // serverName. We want to compare the HRI's as well. - if (locations[i] != null - && location.getRegionInfo().equals(locations[i].getRegionInfo()) - && location.equals(locations[i])) { - if (newLocations == null) { //first time - newLocations = new HRegionLocation[locations.length]; - System.arraycopy(locations, 0, newLocations, 0, i); - } - newLocations[i] = null; - } else if (newLocations != null) { - newLocations[i] = locations[i]; - } + if (location == null) return this; + if (location.getRegionInfo() == null) return this; + int replicaId = location.getRegionInfo().getReplicaId(); + if (replicaId >= locations.length) return this; + + // check whether something to remove. HRL.compareTo() compares ONLY the + // serverName. We want to compare the HRI's as well. + if (locations[replicaId] == null + || !location.getRegionInfo().equals(locations[replicaId].getRegionInfo()) + || !location.equals(locations[replicaId])) { + return this; } - return newLocations == null ? 
this : new RegionLocations(newLocations); + + HRegionLocation[] newLocations = new HRegionLocation[locations.length]; + System.arraycopy(locations, 0, newLocations, 0, locations.length); + newLocations[replicaId] = null; + + return new RegionLocations(newLocations); + } + + /** + * Removes location of the given replicaId from the list + * @param replicaId the replicaId of the location to remove + * @return an RegionLocations object with removed locations or the same object + * if nothing is removed + */ + public RegionLocations remove(int replicaId) { + if (getRegionLocation(replicaId) == null) { + return this; + } + + HRegionLocation[] newLocations = new HRegionLocation[locations.length]; + + System.arraycopy(locations, 0, newLocations, 0, locations.length); + if (replicaId < newLocations.length) { + newLocations[replicaId] = null; + } + + return new RegionLocations(newLocations); } /** - * Merges this HRegionLocation list with the given list assuming + * Merges this RegionLocations list with the given list assuming * same range, and keeping the most up to date version of the * HRegionLocation entries from either list according to seqNum. If seqNums * are equal, the location from the argument (other) is taken. * @param other the locations to merge with - * @return an HRegionLocationList object with merged locations or the same object + * @return an RegionLocations object with merged locations or the same object * if nothing is merged */ public RegionLocations mergeLocations(RegionLocations other) { @@ -160,7 +196,9 @@ public class RegionLocations { HRegionLocation[] newLocations = null; - int max = Math.max(this.locations.length, other.locations.length); + // Use the length from other, since it is coming from meta. Otherwise, + // in case of region replication going down, we might have a leak here. + int max = other.locations.length; for (int i = 0; i < max; i++) { HRegionLocation thisLoc = this.getRegionLocation(i); @@ -207,7 +245,7 @@ public class RegionLocations { * @param checkForEquals whether to update the location if seqNums for the * HRegionLocations for the old and new location are the same * @param force whether to force update - * @return an HRegionLocationList object with updated locations or the same object + * @return an RegionLocations object with updated locations or the same object * if nothing is updated */ public RegionLocations updateLocation(HRegionLocation location, @@ -282,12 +320,10 @@ public class RegionLocations { public String toString() { StringBuilder builder = new StringBuilder("["); for (HRegionLocation loc : locations) { - if (loc != null) { - if (builder.length() > 1) { - builder.append(", "); - } - builder.append(loc); + if (builder.length() > 1) { + builder.append(", "); } + builder.append(loc == null ? "null" : loc); } builder.append("]"); return builder.toString(); diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java index e510857..ca5ae39 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java @@ -238,7 +238,7 @@ public class MetaReader { parsedInfo = parseRegionInfoFromRegionName(regionName); row = getMetaKeyForRegion(parsedInfo); } catch (Exception parseEx) { - LOG.warn("Received parse exception:" + parseEx); + // Ignore. This is used with tableName passed as regionName. 
} Get get = new Get(row); get.addFamily(HConstants.CATALOG_FAMILY); diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index fb63473..ef9a120 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -107,6 +107,19 @@ interface ClusterConnection extends HConnection { final byte [] row) throws IOException; /** + * Find the location of the region of tableName that row + * lives in, ignoring any value that might be in the cache. + * @param tableName name of the table row is in + * @param row row key you're trying to find the region of + * @param replicaId the replicaId of the region + * @return HRegionLocation that describes where to find the region in + * question + * @throws IOException if a remote or network exception occurs + */ + HRegionLocation relocateRegion(final TableName tableName, + final byte [] row, int replicaId) throws IOException; + + /** * Update the location cache. This is used internally by HBase, in most cases it should not be * used by the client application. * @param tableName the table name @@ -165,6 +178,20 @@ interface ClusterConnection extends HConnection { */ RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, boolean retry) throws IOException; + + /** + * + * @param tableName table to get regions of + * @param row the row + * @param useCache Should we use the cache to retrieve the region information. + * @param retry do we retry + * @param replicaId the replicaId for the region + * @return region locations for this row. + * @throws IOException + */ + RegionLocations locateRegion(TableName tableName, + byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException; + /** * Returns a {@link MasterKeepAliveConnection} to the active master */ @@ -247,4 +274,5 @@ interface ClusterConnection extends HConnection { * @return All locations for a particular region. 
*/ RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException; + } \ No newline at end of file diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java index bea5fa8..30555ff 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java @@ -294,6 +294,18 @@ class ConnectionAdapter implements ClusterConnection { } @Override + public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, + boolean retry, int replicaId) throws IOException { + return wrappedConnection.locateRegion(tableName, row, useCache, retry, replicaId); + } + + @Override + public HRegionLocation relocateRegion(TableName tableName, byte[] row, int replicaId) + throws IOException { + return wrappedConnection.relocateRegion(tableName, row, replicaId); + } + + @Override public MasterService.BlockingInterface getMaster() throws IOException { return wrappedConnection.getMaster(); } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 98f9d65..2d97ebf 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -75,7 +75,89 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.*; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest; 
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; +import 
org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; @@ -968,6 +1050,12 @@ class ConnectionManager { @Override public HRegionLocation relocateRegion(final TableName tableName, final byte [] row) throws IOException{ + return relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID); + } + + @Override + public HRegionLocation relocateRegion(final TableName tableName, + final byte [] row, int replicaId) throws IOException{ // Since this is an explicit request not to use any caching, finding // disabled tables should not be desirable. This will ensure that an exception is thrown when // the first time a disabled table is interacted with. @@ -975,8 +1063,8 @@ class ConnectionManager { throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled."); } - RegionLocations locations = locateRegion(tableName, row, false, true); - return locations == null ? null : locations.getRegionLocation(); + RegionLocations locations = locateRegion(tableName, row, false, true, replicaId); + return locations == null ? 
null : locations.getRegionLocation(replicaId); } @Override @@ -985,11 +1073,17 @@ class ConnectionManager { return relocateRegion(TableName.valueOf(tableName), row); } - @Override public RegionLocations locateRegion(final TableName tableName, final byte [] row, boolean useCache, boolean retry) throws IOException { + return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID); + } + + @Override + public RegionLocations locateRegion(final TableName tableName, + final byte [] row, boolean useCache, boolean retry, int replicaId) + throws IOException { if (this.closed) throw new IOException(toString() + " closed"); if (tableName== null || tableName.getName().length == 0) { throw new IllegalArgumentException( @@ -1000,7 +1094,7 @@ class ConnectionManager { return this.registry.getMetaRegionLocation(); } else { // Region not in the cache - have to go to the meta RS - return locateRegionInMeta(tableName, row, useCache, retry); + return locateRegionInMeta(tableName, row, useCache, retry, replicaId); } } @@ -1009,13 +1103,13 @@ class ConnectionManager { * info that contains the table and row we're seeking. */ private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, - boolean useCache, boolean retry) throws IOException { + boolean useCache, boolean retry, int replicaId) throws IOException { // If we are supposed to be using the cache, look in the cache to see if // we already have the region. if (useCache) { RegionLocations locations = getCachedLocation(tableName, row); - if (locations != null) { + if (locations != null && locations.getRegionLocation(replicaId) != null) { return locations; } } @@ -1042,9 +1136,13 @@ class ConnectionManager { } if (useCache) { RegionLocations locations = getCachedLocation(tableName, row); - if (locations != null) { + if (locations != null && locations.getRegionLocation(replicaId) != null) { return locations; } + } else { + // If we are not supposed to be using the cache, delete any existing cached location + // so it won't interfere. + metaCache.clearCache(tableName, row); } // Query the meta region @@ -1066,11 +1164,11 @@ class ConnectionManager { // convert the row result into the HRegionLocation we need! 
RegionLocations locations = MetaReader.getRegionLocations(regionInfoRow); - if (locations == null || locations.getRegionLocation() == null) { + if (locations == null || locations.getRegionLocation(replicaId) == null) { throw new IOException("HRegionInfo was null in " + tableName + ", row=" + regionInfoRow); } - HRegionInfo regionInfo = locations.getRegionLocation().getRegionInfo(); + HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo(); if (regionInfo == null) { throw new IOException("HRegionInfo was null or empty in " + TableName.META_TABLE_NAME + ", row=" + regionInfoRow); @@ -1094,7 +1192,7 @@ class ConnectionManager { regionInfo.getRegionNameAsString()); } - ServerName serverName = locations.getRegionLocation().getServerName(); + ServerName serverName = locations.getRegionLocation(replicaId).getServerName(); if (serverName == null) { throw new NoServerForRegionException("No server address listed " + "in " + TableName.META_TABLE_NAME + " for region " + @@ -1107,11 +1205,9 @@ class ConnectionManager { regionInfo.getRegionNameAsString()+" is managed by the server " + serverName + ", but it is dead."); } - // Instantiate the location cacheLocation(tableName, locations); return locations; - } catch (TableNotFoundException e) { // if we got this error, probably means the table just plain doesn't // exist. rethrow the error immediately. this should always be coming @@ -1137,7 +1233,7 @@ class ConnectionManager { // Only relocate the parent region if necessary if(!(e instanceof RegionOfflineException || e instanceof NoServerForRegionException)) { - relocateRegion(TableName.META_TABLE_NAME, metaKey); + relocateRegion(TableName.META_TABLE_NAME, metaKey, replicaId); } } try{ diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java index 10a48ae..a7314a0 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java @@ -107,6 +107,9 @@ public class MetaCache { RegionLocations oldLocations = tableLocations.putIfAbsent(startKey, locations); boolean isNewCacheEntry = (oldLocations == null); if (isNewCacheEntry) { + if (LOG.isTraceEnabled()) { + LOG.trace("Cached location: " + location); + } addToCachedServers(locations); return; } @@ -124,7 +127,10 @@ public class MetaCache { // an additional counter on top of seqNum would be necessary to handle them all. RegionLocations updatedLocations = oldLocations.updateLocation(location, false, force); if (oldLocations != updatedLocations) { - tableLocations.replace(startKey, oldLocations, updatedLocations); + boolean replaced = tableLocations.replace(startKey, oldLocations, updatedLocations); + if (replaced && LOG.isTraceEnabled()) { + LOG.trace("Changed cached location to: " + location); + } addToCachedServers(updatedLocations); } } @@ -132,24 +138,30 @@ public class MetaCache { /** * Put a newly discovered HRegionLocation into the cache. * @param tableName The table name. 
-   * @param location the new location
+   * @param locations the new locations
    */
-  public void cacheLocation(final TableName tableName, final RegionLocations location) {
-    byte [] startKey = location.getRegionLocation().getRegionInfo().getStartKey();
+  public void cacheLocation(final TableName tableName, final RegionLocations locations) {
+    byte [] startKey = locations.getRegionLocation().getRegionInfo().getStartKey();
     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
-    RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, location);
+    RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, locations);
     boolean isNewCacheEntry = (oldLocation == null);
     if (isNewCacheEntry) {
-      addToCachedServers(location);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Cached location: " + locations);
+      }
+      addToCachedServers(locations);
       return;
     }
 
     // merge old and new locations and add it to the cache
     // Meta record might be stale - some (probably the same) server has closed the region
     // with later seqNum and told us about the new location.
-    RegionLocations mergedLocation = oldLocation.mergeLocations(location);
-    tableLocations.replace(startKey, oldLocation, mergedLocation);
-    addToCachedServers(location);
+    RegionLocations mergedLocation = oldLocation.mergeLocations(locations);
+    boolean replaced = tableLocations.replace(startKey, oldLocation, mergedLocation);
+    if (replaced && LOG.isTraceEnabled()) {
+      LOG.trace("Merged cached locations: " + mergedLocation);
+    }
+    addToCachedServers(locations);
   }
 
   private void addToCachedServers(RegionLocations locations) {
@@ -238,12 +250,11 @@
         RegionLocations regionLocations = e.getValue();
         if (regionLocations != null) {
           RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
-          deletedSomething |= regionLocations == updatedLocations;
           if (updatedLocations != regionLocations) {
             if (updatedLocations.isEmpty()) {
-              tableLocations.remove(e.getKey(), regionLocations);
+              deletedSomething |= tableLocations.remove(e.getKey(), regionLocations);
             } else {
-              tableLocations.replace(e.getKey(), regionLocations, updatedLocations);
+              deletedSomething |= tableLocations.replace(e.getKey(), regionLocations, updatedLocations);
             }
           }
         }
@@ -251,8 +262,8 @@
       }
       this.cachedServers.remove(serverName);
     }
-    if (deletedSomething && LOG.isDebugEnabled()) {
-      LOG.debug("Removed all cached region locations that map to " + serverName);
+    if (deletedSomething && LOG.isTraceEnabled()) {
+      LOG.trace("Removed all cached region locations that map to " + serverName);
     }
   }
 
@@ -260,6 +271,9 @@
    * Delete all cached entries of a table.
    */
   public void clearCache(final TableName tableName) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Removed all cached region locations for table " + tableName);
+    }
     this.cachedRegionLocations.remove(tableName);
   }
 
@@ -268,6 +282,34 @@
    * @param tableName tableName
    * @param row
    */
+  public void clearCache(final TableName tableName, final byte [] row, int replicaId) {
+    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
+
+    boolean removed = false;
+    RegionLocations regionLocations = getCachedLocation(tableName, row);
+    if (regionLocations != null) {
+      HRegionLocation toBeRemoved = regionLocations.getRegionLocation(replicaId);
+      RegionLocations updatedLocations = regionLocations.remove(replicaId);
+      if (updatedLocations != regionLocations) {
+        byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
+        if (updatedLocations.isEmpty()) {
+          removed = tableLocations.remove(startKey, regionLocations);
+        } else {
+          removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
+        }
+      }
+
+      if (removed && LOG.isTraceEnabled() && toBeRemoved != null) {
+        LOG.trace("Removed " + toBeRemoved + " from cache");
+      }
+    }
+  }
+
+  /**
+   * Delete a cached location, no matter what it is. Called when we were told to not use cache.
+   * @param tableName tableName
+   * @param row
+   */
   public void clearCache(final TableName tableName, final byte [] row) {
     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
 
@@ -275,8 +317,8 @@
     if (regionLocations != null) {
       byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
       boolean removed = tableLocations.remove(startKey, regionLocations);
-      if (removed && LOG.isDebugEnabled()) {
-        LOG.debug("Removed " + regionLocations + " from cache");
+      if (removed && LOG.isTraceEnabled()) {
+        LOG.trace("Removed " + regionLocations + " from cache");
       }
     }
   }
@@ -292,10 +334,15 @@
       RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
       if (updatedLocations != regionLocations) {
         byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
+        boolean removed = false;
         if (updatedLocations.isEmpty()) {
-          tableLocations.remove(startKey, regionLocations);
+          removed = tableLocations.remove(startKey, regionLocations);
         } else {
-          tableLocations.replace(startKey, regionLocations, updatedLocations);
+          removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
+        }
+        if (removed && LOG.isTraceEnabled()) {
+          LOG.trace("Removed locations of table: " + tableName + " ,row: " + Bytes.toString(row)
+            + " mapping to server: " + serverName + " from cache");
         }
       }
     }
@@ -310,12 +357,17 @@
     RegionLocations regionLocations = tableLocations.get(hri.getStartKey());
     if (regionLocations != null) {
       HRegionLocation oldLocation = regionLocations.getRegionLocation(hri.getReplicaId());
+      if (oldLocation == null) return;
       RegionLocations updatedLocations = regionLocations.remove(oldLocation);
+      boolean removed = false;
       if (updatedLocations != regionLocations) {
         if (updatedLocations.isEmpty()) {
-          tableLocations.remove(hri.getStartKey(), regionLocations);
+          removed = tableLocations.remove(hri.getStartKey(), regionLocations);
         } else {
-          tableLocations.replace(hri.getStartKey(), regionLocations, updatedLocations);
+          removed = tableLocations.replace(hri.getStartKey(), regionLocations, updatedLocations);
+        }
+        if (removed && LOG.isTraceEnabled()) {
+          LOG.trace("Removed " + oldLocation + " from cache");
         }
       }
     }
@@ -325,22 +377,22 @@
     if (location == null) {
       return;
     }
-
     TableName tableName = location.getRegionInfo().getTable();
     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
-    RegionLocations rll = tableLocations.get(location.getRegionInfo().getStartKey());
-    if (rll == null) {
-      return;
-    }
-    RegionLocations updatedLocations = rll.remove(location);
-    if (updatedLocations.isEmpty()) {
-      tableLocations.remove(location.getRegionInfo().getStartKey(), rll);
-    }
-    if (LOG.isDebugEnabled() && (rll == updatedLocations)) {
-      LOG.debug("Removed " +
-          location.getRegionInfo().getRegionNameAsString() +
-          " for tableName=" + tableName +
-          " from cache");
+    RegionLocations regionLocations = tableLocations.get(location.getRegionInfo().getStartKey());
+    if (regionLocations != null) {
+      RegionLocations updatedLocations = regionLocations.remove(location);
+      boolean removed = false;
+      if (updatedLocations != regionLocations) {
+        if (updatedLocations.isEmpty()) {
+          removed = tableLocations.remove(location.getRegionInfo().getStartKey(), regionLocations);
+        } else {
+          removed = tableLocations.replace(location.getRegionInfo().getStartKey(), regionLocations, updatedLocations);
+        }
+        if (removed && LOG.isTraceEnabled()) {
+          LOG.trace("Removed " + location + " from cache");
+        }
+      }
     }
   }
 }
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 6afbe01..ba2417d 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -21,6 +21,18 @@
 package org.apache.hadoop.hbase.client;
 
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,17 +52,6 @@
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 import com.google.protobuf.ServiceException;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Caller that goes to replica if the primary region does no answer within a configurable
  * timeout. If the timeout is reached, it calls all the secondary replicas, and returns
@@ -113,11 +114,11 @@
       }
 
       if (reload || location == null) {
-        RegionLocations rl = getRegionLocations(false);
+        RegionLocations rl = getRegionLocations(false, id);
         location = id < rl.size() ? rl.getRegionLocation(id) : null;
       }
 
-      if (location == null) {
+      if (location == null || location.getServerName() == null) {
        // With this exception, there will be a retry. The location can be null for a replica
        //  when the table is created or after a split.
        throw new HBaseIOException("There is no location for replica id #" + id);
@@ -188,30 +189,61 @@
    */
   public synchronized Result call()
       throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
-    RegionLocations rl = getRegionLocations(true);
+    RegionLocations rl = getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
     BoundedCompletionService<Result> cs = new BoundedCompletionService<Result>(pool, rl.size());
 
-    addCallsForReplica(cs, rl, 0, 0); // primary.
-
+    List<ExecutionException> exceptions = null;
+    int submitted = 0, completed = 0;
+    // submit call for the primary replica.
+    submitted += addCallsForReplica(cs, rl, 0, 0);
     try {
+      // wait for the timeout to see whether the primary responds back
       Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
-      if (f == null) {
-        addCallsForReplica(cs, rl, 1, rl.size() - 1); // secondaries
-        f = cs.take();
+      if (f != null) {
+        return f.get(); //great we got a response
       }
-      return f.get();
     } catch (ExecutionException e) {
-      throwEnrichedException(e);
-      return null; // unreachable
+      // the primary call failed with RetriesExhaustedException or DoNotRetryIOException
+      // but the secondaries might still succeed. Continue on the replica RPCs.
+      exceptions = new ArrayList<ExecutionException>(rl.size());
+      exceptions.add(e);
+      completed++;
+    } catch (CancellationException e) {
+      throw new InterruptedIOException();
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException();
+    }
+
+    // submit call for all of the secondaries at once
+    // TODO: this may be an overkill for large region replication
+    submitted += addCallsForReplica(cs, rl, 1, rl.size() - 1);
+    try {
+      while (completed < submitted) {
+        try {
+          Future<Result> f = cs.take();
+          return f.get(); // great we got an answer
+        } catch (ExecutionException e) {
+          // if not cancel or interrupt, wait until all RPC's are done
+          // one of the tasks failed. Save the exception for later.
+          if (exceptions == null) exceptions = new ArrayList<ExecutionException>(rl.size());
+          exceptions.add(e);
+          completed++;
+        }
+      }
     } catch (CancellationException e) {
       throw new InterruptedIOException();
     } catch (InterruptedException e) {
       throw new InterruptedIOException();
     } finally {
       // We get there because we were interrupted or because one or more of the
-      // calls succeeded or failed. In all case, we stop all our tasks.
+      // calls succeeded or failed. In all cases, we stop all our tasks.
       cs.cancelAll(true);
     }
+
+    if (exceptions != null && !exceptions.isEmpty()) {
+      throwEnrichedException(exceptions.get(0)); // just rethrow the first exception for now.
+    }
+    return null; // unreachable
   }
 
   /**
@@ -248,8 +280,9 @@
    * @param rl - the region locations
    * @param min - the id of the first replica, inclusive
    * @param max - the id of the last replica, inclusive.
+   * @return the number of submitted calls
    */
-  private void addCallsForReplica(BoundedCompletionService<Result> cs,
+  private int addCallsForReplica(BoundedCompletionService<Result> cs,
       RegionLocations rl, int min, int max) {
     for (int id = min; id <= max; id++) {
       HRegionLocation hrl = rl.getRegionLocation(id);
@@ -257,21 +290,22 @@
       RetryingRPC retryingOnReplica = new RetryingRPC(callOnReplica);
       cs.submit(retryingOnReplica);
     }
+    return max - min + 1;
   }
 
-  private RegionLocations getRegionLocations(boolean useCache)
-      throws RetriesExhaustedException, DoNotRetryIOException {
+  private RegionLocations getRegionLocations(boolean useCache, int replicaId)
+      throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
     RegionLocations rl;
     try {
-      rl = cConnection.locateRegion(tableName, get.getRow(), useCache, true);
+      rl = cConnection.locateRegion(tableName, get.getRow(), useCache, true, replicaId);
+    } catch (DoNotRetryIOException e) {
+      throw e;
+    } catch (RetriesExhaustedException e) {
+      throw e;
+    } catch (InterruptedIOException e) {
+      throw e;
     } catch (IOException e) {
-      if (e instanceof DoNotRetryIOException) {
-        throw (DoNotRetryIOException) e;
-      } else if (e instanceof RetriesExhaustedException) {
-        throw (RetriesExhaustedException) e;
-      } else {
-        throw new RetriesExhaustedException("Can't get the location", e);
-      }
+      throw new RetriesExhaustedException("Can't get the location", e);
     }
     if (rl == null) {
       throw new RetriesExhaustedException("Can't get the locations");
@@ -279,4 +313,4 @@
     return rl;
   }
-}
\ No newline at end of file
+}
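The rewritten call() above encodes a hedged-read control flow: submit the primary call, give it timeBeforeReplicas microseconds, then fan out to all secondary replicas, return the first answer that arrives, keep collecting failures until every submitted call has failed, and cancel whatever is still running. A rough sketch of the same flow against the JDK's ExecutorCompletionService rather than the patch's BoundedCompletionService (the Callable list and the final exception are placeholders):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    final class HedgedReadSketch {
      /** replicas.get(0) plays the primary; the rest are secondaries. */
      static <T> T call(ExecutorService pool, List<Callable<T>> replicas,
          long primaryTimeoutMicros) throws Exception {
        CompletionService<T> cs = new ExecutorCompletionService<T>(pool);
        List<Future<T>> futures = new ArrayList<Future<T>>();
        int submitted = 0;
        int failed = 0;
        futures.add(cs.submit(replicas.get(0))); // primary first
        submitted++;
        try {
          Future<T> f = cs.poll(primaryTimeoutMicros, TimeUnit.MICROSECONDS);
          if (f != null) {
            try {
              return f.get();          // primary answered within the budget
            } catch (ExecutionException e) {
              failed++;                // primary failed fast; fall through to replicas
            }
          }
          for (int i = 1; i < replicas.size(); i++) {
            futures.add(cs.submit(replicas.get(i))); // fan out to secondaries
            submitted++;
          }
          while (failed < submitted) {
            try {
              return cs.take().get();  // first successful response wins
            } catch (ExecutionException e) {
              failed++;                // remember the failure, keep waiting
            }
          }
          throw new RuntimeException("all replicas failed"); // placeholder exception
        } finally {
          for (Future<T> f : futures) {
            f.cancel(true);            // stop the calls that lost the race
          }
        }
      }
    }

The patch keeps its own BoundedCompletionService instead of the JDK class precisely because it retains the submitted futures, which is what makes the cancelAll(true) in the finally block possible; the sketch has to keep its own list to get the same effect.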
diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/TestRegionLocations.java hbase-client/src/test/java/org/apache/hadoop/hbase/TestRegionLocations.java
index 603f8d5..c9257d7 100644
--- hbase-client/src/test/java/org/apache/hadoop/hbase/TestRegionLocations.java
+++ hbase-client/src/test/java/org/apache/hadoop/hbase/TestRegionLocations.java
@@ -46,25 +46,25 @@
     list = hrll((HRegionLocation)null);
     assertTrue(list.isEmpty());
-    assertEquals(0, list.size());
+    assertEquals(1, list.size());
     assertEquals(0, list.numNonNullElements());
 
     HRegionInfo info0 = hri(0);
     list = hrll(hrl(info0, null));
-    assertFalse(list.isEmpty());
+    assertTrue(list.isEmpty());
     assertEquals(1, list.size());
-    assertEquals(1, list.numNonNullElements());
+    assertEquals(0, list.numNonNullElements());
 
     HRegionInfo info9 = hri(9);
     list = hrll(hrl(info9, null));
-    assertFalse(list.isEmpty());
+    assertTrue(list.isEmpty());
     assertEquals(10, list.size());
-    assertEquals(1, list.numNonNullElements());
+    assertEquals(0, list.numNonNullElements());
 
     list = hrll(hrl(info0, null), hrl(info9, null));
-    assertFalse(list.isEmpty());
+    assertTrue(list.isEmpty());
     assertEquals(10, list.size());
-    assertEquals(2, list.numNonNullElements());
+    assertEquals(0, list.numNonNullElements());
   }
 
   private HRegionInfo hri(int replicaId) {
@@ -100,7 +100,7 @@
     list = hrll(hrl(info0, sn0));
     assertTrue(list == list.removeByServer(sn1));
     list = list.removeByServer(sn0);
-    assertTrue(list.isEmpty());
+    assertEquals(0, list.numNonNullElements());
 
     // test remove from multi element list
     list = hrll(hrl(info0, sn0), hrl(info1, sn1), hrl(info2, sn2), hrl(info9, sn2));
@@ -226,7 +226,7 @@
     list1 = list2.mergeLocations(list1);
     assertEquals(sn0, list1.getRegionLocation(0).getServerName());
     assertEquals(sn1, list1.getRegionLocation(1).getServerName());
-    assertEquals(sn2, list1.getRegionLocation(2).getServerName());
+    assertEquals(2, list1.size()); // the size is taken from the argument list to merge
 
     // do the other way merge as well
     list1 = hrll(hrl(info0, sn0), hrl(info1, sn1));
@@ -240,10 +240,9 @@
     list1 = hrll(hrl(info0, sn0), hrl(info1, sn1));
     list2 = hrll(hrl(info0, sn2), hrl(info1, sn2), hrl(info9, sn3));
     list1 = list2.mergeLocations(list1); // list1 should override
-    assertEquals(10, list1.size());
+    assertEquals(2, list1.size());
     assertEquals(sn0, list1.getRegionLocation(0).getServerName());
     assertEquals(sn1, list1.getRegionLocation(1).getServerName());
-    assertEquals(sn3, list1.getRegionLocation(9).getServerName());
 
     // do the other way
     list1 = hrll(hrl(info0, sn0), hrl(info1, sn1));
@@ -272,4 +271,35 @@
     assertEquals(sn2, list1.getRegionLocation(1).getServerName());
     assertEquals(sn3, list1.getRegionLocation(9).getServerName());
   }
+
+  @Test
+  public void testConstructWithNullElements() {
+    // RegionLocations can contain null elements as well. These null elements can
+    // indicate replicas whose location is not known in the cache.
+    RegionLocations list = new RegionLocations((HRegionLocation)null);
+    assertTrue(list.isEmpty());
+    assertEquals(1, list.size());
+    assertEquals(0, list.numNonNullElements());
+
+    list = new RegionLocations(null, hrl(info1, sn0));
+    assertFalse(list.isEmpty());
+    assertEquals(2, list.size());
+    assertEquals(1, list.numNonNullElements());
+
+    list = new RegionLocations(hrl(info0, sn0), null);
+    assertEquals(2, list.size());
+    assertEquals(1, list.numNonNullElements());
+
+    list = new RegionLocations(null, hrl(info2, sn0), null, hrl(info9, sn0));
+    assertEquals(10, list.size());
+    assertEquals(2, list.numNonNullElements());
+
+    list = new RegionLocations(null, hrl(info2, sn0), null, hrl(info9, sn0), null);
+    assertEquals(11, list.size());
+    assertEquals(2, list.numNonNullElements());
+
+    list = new RegionLocations(null, hrl(info2, sn0), null, hrl(info9, sn0), null, null);
+    assertEquals(12, list.size());
+    assertEquals(2, list.numNonNullElements());
+  }
 }
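The merge assertions above pin down the resized mergeLocations() contract: the merged list takes its length from the argument that was just read from meta, so a table whose region replication was lowered stops leaking stale high-replicaId entries, and on equal seqNums the entry from the meta side wins. A compact sketch of that rule, with nullable Longs standing in for seqNum-carrying HRegionLocation entries (a simplification, not the HBase type):

    import java.util.Arrays;

    public class MergeLocationsSketch {
      /** Length comes from fromMeta; a cached slot survives only with a strictly newer seqNum. */
      static Long[] merge(Long[] cached, Long[] fromMeta) {
        Long[] merged = Arrays.copyOf(fromMeta, fromMeta.length);
        for (int i = 0; i < merged.length; i++) {
          Long old = i < cached.length ? cached[i] : null;
          if (merged[i] == null || (old != null && old > merged[i])) {
            merged[i] = old; // keep the more up to date (higher seqNum) entry
          }
        }
        return merged;
      }

      public static void main(String[] args) {
        Long[] cached = { 9L, 7L, 9L };  // the cache still knows three replicas
        Long[] fromMeta = { 6L, 7L };    // meta now reports only two
        // prints [9, 7]: replica 2 is dropped, replica 0 keeps the newer cached
        // entry, and the replica 1 tie goes to the copy from meta
        System.out.println(Arrays.toString(merge(cached, fromMeta)));
      }
    }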
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java
index 514505b..d89d337 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java
@@ -34,11 +34,12 @@
  * A completion service, close to the one available in the JDK 1.7
  * However, this ones keeps the list of the future, and allows to cancel them all.
  * This means as well that it can be used for a small set of tasks only.
+ * Implementation is not Thread safe.
  */
 public class BoundedCompletionService<V> {
   private final Executor executor;
-  private final List<Future<V>> sent; // alls the call we sent
-  private final BlockingQueue<Future<V>> completed; // all the results we got so far.
+  private final List<Future<V>> tasks; // all the tasks
+  private final BlockingQueue<Future<V>> completed; // all the tasks that are completed
 
   class QueueingFuture extends FutureTask<V> {
@@ -46,6 +47,7 @@
       super(callable);
     }
 
+    @Override
     protected void done() {
       completed.add(QueueingFuture.this);
     }
@@ -53,7 +55,7 @@
 
   public BoundedCompletionService(Executor executor, int maxTasks) {
     this.executor = executor;
-    this.sent = new ArrayList<Future<V>>(maxTasks);
+    this.tasks = new ArrayList<Future<V>>(maxTasks);
     this.completed = new ArrayBlockingQueue<Future<V>>(maxTasks);
   }
 
@@ -61,7 +63,7 @@
   public Future<V> submit(Callable<V> task) {
     QueueingFuture newFuture = new QueueingFuture(task);
     executor.execute(newFuture);
-    sent.add(newFuture);
+    tasks.add(newFuture);
     return newFuture;
   }
 
@@ -74,7 +76,7 @@
   }
 
   public void cancelAll(boolean interrupt) {
-    for (Future<V> future : sent) {
+    for (Future<V> future : tasks) {
       future.cancel(interrupt);
     }
   }
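Since BoundedCompletionService is the primitive the hedged read above is built on, a short usage sketch may help. It relies only on the API visible in this diff (the constructor, submit, poll, take, cancelAll); the task bodies and timings are made up for the demo:

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hbase.util.BoundedCompletionService;

    public class BoundedCompletionServiceDemo {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // maxTasks must cover every submit: both the futures list and the
        // completion queue are sized by it
        BoundedCompletionService<String> cs =
            new BoundedCompletionService<String>(pool, 2);
        cs.submit(new Callable<String>() {
          @Override
          public String call() throws Exception {
            Thread.sleep(50);
            return "fast replica";
          }
        });
        cs.submit(new Callable<String>() {
          @Override
          public String call() throws Exception {
            Thread.sleep(5000);
            return "slow replica";
          }
        });
        try {
          Future<String> first = cs.poll(1, TimeUnit.SECONDS); // first completed task, if any
          System.out.println(first == null ? "timed out" : first.get());
        } finally {
          cs.cancelAll(true); // the class is not thread safe, so the one caller cleans up
          pool.shutdown();
        }
      }
    }

This prints "fast replica" while the slow task is interrupted rather than left running, which is exactly the cleanup the finally block in call() depends on.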
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
index 0b06a4e..18747c9 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
@@ -30,6 +30,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Thread Utility
@@ -38,6 +39,16 @@
 public class Threads {
   protected static final Log LOG = LogFactory.getLog(Threads.class);
   private static final AtomicInteger poolNumber = new AtomicInteger(1);
+
+  private static UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER =
+      new UncaughtExceptionHandler() {
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+      LOG.warn("Thread:" + t + " exited with Exception:"
+          + StringUtils.stringifyException(e));
+    }
+  };
+
   /**
    * Utility method that sets name, daemon status and starts passed thread.
    * @param t thread to run
@@ -160,15 +171,15 @@
   }
 
   /**
-   * Create a new CachedThreadPool with a bounded number as the maximum 
+   * Create a new CachedThreadPool with a bounded number as the maximum
    * thread size in the pool.
-   * 
+   *
    * @param maxCachedThread the maximum thread could be created in the pool
    * @param timeout the maximum time to wait
    * @param unit the time unit of the timeout argument
    * @param threadFactory the factory to use when creating new threads
-   * @return threadPoolExecutor the cachedThreadPool with a bounded number 
-   * as the maximum thread size in the pool. 
+   * @return threadPoolExecutor the cachedThreadPool with a bounded number
+   * as the maximum thread size in the pool.
    */
   public static ThreadPoolExecutor getBoundedCachedThreadPool(
       int maxCachedThread, long timeout, TimeUnit unit,
@@ -180,8 +191,8 @@
     boundedCachedThreadPool.allowCoreThreadTimeOut(true);
     return boundedCachedThreadPool;
   }
-  
-  
+
+
   /**
    * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
    * with a common prefix.
@@ -230,6 +241,8 @@
         Thread t = namedFactory.newThread(r);
         if (handler != null) {
           t.setUncaughtExceptionHandler(handler);
+        } else {
+          t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
         }
         if (!t.isDaemon()) {
           t.setDaemon(true);
@@ -242,4 +255,11 @@
       }
     };
   }
+
+  /** Sets an UncaughtExceptionHandler for the thread which logs the
+   * Exception stack if the thread dies.
+   */
+  public static void setLoggingUncaughtExceptionHandler(Thread t) {
+    t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
+  }
 }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
index 646c86d..aef9f4e 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
@@ -244,6 +244,12 @@
   }
 
   @Override
+  public HRegionLocation relocateRegion(TableName tableName, byte[] row, int replicaId)
+      throws IOException {
+    return delegate.relocateRegion(tableName, row, replicaId);
+  }
+
+  @Override
   public HRegionLocation relocateRegion(byte[] tableName, byte[] row) throws IOException {
     return delegate.relocateRegion(tableName, row);
   }
@@ -294,6 +300,12 @@
   }
 
   @Override
+  public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache,
+      boolean retry, int replicaId) throws IOException {
+    return delegate.locateRegion(tableName, row, useCache, retry, replicaId);
+  }
+
+  @Override
   public List<HRegionLocation> locateRegions(byte[] tableName, boolean useCache, boolean offlined)
       throws IOException {
     return delegate.locateRegions(tableName, useCache, offlined);
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index b9fe633..bf7a93b 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -325,7 +325,7 @@
     RegionServerCallable<Void> callable = new RegionServerCallable<Void>(
         conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) {
       @Override
-      public Void call() throws Exception {
+      public Void call(int timeout) throws Exception {
         LOG.debug("Going to connect to server " + getLocation() + " for row "
             + Bytes.toStringBinary(getRow()));
         byte[] regionName = getLocation().getRegionInfo().getRegionName();
@@ -337,7 +337,7 @@
     };
     RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
     RpcRetryingCaller<Void> caller = factory.<Void>
newCaller(); - caller.callWithRetries(callable); + caller.callWithRetries(callable, 10000); // verify we can read them from the primary LOG.debug("Verifying data load"); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java index cc87800..5bafd09 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java @@ -120,7 +120,9 @@ public class MultiThreadedReader extends MultiThreadedAction } protected HBaseReaderThread createReaderThread(int readerId) throws IOException { - return new HBaseReaderThread(readerId); + HBaseReaderThread reader = new HBaseReaderThread(readerId); + Threads.setLoggingUncaughtExceptionHandler(reader); + return reader; } public class HBaseReaderThread extends Thread { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java index bfe3ebd..80e0d52 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java @@ -73,6 +73,7 @@ public class MultiThreadedWriter extends MultiThreadedWriterBase { protected void createWriterThreads(int numThreads) throws IOException { for (int i = 0; i < numThreads; ++i) { HBaseWriterThread writer = new HBaseWriterThread(i); + Threads.setLoggingUncaughtExceptionHandler(writer); writers.add(writer); } } @@ -89,6 +90,7 @@ public class MultiThreadedWriter extends MultiThreadedWriterBase { return new HTable(conf, tableName); } + @Override public void run() { try { long rowKeyBase; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java index 9373e6f..340f5f0 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java @@ -101,8 +101,8 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction { if (cached != null) { result = "cached: " + cached.toString(); } - if (real != null) { - if (real.equals(cached)) { + if (real != null && real.getServerName() != null) { + if (cached != null && cached.getServerName() != null && real.equals(cached)) { result += "; cache is up to date"; } else { result = (cached != null) ? (result + "; ") : ""; -- 2.0.0
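One last illustration: the MultiThreadedReader/Writer hunks above rely on the new Threads.setLoggingUncaughtExceptionHandler(...) so that a test worker thread that dies with an unexpected exception leaves a record in the log, instead of only reaching the JVM's default stderr reporting. A standalone sketch of the same mechanism, with System.err standing in for the commons-logging LOG used by the patch:

    import java.lang.Thread.UncaughtExceptionHandler;

    public class LoggingHandlerSketch {
      // same shape as the LOGGING_EXCEPTION_HANDLER field added to Threads
      static final UncaughtExceptionHandler LOGGING_HANDLER = new UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
          System.err.println("Thread:" + t + " exited with Exception:");
          e.printStackTrace();
        }
      };

      public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(new Runnable() {
          @Override
          public void run() {
            // without a handler this would only hit the default reporting path
            throw new IllegalStateException("boom");
          }
        });
        worker.setUncaughtExceptionHandler(LOGGING_HANDLER); // what setLoggingUncaughtExceptionHandler does
        worker.start();
        worker.join(); // the stack trace is routed through our handler
      }
    }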