Index: hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java (revision 1585756) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java (working copy) @@ -561,7 +561,9 @@ break; } } - if(offset == -1) throw new IOException("Invalid regionName format"); + if (offset == -1) { + throw new IOException("Invalid regionName format: " + Bytes.toStringBinary(regionName)); + } byte[] tableName = new byte[offset]; System.arraycopy(regionName, 0, tableName, 0, offset); offset = -1; @@ -590,7 +592,9 @@ break; } } - if(offset == -1) throw new IOException("Invalid regionName format"); + if (offset == -1) { + throw new IOException("Invalid regionName format: " + Bytes.toStringBinary(regionName)); + } byte [] startKey = HConstants.EMPTY_BYTE_ARRAY; if(offset != tableName.length + 1) { startKey = new byte[offset - tableName.length - 1]; Index: hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java (revision 1585756) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java (working copy) @@ -21,7 +21,6 @@ import java.util.Collection; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.util.Bytes; /** @@ -34,27 +33,42 @@ public class RegionLocations { private final int numNonNullElements; + + // locations array contains the HRL objects for known region replicas indexed by the replicaId. + // elements can be null if the region replica is not known at all. A null value indicates + // that there is a region replica with the index as replicaId, but the location is not known + // in the cache. private final HRegionLocation[] locations; // replicaId -> HRegionLocation. /** * Constructs the region location list. The locations array should * contain all the locations for known replicas for the region, and should be - * sorted in replicaId ascending order. + * sorted in replicaId ascending order, although it can contain nulls for replicaIds + * whose locations are not known. * @param locations an array of HRegionLocations for the same region range */ public RegionLocations(HRegionLocation... locations) { int numNonNullElements = 0; int maxReplicaId = -1; + int maxReplicaIdIndex = -1; + int index = 0; for (HRegionLocation loc : locations) { if (loc != null) { - numNonNullElements++; - if (loc.getRegionInfo().getReplicaId() > maxReplicaId) { + if (loc.getServerName() != null) { + numNonNullElements++; + } + if (loc.getRegionInfo().getReplicaId() >= maxReplicaId) { maxReplicaId = loc.getRegionInfo().getReplicaId(); + maxReplicaIdIndex = index; } } + index++; } this.numNonNullElements = numNonNullElements; + // account for the null elements in the array after maxReplicaIdIndex + maxReplicaId = maxReplicaId + (locations.length - (maxReplicaIdIndex + 1) ); + if (maxReplicaId + 1 == locations.length) { this.locations = locations; } else { @@ -97,10 +111,10 @@ } /** - * Returns a new HRegionLocationList with the locations removed (set to null) + * Returns a new RegionLocations with the locations removed (set to null) * which have the destination server as given. 
* @param serverName the serverName to remove locations of - * @return an HRegionLocationList object with removed locations or the same object + * @return a RegionLocations object with removed locations or the same object * if nothing is removed */ public RegionLocations removeByServer(ServerName serverName) { @@ -123,36 +137,58 @@ /** * Removes the given location from the list * @param location the location to remove - * @return an HRegionLocationList object with removed locations or the same object + * @return a RegionLocations object with removed locations or the same object * if nothing is removed */ public RegionLocations remove(HRegionLocation location) { - HRegionLocation[] newLocations = null; - for (int i = 0; i < locations.length; i++) { - // check whether something to remove. HRL.compareTo() compares ONLY the - // serverName. We want to compare the HRI's as well. - if (locations[i] != null - && location.getRegionInfo().equals(locations[i].getRegionInfo()) - && location.equals(locations[i])) { - if (newLocations == null) { //first time - newLocations = new HRegionLocation[locations.length]; - System.arraycopy(locations, 0, newLocations, 0, i); - } - newLocations[i] = null; - } else if (newLocations != null) { - newLocations[i] = locations[i]; - } + if (location == null) return this; + if (location.getRegionInfo() == null) return this; + int replicaId = location.getRegionInfo().getReplicaId(); + if (replicaId >= locations.length) return this; + + // check whether something to remove. HRL.compareTo() compares ONLY the + // serverName. We want to compare the HRI's as well. + if (locations[replicaId] == null + || !location.getRegionInfo().equals(locations[replicaId].getRegionInfo()) + || !location.equals(locations[replicaId])) { + return this; } - return newLocations == null ? this : new RegionLocations(newLocations); + + HRegionLocation[] newLocations = new HRegionLocation[locations.length]; + System.arraycopy(locations, 0, newLocations, 0, locations.length); + newLocations[replicaId] = null; + + return new RegionLocations(newLocations); } /** - * Merges this HRegionLocation list with the given list assuming + * Removes the location of the given replicaId from the list + * @param replicaId the replicaId of the location to remove + * @return a RegionLocations object with removed locations or the same object + * if nothing is removed + */ + public RegionLocations remove(int replicaId) { + if (getRegionLocation(replicaId) == null) { + return this; + } + + HRegionLocation[] newLocations = new HRegionLocation[locations.length]; + + System.arraycopy(locations, 0, newLocations, 0, locations.length); + if (replicaId < newLocations.length) { + newLocations[replicaId] = null; + } + + return new RegionLocations(newLocations); + } + + /** + * Merges this RegionLocations list with the given list assuming + * same range, and keeping the most up to date version of the * HRegionLocation entries from either list according to seqNum. If seqNums * are equal, the location from the argument (other) is taken. * @param other the locations to merge with - * @return an HRegionLocationList object with merged locations or the same object + * @return a RegionLocations object with merged locations or the same object * if nothing is merged */ public RegionLocations mergeLocations(RegionLocations other) { @@ -160,7 +196,9 @@ HRegionLocation[] newLocations = null; - int max = Math.max(this.locations.length, other.locations.length); + // Use the length from other, since it is coming from meta. 
Otherwise, + // in case of region replication going down, we might have a leak here. + int max = other.locations.length; for (int i = 0; i < max; i++) { HRegionLocation thisLoc = this.getRegionLocation(i); @@ -207,7 +245,7 @@ * @param checkForEquals whether to update the location if seqNums for the * HRegionLocations for the old and new location are the same * @param force whether to force update - * @return an HRegionLocationList object with updated locations or the same object + * @return an RegionLocations object with updated locations or the same object * if nothing is updated */ public RegionLocations updateLocation(HRegionLocation location, @@ -282,12 +320,10 @@ public String toString() { StringBuilder builder = new StringBuilder("["); for (HRegionLocation loc : locations) { - if (loc != null) { - if (builder.length() > 1) { - builder.append(", "); - } - builder.append(loc); + if (builder.length() > 1) { + builder.append(", "); } + builder.append(loc == null ? "null" : loc); } builder.append("]"); return builder.toString(); Index: hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java (revision 1585756) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java (working copy) @@ -237,7 +237,7 @@ parsedInfo = parseRegionInfoFromRegionName(regionName); row = getMetaKeyForRegion(parsedInfo); } catch (Exception parseEx) { - LOG.warn("Received parse exception:" + parseEx); + // Ignore. This is used with tableName passed as regionName. } Get get = new Get(row); get.addFamily(HConstants.CATALOG_FAMILY); Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java (revision 1585756) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java (working copy) @@ -38,6 +38,7 @@ interface ClusterConnection extends HConnection { /** @return - true if the master server is running */ + @Override boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException; @@ -53,9 +54,10 @@ * @throws IOException * if a remote or network exception occurs */ + @Override boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws IOException; - + /** * Find the location of the region of tableName that row * lives in. @@ -65,6 +67,7 @@ * question * @throws IOException if a remote or network exception occurs */ + @Override public HRegionLocation locateRegion(final TableName tableName, final byte [] row) throws IOException; @@ -71,6 +74,7 @@ /** * Allows flushing the region cache. */ + @Override void clearRegionCache(); /** @@ -79,6 +83,7 @@ * @param tableName Name of the table whose regions we are to remove from * cache. */ + @Override void clearRegionCache(final TableName tableName); /** @@ -85,6 +90,7 @@ * Deletes cached locations for the specific region. * @param location The location object for the region, to be purged from cache. 
*/ + @Override void deleteCachedRegionLocation(final HRegionLocation location); /** @@ -96,10 +102,24 @@ * question * @throws IOException if a remote or network exception occurs */ + @Override HRegionLocation relocateRegion(final TableName tableName, final byte [] row) throws IOException; /** + * Find the location of the region of tableName that row + * lives in, ignoring any value that might be in the cache. + * @param tableName name of the table row is in + * @param row row key you're trying to find the region of + * @param replicaId the replicaId of the region + * @return HRegionLocation that describes where to find the region in + * question + * @throws IOException if a remote or network exception occurs + */ + HRegionLocation relocateRegion(final TableName tableName, + final byte [] row, int replicaId) throws IOException; + + /** * Update the location cache. This is used internally by HBase, in most cases it should not be * used by the client application. * @param tableName the table name @@ -119,6 +139,7 @@ * question * @throws IOException if a remote or network exception occurs */ + @Override HRegionLocation locateRegion(final byte[] regionName) throws IOException; @@ -128,6 +149,7 @@ * @return list of region locations for all regions of table * @throws IOException */ + @Override List locateRegions(final TableName tableName) throws IOException; /** @@ -139,6 +161,7 @@ * @return list of region locations for all regions of table * @throws IOException */ + @Override List locateRegions(final TableName tableName, final boolean useCache, final boolean offlined) throws IOException; @@ -154,9 +177,24 @@ */ RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, boolean retry) throws IOException; + /** + * + * @param tableName table to get regions of + * @param row the row + * @param useCache Should we use the cache to retrieve the region information. + * @param retry do we retry + * @param replicaId the replicaId for the region + * @return region locations for this row. + * @throws IOException + */ + RegionLocations locateRegion(TableName tableName, + byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException; + + /** * Returns a {@link MasterKeepAliveConnection} to the active master */ + @Override MasterService.BlockingInterface getMaster() throws IOException; @@ -166,6 +204,7 @@ * @return proxy for HRegionServer * @throws IOException if a remote or network exception occurs */ + @Override AdminService.BlockingInterface getAdmin(final ServerName serverName) throws IOException; /** @@ -177,6 +216,7 @@ * @throws IOException if a remote or network exception occurs * */ + @Override ClientService.BlockingInterface getClient(final ServerName serverName) throws IOException; /** @@ -187,6 +227,7 @@ * @return Location of row. * @throws IOException if a remote or network exception occurs */ + @Override HRegionLocation getRegionLocation(TableName tableName, byte [] row, boolean reload) throws IOException; @@ -195,6 +236,7 @@ * Clear any caches that pertain to server name sn. * @param sn A server name */ + @Override void clearCaches(final ServerName sn); /** @@ -203,6 +245,7 @@ * @return The shared instance. Never returns null. * @throws MasterNotRunningException */ + @Override @Deprecated MasterKeepAliveConnection getKeepAliveMasterService() throws MasterNotRunningException; @@ -211,6 +254,7 @@ * @param serverName * @return true if the server is known as dead, false otherwise. 
* @deprecated internal method, do not use thru HConnection */ + @Override @Deprecated boolean isDeadServer(ServerName serverName); @@ -217,6 +261,7 @@ /** * @return Nonce generator for this HConnection; may be null if disabled in configuration. */ + @Override public NonceGenerator getNonceGenerator(); /** @@ -228,4 +273,5 @@ * @return All locations for a particular region. */ RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException; + } \ No newline at end of file Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java (revision 1585756) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java (working copy) @@ -35,9 +35,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -48,7 +45,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -79,7 +75,87 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.*; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse; +import 
org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; +import 
org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; @@ -354,6 +430,7 @@ * @param conf configuration whose identity is used to find {@link HConnection} instance. * @deprecated */ + @Deprecated public static void deleteConnection(Configuration conf) { deleteConnection(new HConnectionKey(conf), false); } @@ -365,6 +442,7 @@ * @param connection * @deprecated */ + @Deprecated public static void deleteStaleConnection(HConnection connection) { deleteConnection(connection, true); } @@ -375,6 +453,7 @@ * staleConnection to true. * @deprecated */ + @Deprecated public static void deleteAllConnections(boolean staleConnection) { synchronized (CONNECTION_INSTANCES) { Set connectionKeys = new HashSet(); @@ -1003,6 +1082,12 @@ @Override public HRegionLocation relocateRegion(final TableName tableName, final byte [] row) throws IOException{ + return relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID); + } + + @Override + public HRegionLocation relocateRegion(final TableName tableName, + final byte [] row, int replicaId) throws IOException{ // Since this is an explicit request not to use any caching, finding // disabled tables should not be desirable. This will ensure that an exception is thrown when // the first time a disabled table is interacted with. 
@@ -1010,8 +1095,8 @@ throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled."); } - RegionLocations locations = locateRegion(tableName, row, false, true); - return locations == null ? null : locations.getRegionLocation(); + RegionLocations locations = locateRegion(tableName, row, false, true, replicaId); + return locations == null ? null : locations.getRegionLocation(replicaId); } @Override @@ -1020,11 +1105,17 @@ return relocateRegion(TableName.valueOf(tableName), row); } - @Override public RegionLocations locateRegion(final TableName tableName, final byte [] row, boolean useCache, boolean retry) throws IOException { + return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID); + } + + @Override + public RegionLocations locateRegion(final TableName tableName, + final byte [] row, boolean useCache, boolean retry, int replicaId) + throws IOException { if (this.closed) throw new IOException(toString() + " closed"); if (tableName== null || tableName.getName().length == 0) { throw new IllegalArgumentException( @@ -1036,7 +1127,7 @@ } else { // Region not in the cache - have to go to the meta RS return locateRegionInMeta(TableName.META_TABLE_NAME, tableName, row, - useCache, userRegionLock, retry); + useCache, userRegionLock, retry, replicaId); } } @@ -1100,15 +1191,15 @@ */ private RegionLocations locateRegionInMeta(final TableName parentTable, final TableName tableName, final byte [] row, boolean useCache, - Object regionLockObject, boolean retry) + Object regionLockObject, boolean retry, int replicaId) throws IOException { - RegionLocations location; + RegionLocations locations; // If we are supposed to be using the cache, look in the cache to see if // we already have the region. if (useCache) { - location = getCachedLocation(tableName, row); - if (location != null) { - return location; + locations = getCachedLocation(tableName, row); + if (locations != null && locations.getRegionLocation(replicaId) != null) { + return locations; } } int localNumRetries = retry ? numTries : 1; @@ -1127,7 +1218,7 @@ try { // locate the meta region RegionLocations metaLocations = locateRegion(parentTable, metaKey, true, false); - metaLocation = metaLocations == null ? null : metaLocations.getRegionLocation(); + metaLocation = metaLocations == null ? null : metaLocations.getDefaultRegionLocation(); // If null still, go around again. if (metaLocation == null) continue; ClientService.BlockingInterface service = getClient(metaLocation.getServerName()); @@ -1142,9 +1233,9 @@ synchronized (regionLockObject) { // Check the cache again for a hit in case some other thread made the // same query while we were waiting on the lock. - location = getCachedLocation(tableName, row); - if (location != null) { - return location; + locations = getCachedLocation(tableName, row); + if (locations != null && locations.getRegionLocation(replicaId) != null) { + return locations; } // If the parent table is META, we may want to pre-fetch some // region info into the global region cache for this table. @@ -1151,14 +1242,14 @@ prefetchRegionCache(tableName, row); } } - location = getCachedLocation(tableName, row); - if (location != null) { - return location; + locations = getCachedLocation(tableName, row); + if (locations != null && locations.getRegionLocation(replicaId) != null) { + return locations; } } else { // If we are not supposed to be using the cache, delete any existing cached location // so it won't interfere. 
- metaCache.clearCache(tableName, row); + metaCache.clearCache(tableName, row, replicaId); } // Query the meta region for the location of the meta region @@ -1171,12 +1262,12 @@ } // convert the row result into the HRegionLocation we need! - location = MetaReader.getRegionLocations(regionInfoRow); - if (location == null || location.getRegionLocation() == null) { + locations = MetaReader.getRegionLocations(regionInfoRow); + if (locations == null || locations.getRegionLocation(replicaId) == null) { throw new IOException("HRegionInfo was null in " + parentTable + ", row=" + regionInfoRow); } - HRegionInfo regionInfo = location.getRegionLocation().getRegionInfo(); + HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo(); if (regionInfo == null) { throw new IOException("HRegionInfo was null or empty in " + parentTable + ", row=" + regionInfoRow); @@ -1200,7 +1291,7 @@ regionInfo.getRegionNameAsString()); } - ServerName serverName = location.getRegionLocation().getServerName(); + ServerName serverName = locations.getRegionLocation(replicaId).getServerName(); if (serverName == null) { throw new NoServerForRegionException("No server address listed " + "in " + parentTable + " for region " + @@ -1214,8 +1305,8 @@ ", but it is dead."); } - cacheLocation(tableName, location); - return location; + cacheLocation(tableName, locations); + return locations; } catch (TableNotFoundException e) { // if we got this error, probably means the table just plain doesn't // exist. rethrow the error immediately. this should always be coming @@ -1242,7 +1333,7 @@ // Only relocate the parent region if necessary if(!(e instanceof RegionOfflineException || e instanceof NoServerForRegionException)) { - relocateRegion(parentTable, metaKey); + relocateRegion(parentTable, metaKey, replicaId); } } try{ Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java (revision 1585756) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java (working copy) @@ -114,6 +114,9 @@ RegionLocations oldLocations = tableLocations.putIfAbsent(startKey, locations); boolean isNewCacheEntry = (oldLocations == null); if (isNewCacheEntry) { + if (LOG.isTraceEnabled()) { + LOG.trace("Cached location: " + location); + } addToCachedServers(locations); return; } @@ -131,7 +134,10 @@ // an additional counter on top of seqNum would be necessary to handle them all. RegionLocations updatedLocations = oldLocations.updateLocation(location, false, force); if (oldLocations != updatedLocations) { - tableLocations.replace(startKey, oldLocations, updatedLocations); + boolean replaced = tableLocations.replace(startKey, oldLocations, updatedLocations); + if (replaced && LOG.isTraceEnabled()) { + LOG.trace("Changed cached location to: " + location); + } addToCachedServers(updatedLocations); } } @@ -139,15 +145,18 @@ /** * Put a newly discovered HRegionLocation into the cache. * @param tableName The table name. 
- * @param location the new location + * @param locations the new locations */ - public void cacheLocation(final TableName tableName, final RegionLocations location) { - byte [] startKey = location.getRegionLocation().getRegionInfo().getStartKey(); + public void cacheLocation(final TableName tableName, final RegionLocations locations) { + byte [] startKey = locations.getRegionLocation().getRegionInfo().getStartKey(); ConcurrentMap tableLocations = getTableLocations(tableName); - RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, location); + RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, locations); boolean isNewCacheEntry = (oldLocation == null); if (isNewCacheEntry) { - addToCachedServers(location); + if (LOG.isTraceEnabled()) { + LOG.trace("Cached location: " + locations); + } + addToCachedServers(locations); return; } @@ -154,9 +163,12 @@ // merge old and new locations and add it to the cache // Meta record might be stale - some (probably the same) server has closed the region // with later seqNum and told us about the new location. - RegionLocations mergedLocation = oldLocation.mergeLocations(location); - tableLocations.replace(startKey, oldLocation, mergedLocation); - addToCachedServers(location); + RegionLocations mergedLocation = oldLocation.mergeLocations(locations); + boolean replaced = tableLocations.replace(startKey, oldLocation, mergedLocation); + if (replaced && LOG.isTraceEnabled()) { + LOG.trace("Merged cached locations: " + mergedLocation); + } + addToCachedServers(locations); } private void addToCachedServers(RegionLocations locations) { @@ -245,12 +257,11 @@ RegionLocations regionLocations = e.getValue(); if (regionLocations != null) { RegionLocations updatedLocations = regionLocations.removeByServer(serverName); - deletedSomething |= regionLocations == updatedLocations; if (updatedLocations != regionLocations) { if (updatedLocations.isEmpty()) { - tableLocations.remove(e.getKey(), regionLocations); + deletedSomething |= tableLocations.remove(e.getKey(), regionLocations); } else { - tableLocations.replace(e.getKey(), regionLocations, updatedLocations); + deletedSomething |= tableLocations.replace(e.getKey(), regionLocations, updatedLocations); } } } @@ -258,8 +269,8 @@ } this.cachedServers.remove(serverName); } - if (deletedSomething && LOG.isDebugEnabled()) { - LOG.debug("Removed all cached region locations that map to " + serverName); + if (deletedSomething && LOG.isTraceEnabled()) { + LOG.trace("Removed all cached region locations that map to " + serverName); } } @@ -267,6 +278,9 @@ * Delete all cached entries of a table. 
*/ public void clearCache(final TableName tableName) { + if (LOG.isTraceEnabled()) { + LOG.trace("Removed all cached region locations for table " + tableName); + } this.cachedRegionLocations.remove(tableName); } @@ -275,6 +289,34 @@ * @param tableName tableName * @param row */ + public void clearCache(final TableName tableName, final byte [] row, int replicaId) { + ConcurrentMap tableLocations = getTableLocations(tableName); + + boolean removed = false; + RegionLocations regionLocations = getCachedLocation(tableName, row); + if (regionLocations != null) { + HRegionLocation toBeRemoved = regionLocations.getRegionLocation(replicaId); + RegionLocations updatedLocations = regionLocations.remove(replicaId); + if (updatedLocations != regionLocations) { + byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey(); + if (updatedLocations.isEmpty()) { + removed = tableLocations.remove(startKey, regionLocations); + } else { + removed = tableLocations.replace(startKey, regionLocations, updatedLocations); + } + } + + if (removed && LOG.isTraceEnabled() && toBeRemoved != null) { + LOG.trace("Removed " + toBeRemoved + " from cache"); + } + } + } + + /** + * Delete a cached location, no matter what it is. Called when we were told to not use cache. + * @param tableName tableName + * @param row + */ public void clearCache(final TableName tableName, final byte [] row) { ConcurrentMap tableLocations = getTableLocations(tableName); @@ -282,8 +324,8 @@ if (regionLocations != null) { byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey(); boolean removed = tableLocations.remove(startKey, regionLocations); - if (removed && LOG.isDebugEnabled()) { - LOG.debug("Removed " + regionLocations + " from cache"); + if (removed && LOG.isTraceEnabled()) { + LOG.trace("Removed " + regionLocations + " from cache"); } } } @@ -299,11 +341,16 @@ RegionLocations updatedLocations = regionLocations.removeByServer(serverName); if (updatedLocations != regionLocations) { byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey(); + boolean removed = false; if (updatedLocations.isEmpty()) { - tableLocations.remove(startKey, regionLocations); + removed = tableLocations.remove(startKey, regionLocations); } else { - tableLocations.replace(startKey, regionLocations, updatedLocations); + removed = tableLocations.replace(startKey, regionLocations, updatedLocations); } + if (removed && LOG.isTraceEnabled()) { + LOG.trace("Removed locations of table: " + tableName + " ,row: " + Bytes.toString(row) + + " mapping to server: " + serverName + " from cache"); + } } } } @@ -317,13 +364,18 @@ RegionLocations regionLocations = tableLocations.get(hri.getStartKey()); if (regionLocations != null) { HRegionLocation oldLocation = regionLocations.getRegionLocation(hri.getReplicaId()); + if (oldLocation == null) return; RegionLocations updatedLocations = regionLocations.remove(oldLocation); + boolean removed = false; if (updatedLocations != regionLocations) { if (updatedLocations.isEmpty()) { - tableLocations.remove(hri.getStartKey(), regionLocations); + removed = tableLocations.remove(hri.getStartKey(), regionLocations); } else { - tableLocations.replace(hri.getStartKey(), regionLocations, updatedLocations); + removed = tableLocations.replace(hri.getStartKey(), regionLocations, updatedLocations); } + if (removed && LOG.isTraceEnabled()) { + LOG.trace("Removed " + oldLocation + " from cache"); + } } } } @@ -332,23 +384,23 @@ if (location == null) { return; } - TableName tableName 
= location.getRegionInfo().getTable(); ConcurrentMap tableLocations = getTableLocations(tableName); - RegionLocations rll = tableLocations.get(location.getRegionInfo().getStartKey()); - if (rll == null) { - return; + RegionLocations regionLocations = tableLocations.get(location.getRegionInfo().getStartKey()); + if (regionLocations != null) { + RegionLocations updatedLocations = regionLocations.remove(location); + boolean removed = false; + if (updatedLocations != regionLocations) { + if (updatedLocations.isEmpty()) { + removed = tableLocations.remove(location.getRegionInfo().getStartKey(), regionLocations); + } else { + removed = tableLocations.replace(location.getRegionInfo().getStartKey(), regionLocations, updatedLocations); + } + if (removed && LOG.isTraceEnabled()) { + LOG.trace("Removed " + location + " from cache"); + } + } } - RegionLocations updatedLocations = rll.remove(location); - if (updatedLocations.isEmpty()) { - tableLocations.remove(location.getRegionInfo().getStartKey(), rll); - } - if (LOG.isDebugEnabled() && (rll == updatedLocations)) { - LOG.debug("Removed " + - location.getRegionInfo().getRegionNameAsString() + - " for tableName=" + tableName + - " from cache"); - } } public void setRegionCachePrefetch(final TableName tableName, final boolean enable) { Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java (revision 1585756) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java (working copy) @@ -21,6 +21,18 @@ package org.apache.hadoop.hbase.client; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -34,17 +46,6 @@ import org.apache.hadoop.hbase.util.BoundedCompletionService; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - /** * Caller that goes to replica if the primary region does no answer within a configurable * timeout. If the timeout is reached, it calls all the secondary replicas, and returns @@ -104,11 +105,11 @@ } if (reload || location == null) { - RegionLocations rl = getRegionLocations(false); + RegionLocations rl = getRegionLocations(false, id); location = id < rl.size() ? rl.getRegionLocation(id) : null; } - if (location == null) { + if (location == null || location.getServerName() == null) { // With this exception, there will be a retry. The location can be null for a replica // when the table is created or after a split. 
throw new HBaseIOException("There is no location for replica id #" + id); @@ -170,30 +171,61 @@ */ public synchronized Result call() throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException { - RegionLocations rl = getRegionLocations(true); + RegionLocations rl = getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID); BoundedCompletionService cs = new BoundedCompletionService(pool, rl.size()); - addCallsForReplica(cs, rl, 0, 0); // primary. - + List exceptions = null; + int submitted = 0, completed = 0; + // submit call for the primary replica. + submitted += addCallsForReplica(cs, rl, 0, 0); try { + // wait for the timeout to see whether the primary responds back Future f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds - if (f == null) { - addCallsForReplica(cs, rl, 1, rl.size() - 1); // secondaries - f = cs.take(); + if (f != null) { + return f.get(); //great we got a response } - return f.get(); } catch (ExecutionException e) { - throwEnrichedException(e); - return null; // unreachable + // the primary call failed with RetriesExhaustedException or DoNotRetryIOException + // but the secondaries might still succeed. Continue on the replica RPCs. + exceptions = new ArrayList(rl.size()); + exceptions.add(e); + completed++; } catch (CancellationException e) { throw new InterruptedIOException(); } catch (InterruptedException e) { throw new InterruptedIOException(); + } + + // submit call for the all of the secondaries at once + // TODO: this may be an overkill for large region replication + submitted += addCallsForReplica(cs, rl, 1, rl.size() - 1); + try { + while (completed < submitted) { + try { + Future f = cs.take(); + return f.get(); // great we got an answer + } catch (ExecutionException e) { + // if not cancel or interrupt, wait until all RPC's are done + // one of the tasks failed. Save the exception for later. + if (exceptions == null) exceptions = new ArrayList(rl.size()); + exceptions.add(e); + completed++; + } + } + } catch (CancellationException e) { + throw new InterruptedIOException(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); } finally { // We get there because we were interrupted or because one or more of the - // calls succeeded or failed. In all case, we stop all our tasks. + // calls succeeded or failed. In all case, we stop all our tasks. cs.cancelAll(true); } + + if (exceptions != null && !exceptions.isEmpty()) { + throwEnrichedException(exceptions.get(0)); // just rethrow the first exception for now. + } + return null; // unreachable } /** @@ -230,8 +262,9 @@ * @param rl - the region locations * @param min - the id of the first replica, inclusive * @param max - the id of the last replica, inclusive. 
+ * @return the number of submitted calls */ - private void addCallsForReplica(BoundedCompletionService cs, + private int addCallsForReplica(BoundedCompletionService cs, RegionLocations rl, int min, int max) { for (int id = min; id <= max; id++) { HRegionLocation hrl = rl.getRegionLocation(id); @@ -239,21 +272,22 @@ RetryingRPC retryingOnReplica = new RetryingRPC(callOnReplica); cs.submit(retryingOnReplica); } + return max - min + 1; } - private RegionLocations getRegionLocations(boolean useCache) - throws RetriesExhaustedException, DoNotRetryIOException { + private RegionLocations getRegionLocations(boolean useCache, int replicaId) + throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException { RegionLocations rl; try { - rl = cConnection.locateRegion(tableName, get.getRow(), useCache, true); + rl = cConnection.locateRegion(tableName, get.getRow(), useCache, true, replicaId); + } catch (DoNotRetryIOException e) { + throw e; + } catch (RetriesExhaustedException e) { + throw e; + } catch (InterruptedIOException e) { + throw e; } catch (IOException e) { - if (e instanceof DoNotRetryIOException) { - throw (DoNotRetryIOException) e; - } else if (e instanceof RetriesExhaustedException) { - throw (RetriesExhaustedException) e; - } else { - throw new RetriesExhaustedException("Can't get the location", e); - } + throw new RetriesExhaustedException("Can't get the location", e); } if (rl == null) { throw new RetriesExhaustedException("Can't get the locations"); @@ -261,4 +295,4 @@ return rl; } -} \ No newline at end of file +} Index: hbase-client/src/test/java/org/apache/hadoop/hbase/TestRegionLocations.java =================================================================== --- hbase-client/src/test/java/org/apache/hadoop/hbase/TestRegionLocations.java (revision 1585756) +++ hbase-client/src/test/java/org/apache/hadoop/hbase/TestRegionLocations.java (working copy) @@ -46,25 +46,25 @@ list = hrll((HRegionLocation)null); assertTrue(list.isEmpty()); - assertEquals(0, list.size()); + assertEquals(1, list.size()); assertEquals(0, list.numNonNullElements()); HRegionInfo info0 = hri(0); list = hrll(hrl(info0, null)); - assertFalse(list.isEmpty()); + assertTrue(list.isEmpty()); assertEquals(1, list.size()); - assertEquals(1, list.numNonNullElements()); + assertEquals(0, list.numNonNullElements()); HRegionInfo info9 = hri(9); list = hrll(hrl(info9, null)); - assertFalse(list.isEmpty()); + assertTrue(list.isEmpty()); assertEquals(10, list.size()); - assertEquals(1, list.numNonNullElements()); + assertEquals(0, list.numNonNullElements()); list = hrll(hrl(info0, null), hrl(info9, null)); - assertFalse(list.isEmpty()); + assertTrue(list.isEmpty()); assertEquals(10, list.size()); - assertEquals(2, list.numNonNullElements()); + assertEquals(0, list.numNonNullElements()); } private HRegionInfo hri(int replicaId) { @@ -100,7 +100,7 @@ list = hrll(hrl(info0, sn0)); assertTrue(list == list.removeByServer(sn1)); list = list.removeByServer(sn0); - assertTrue(list.isEmpty()); + assertEquals(0, list.numNonNullElements()); // test remove from multi element list list = hrll(hrl(info0, sn0), hrl(info1, sn1), hrl(info2, sn2), hrl(info9, sn2)); @@ -226,7 +226,7 @@ list1 = list2.mergeLocations(list1); assertEquals(sn0, list1.getRegionLocation(0).getServerName()); assertEquals(sn1, list1.getRegionLocation(1).getServerName()); - assertEquals(sn2, list1.getRegionLocation(2).getServerName()); + assertEquals(2, list1.size()); // the size is taken from the argument list to merge // do the other way merge as well 
list1 = hrll(hrl(info0, sn0), hrl(info1, sn1)); @@ -240,10 +240,9 @@ list1 = hrll(hrl(info0, sn0), hrl(info1, sn1)); list2 = hrll(hrl(info0, sn2), hrl(info1, sn2), hrl(info9, sn3)); list1 = list2.mergeLocations(list1); // list1 should override - assertEquals(10, list1.size()); + assertEquals(2, list1.size()); assertEquals(sn0, list1.getRegionLocation(0).getServerName()); assertEquals(sn1, list1.getRegionLocation(1).getServerName()); - assertEquals(sn3, list1.getRegionLocation(9).getServerName()); // do the other way list1 = hrll(hrl(info0, sn0), hrl(info1, sn1)); @@ -272,4 +271,35 @@ assertEquals(sn2, list1.getRegionLocation(1).getServerName()); assertEquals(sn3, list1.getRegionLocation(9).getServerName()); } + + @Test + public void testConstructWithNullElements() { + // RegionLocations can contain null elements as well. These null elements can + + RegionLocations list = new RegionLocations((HRegionLocation)null); + assertTrue(list.isEmpty()); + assertEquals(1, list.size()); + assertEquals(0, list.numNonNullElements()); + + list = new RegionLocations(null, hrl(info1, sn0)); + assertFalse(list.isEmpty()); + assertEquals(2, list.size()); + assertEquals(1, list.numNonNullElements()); + + list = new RegionLocations(hrl(info0, sn0), null); + assertEquals(2, list.size()); + assertEquals(1, list.numNonNullElements()); + + list = new RegionLocations(null, hrl(info2, sn0), null, hrl(info9, sn0)); + assertEquals(10, list.size()); + assertEquals(2, list.numNonNullElements()); + + list = new RegionLocations(null, hrl(info2, sn0), null, hrl(info9, sn0), null); + assertEquals(11, list.size()); + assertEquals(2, list.numNonNullElements()); + + list = new RegionLocations(null, hrl(info2, sn0), null, hrl(info9, sn0), null, null); + assertEquals(12, list.size()); + assertEquals(2, list.numNonNullElements()); + } } Index: hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java (revision 1585756) +++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java (working copy) @@ -34,11 +34,12 @@ * A completion service, close to the one available in the JDK 1.7 * However, this ones keeps the list of the future, and allows to cancel them all. * This means as well that it can be used for a small set of tasks only. + *
Implementation is not thread-safe. */ public class BoundedCompletionService { private final Executor executor; - private final List> sent; // alls the call we sent - private final BlockingQueue> completed; // all the results we got so far. + private final List> tasks; // all the tasks + private final BlockingQueue> completed; // all the tasks that are completed class QueueingFuture extends FutureTask { @@ -46,6 +47,7 @@ super(callable); } + @Override protected void done() { completed.add(QueueingFuture.this); } @@ -53,7 +55,7 @@ public BoundedCompletionService(Executor executor, int maxTasks) { this.executor = executor; - this.sent = new ArrayList>(maxTasks); + this.tasks = new ArrayList>(maxTasks); this.completed = new ArrayBlockingQueue>(maxTasks); } @@ -61,7 +63,7 @@ public Future submit(Callable task) { QueueingFuture newFuture = new QueueingFuture(task); executor.execute(newFuture); - sent.add(newFuture); + tasks.add(newFuture); return newFuture; } @@ -74,7 +76,7 @@ } public void cancelAll(boolean interrupt) { - for (Future future : sent) { + for (Future future : tasks) { future.cancel(interrupt); } } Index: hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java (revision 1585756) +++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java (working copy) @@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; /** * Thread Utility @@ -38,6 +39,16 @@ public class Threads { protected static final Log LOG = LogFactory.getLog(Threads.class); private static final AtomicInteger poolNumber = new AtomicInteger(1); + + private static UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER = + new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.warn("Thread:" + t + " exited with Exception:" + + StringUtils.stringifyException(e)); + } + }; + /** * Utility method that sets name, daemon status and starts passed thread. * @param t thread to run @@ -160,15 +171,15 @@ } /** - * Create a new CachedThreadPool with a bounded number as the maximum + * Create a new CachedThreadPool with a bounded number as the maximum * thread size in the pool. - * + * * @param maxCachedThread the maximum thread could be created in the pool * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @param threadFactory the factory to use when creating new threads - * @return threadPoolExecutor the cachedThreadPool with a bounded number - * as the maximum thread size in the pool. + * @return threadPoolExecutor the cachedThreadPool with a bounded number - * as the maximum thread size in the pool. */ public static ThreadPoolExecutor getBoundedCachedThreadPool( int maxCachedThread, long timeout, TimeUnit unit, @@ -180,8 +191,8 @@ boundedCachedThreadPool.allowCoreThreadTimeOut(true); return boundedCachedThreadPool; } - - + + /** * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely, * with a common prefix. 
@@ -230,6 +241,8 @@ Thread t = namedFactory.newThread(r); if (handler != null) { t.setUncaughtExceptionHandler(handler); + } else { + t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER); } if (!t.isDaemon()) { t.setDaemon(true); @@ -242,4 +255,11 @@ }; } + + /** Sets an UncaughtExceptionHandler for the thread which logs the + * Exception stack if the thread dies. + */ + public static void setLoggingUncaughtExceptionHandler(Thread t) { + t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER); + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java (revision 1585756) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java (working copy) @@ -274,6 +274,12 @@ } @Override + public HRegionLocation relocateRegion(TableName tableName, byte[] row, int replicaId) + throws IOException { + return delegate.relocateRegion(tableName, row, replicaId); + } + + @Override public HRegionLocation relocateRegion(byte[] tableName, byte[] row) throws IOException { return delegate.relocateRegion(tableName, row); } @@ -331,6 +337,12 @@ } @Override + public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, + boolean retry, int replicaId) throws IOException { + return delegate.locateRegion(tableName, row, useCache, retry, replicaId); + } + + @Override public List locateRegions(byte[] tableName, boolean useCache, boolean offlined) throws IOException { return delegate.locateRegions(tableName, useCache, offlined); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1585756) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -637,25 +637,29 @@ status.setStatus("Writing region info on filesystem"); fs.checkRegionInfoOnFilesystem(); - // Remove temporary data left over from old regions - status.setStatus("Cleaning up temporary data from old regions"); - fs.cleanupTempDir(); - // Initialize all the HStores status.setStatus("Initializing all the Stores"); long maxSeqId = initializeRegionStores(reporter, status); - status.setStatus("Cleaning up detritus from prior splits"); - // Get rid of any splits or merges that were lost in-progress. Clean out - // these directories here on open. We may be opening a region that was - // being split but we crashed in the middle of it all. - fs.cleanupAnySplitDetritus(); - fs.cleanupMergesDir(); - this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this)); this.writestate.flushRequested = false; this.writestate.compacting = 0; + if (this.writestate.writesEnabled) { + // Remove temporary data left over from old regions + status.setStatus("Cleaning up temporary data from old regions"); + fs.cleanupTempDir(); + } + + if (this.writestate.writesEnabled) { + status.setStatus("Cleaning up detritus from prior splits"); + // Get rid of any splits or merges that were lost in-progress. Clean out + // these directories here on open. We may be opening a region that was + // being split but we crashed in the middle of it all. 
+ fs.cleanupAnySplitDetritus(); + fs.cleanupMergesDir(); + } + // Initialize split policy this.splitPolicy = RegionSplitPolicy.create(this, conf); @@ -753,9 +757,12 @@ } } mvcc.initialize(maxMemstoreTS + 1); - // Recover any edits if available. - maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny( - this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); + + if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) { + // Recover any edits if available. + maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny( + this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); + } return maxSeqId; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java (revision 1585756) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java (working copy) @@ -192,8 +192,9 @@ ArrayList storeFiles = new ArrayList(files.length); for (FileStatus status: files) { if (!StoreFileInfo.isValid(status)) continue; - - storeFiles.add(new StoreFileInfo(this.conf, this.fs, status)); + StoreFileInfo info = ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo, + regionInfoForFs, familyName, status); + storeFiles.add(info); } return storeFiles; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (revision 1585756) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (working copy) @@ -497,7 +497,7 @@ completionService.submit(new Callable() { @Override public StoreFile call() throws IOException { - StoreFile storeFile = createStoreFileAndReader(storeFileInfo.getPath()); + StoreFile storeFile = createStoreFileAndReader(storeFileInfo); return storeFile; } }); @@ -592,6 +592,10 @@ private StoreFile createStoreFileAndReader(final Path p) throws IOException { StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p); + return createStoreFileAndReader(info); + } + + private StoreFile createStoreFileAndReader(final StoreFileInfo info) throws IOException { info.setRegionCoprocessorHost(this.region.getCoprocessorHost()); StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf, this.family.getBloomFilterType()); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java (revision 1585756) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java (working copy) @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -35,7 +36,6 @@ import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.HalfStoreFileReader; import org.apache.hadoop.hbase.io.Reference; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.util.FSUtils; @@ -133,6 +133,22 @@ } /** + * Create a Store File Info from an HFileLink + * @param conf the {@link Configuration} to use + * @param fs The current file 
system to use. + * @param fileStatus The {@link FileStatus} of the file + */ + public StoreFileInfo(final Configuration conf, final FileSystem fs, final FileStatus fileStatus, + final HFileLink link) + throws IOException { + this.conf = conf; + this.fileStatus = fileStatus; + // HFileLink + this.reference = null; + this.link = link; + } + + /** * Sets the region coprocessor env. * @param coprocessorHost */ @@ -195,6 +211,8 @@ long length = status.getLen(); if (this.reference != null) { hdfsBlocksDistribution = computeRefFileHDFSBlockDistribution(fs, reference, status); + } else if (this.link != null) { + hdfsBlocksDistribution = computeHDFSBlocksDistribution(fs); } else { hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, length); } @@ -226,8 +244,18 @@ FileStatus status; if (this.reference != null) { if (this.link != null) { - // HFileLink Reference - status = link.getFileStatus(fs); + FileNotFoundException exToThrow = null; + for (int i = 0; i < this.link.getLocations().length; i++) { + // HFileLink Reference + try { + status = link.getFileStatus(fs); + return computeRefFileHDFSBlockDistribution(fs, reference, status); + } catch (FileNotFoundException ex) { + // try the other location + exToThrow = ex; + } + } + throw exToThrow; } else { // HFile Reference Path referencePath = getReferredToFile(this.getPath()); @@ -236,8 +264,18 @@ return computeRefFileHDFSBlockDistribution(fs, reference, status); } else { if (this.link != null) { - // HFileLink - status = link.getFileStatus(fs); + FileNotFoundException exToThrow = null; + for (int i = 0; i < this.link.getLocations().length; i++) { + // HFileLink + try { + status = link.getFileStatus(fs); + return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen()); + } catch (FileNotFoundException ex) { + // try the other location + exToThrow = ex; + } + } + throw exToThrow; } else { status = this.fileStatus; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java (revision 1585756) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java (working copy) @@ -18,9 +18,16 @@ package org.apache.hadoop.hbase.util; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; /** * Similar to {@link RegionReplicaUtil} but for the server side @@ -48,5 +55,39 @@ || !isDefaultReplica(region.getRegionInfo()); } + /** + * Returns whether to replay the recovered edits to flush the results. + * Currently secondary region replicas do not replay the edits, since it would + * cause flushes which might affect the primary region. Primary regions even opened + * in read only mode should replay the edits. + * @param region the HRegion object + * @return whether recovered edits should be replayed. + */ + public static boolean shouldReplayRecoveredEdits(HRegion region) { + return isDefaultReplica(region.getRegionInfo()); + } + /** + * Returns a StoreFileInfo from the given FileStatus. 
Secondary replicas refer to the + * files of the primary region, so an HFileLink is used to construct the StoreFileInfo. This + * ensures that the secondary will be able to continue reading the store files even if + * they are moved to the archive after a compaction. + * @throws IOException + */ + public static StoreFileInfo getStoreFileInfo(Configuration conf, FileSystem fs, + HRegionInfo regionInfo, HRegionInfo regionInfoForFs, String familyName, FileStatus status) + throws IOException { + + // if this is a primary region, just return the StoreFileInfo constructed from path + if (regionInfo.equals(regionInfoForFs)) { + return new StoreFileInfo(conf, fs, status); + } + + // else create a store file link. The link file does not exist on the filesystem, though. + HFileLink link = new HFileLink(conf, + HFileLink.createPath(regionInfoForFs.getTable(), regionInfoForFs.getEncodedName() + , familyName, status.getPath().getName())); + return new StoreFileInfo(conf, fs, status, link); + } + } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1585756) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy) @@ -3928,6 +3928,7 @@ // create a primary region, load some data and flush // create a secondary region, and do a get against that Path rootDir = new Path(DIR + "testRegionReplicaSecondary"); + TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString()); byte[][] families = new byte[][] { Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") @@ -3977,6 +3978,7 @@ // create a primary region, load some data and flush // create a secondary region, and do a put against that Path rootDir = new Path(DIR + "testRegionReplicaSecondary"); + TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString()); byte[][] families = new byte[][] { Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") @@ -4024,7 +4026,60 @@ HRegion.closeHRegion(secondaryRegion); } } + } + @Test + public void testCompactionFromPrimary() throws IOException { + Path rootDir = new Path(DIR + "testRegionReplicaSecondary"); + TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString()); + + byte[][] families = new byte[][] { + Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") + }; + byte[] cq = Bytes.toBytes("cq"); + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testRegionReplicaSecondary")); + for (byte[] family : families) { + htd.addFamily(new HColumnDescriptor(family)); + } + + long time = System.currentTimeMillis(); + HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, + false, time, 0); + HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, + false, time, 1); + + HRegion primaryRegion = null, secondaryRegion = null; + + try { + primaryRegion = HRegion.createHRegion(primaryHri, + rootDir, TEST_UTIL.getConfiguration(), htd); + + // load some data + putData(primaryRegion, 0, 1000, cq, families); + + // flush region + primaryRegion.flushcache(); + + // open secondary region + secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, conf); + + // move the file of the primary region to the archive, simulating a compaction + Collection<StoreFile> storeFiles =
primaryRegion.getStore(families[0]).getStorefiles(); + primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles); + Collection storeFileInfos = primaryRegion.getRegionFileSystem().getStoreFiles(families[0]); + Assert.assertTrue(storeFileInfos == null || storeFileInfos.size() == 0); + + verifyData(secondaryRegion, 0, 1000, cq, families); + } finally { + if (primaryRegion != null) { + HRegion.closeHRegion(primaryRegion); + } + if (secondaryRegion != null) { + HRegion.closeHRegion(secondaryRegion); + } + } } private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws IOException { Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java (revision 1585756) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java (working copy) @@ -19,6 +19,12 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -29,6 +35,7 @@ import org.apache.hadoop.hbase.catalog.TestMetaReaderEditor; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -37,6 +44,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.util.StringUtils; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -43,6 +51,7 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mortbay.log.Log; import com.google.protobuf.ServiceException; @@ -296,4 +305,126 @@ closeRegion(hriSecondary); } } + + @Test(timeout = 300000) + public void testFlushAndCompactionsInPrimary() throws Exception { + + long runtime = 30 * 1000; + // enable store file refreshing + final int refreshPeriod = 100; // 100ms refresh is a lot + HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 3); + HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, refreshPeriod); + // restart the region server so that it starts the refresher chore + restartRegionServer(); + final int startKey = 0, endKey = 1000; + + try { + openRegion(hriSecondary); + + //load some data to primary so that reader won't fail + HTU.loadNumericRows(table, f, startKey, endKey); + HTU.getHBaseAdmin().flush(table.getTableName()); + // ensure that chore is run + Threads.sleep(2 * refreshPeriod); + + final AtomicBoolean running = new AtomicBoolean(true); + @SuppressWarnings("unchecked") + final AtomicReference[] exceptions = new AtomicReference[3]; + for (int i=0; i < exceptions.length; i++) { + exceptions[i] = new AtomicReference(); + } + + Runnable writer = new Runnable() { + int key = startKey; + @Override + public void run() { + try { + while (running.get()) { + byte[] 
data = Bytes.toBytes(String.valueOf(key)); + Put put = new Put(data); + put.add(f, null, data); + table.put(put); + key++; + if (key == endKey) key = startKey; + } + } catch (Exception ex) { + Log.warn(ex); + exceptions[0].compareAndSet(null, ex); + } + } + }; + + Runnable flusherCompactor = new Runnable() { + Random random = new Random(); + @Override + public void run() { + try { + while (running.get()) { + // flush or compact + if (random.nextBoolean()) { + HTU.getHBaseAdmin().flush(table.getTableName()); + } else { + HTU.compact(table.getName(), random.nextBoolean()); + } + } + } catch (Exception ex) { + Log.warn(ex); + exceptions[1].compareAndSet(null, ex); + } + } + }; + + Runnable reader = new Runnable() { + Random random = new Random(); + @Override + public void run() { + try { + while (running.get()) { + // whether to do a close and open + if (random.nextInt(10) == 0) { + try { + closeRegion(hriSecondary); + } catch (Exception ex) { + Log.warn("Failed closing the region " + hriSecondary + " " + StringUtils.stringifyException(ex)); + exceptions[2].compareAndSet(null, ex); + } + try { + openRegion(hriSecondary); + } catch (Exception ex) { + Log.warn("Failed opening the region " + hriSecondary + " " + StringUtils.stringifyException(ex)); + exceptions[2].compareAndSet(null, ex); + } + } + + int key = random.nextInt(endKey - startKey) + startKey; + assertGetRpc(hriSecondary, key, true); + } + } catch (Exception ex) { + Log.warn("Failed getting the value in the region " + hriSecondary + " " + StringUtils.stringifyException(ex)); + exceptions[2].compareAndSet(null, ex); + } + } + }; + + Log.info("Starting writer and reader"); + ExecutorService executor = Executors.newFixedThreadPool(3); + executor.submit(writer); + executor.submit(flusherCompactor); + executor.submit(reader); + + // wait for threads + Threads.sleep(runtime); + running.set(false); + executor.shutdown(); + executor.awaitTermination(30, TimeUnit.SECONDS); + + for (AtomicReference exRef : exceptions) { + Assert.assertNull(exRef.get()); + } + + } finally { + HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, startKey, endKey); + closeRegion(hriSecondary); + } + } } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (revision 1585756) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (working copy) @@ -81,6 +81,7 @@ import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.apache.hadoop.util.Progressable; import org.junit.experimental.categories.Category; +import org.junit.Test; import org.mockito.Mockito; import com.google.common.collect.Lists; @@ -917,6 +918,7 @@ store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf)); } + @Test public void testRefreshStoreFiles() throws Exception { init(this.getName()); @@ -963,6 +965,7 @@ } @SuppressWarnings("unchecked") + @Test public void testRefreshStoreFilesNotChanged() throws IOException { init(this.getName()); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java (revision 1585756) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java (working copy) 
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.StoppableImplementation; import org.junit.Assert; import org.junit.Before; @@ -62,6 +63,7 @@ public void setUp() { TEST_UTIL = new HBaseTestingUtility(); testDir = TEST_UTIL.getDataTestDir("TestStoreFileRefresherChore"); + TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString()); } private HTableDescriptor getTableDesc(TableName tableName, byte[]... families) { @@ -92,7 +94,7 @@ private HRegion initHRegion(HTableDescriptor htd, byte[] startKey, byte[] stopKey, int replicaId) throws IOException { Configuration conf = TEST_UTIL.getConfiguration(); - Path tableDir = new Path(testDir, htd.getTableName().getNameAsString()); + Path tableDir = FSUtils.getTableDir(testDir, htd.getTableName()); HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false, 0, replicaId); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java (revision 1585756) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java (working copy) @@ -120,7 +120,9 @@ } protected HBaseReaderThread createReaderThread(int readerId) throws IOException { - return new HBaseReaderThread(readerId); + HBaseReaderThread reader = new HBaseReaderThread(readerId); + Threads.setLoggingUncaughtExceptionHandler(reader); + return reader; } public class HBaseReaderThread extends Thread { Index: hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java (revision 1585756) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java (working copy) @@ -73,6 +73,7 @@ protected void createWriterThreads(int numThreads) throws IOException { for (int i = 0; i < numThreads; ++i) { HBaseWriterThread writer = new HBaseWriterThread(i); + Threads.setLoggingUncaughtExceptionHandler(writer); writers.add(writer); } } @@ -89,6 +90,7 @@ return new HTable(conf, tableName); } + @Override public void run() { try { long rowKeyBase; Index: hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java (revision 1585756) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java (working copy) @@ -101,8 +101,8 @@ if (cached != null) { result = "cached: " + cached.toString(); } - if (real != null) { - if (real.equals(cached)) { + if (real != null && real.getServerName() != null) { + if (cached != null && cached.getServerName() != null && real.equals(cached)) { result += "; cache is up to date"; } else { result = (cached != null) ? (result + "; ") : "";
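
The StoreFileInfo hunks above probe each of the HFileLink's candidate locations in turn and only rethrow the FileNotFoundException after every location has failed, since a compaction on the primary may move the linked file into the archive between reads. The following standalone sketch (not part of the patch) shows that retry-over-locations shape; the class name and file paths are invented for the example, and plain java.io stands in for the Hadoop FileSystem/FileStatus API.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;

public class LinkLocationFallbackSketch {

  // Hypothetical stand-in for HFileLink#getLocations(): the candidate paths where the
  // linked file may live (the original family directory first, then the archive
  // directory it is moved to after a compaction).
  private final File[] locations;

  public LinkLocationFallbackSketch(File... locations) {
    this.locations = locations;
  }

  // Same shape as the StoreFileInfo changes above: probe each candidate location in order,
  // return on the first one that resolves, and only rethrow the last FileNotFoundException
  // once every candidate has been tried.
  public long resolveLength() throws IOException {
    FileNotFoundException lastMiss = null;
    for (File candidate : locations) {
      try {
        FileInputStream in = new FileInputStream(candidate); // throws if the file is gone
        try {
          return candidate.length();
        } finally {
          in.close();
        }
      } catch (FileNotFoundException ex) {
        lastMiss = ex; // not at this location; try the next one
      }
    }
    if (lastMiss == null) {
      throw new FileNotFoundException("no candidate locations configured");
    }
    throw lastMiss; // every candidate location missed
  }

  public static void main(String[] args) {
    LinkLocationFallbackSketch link = new LinkLocationFallbackSketch(
        new File("data/default/t1/region/cf/hfile"),
        new File("archive/data/default/t1/region/cf/hfile"));
    try {
      System.out.println("resolved length: " + link.resolveLength());
    } catch (IOException ex) {
      System.out.println("not found at any location: " + ex.getMessage());
    }
  }
}

Returning on the first successful probe and deferring the throw is what keeps a secondary replica readable while the primary archives compacted files, which is exactly the scenario testCompactionFromPrimary exercises.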
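The Threads change above falls back to a logging UncaughtExceptionHandler when no handler is supplied, and MultiThreadedReader/MultiThreadedWriter call setLoggingUncaughtExceptionHandler on their worker threads so a dying thread leaves a stack trace in the log. Below is a minimal standalone sketch of the same pattern, assuming nothing from HBase: java.util.logging stands in for the commons-logging Log used by Threads, and the class and thread names are invented.

import java.util.logging.Level;
import java.util.logging.Logger;

public class LoggingHandlerSketch {
  private static final Logger LOG = Logger.getLogger(LoggingHandlerSketch.class.getName());

  // Counterpart of the LOGGING_EXCEPTION_HANDLER wired into Threads above: any thread that
  // dies with an unexpected exception gets its stack logged instead of vanishing silently.
  static final Thread.UncaughtExceptionHandler LOGGING_HANDLER =
      new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
          LOG.log(Level.WARNING, "Thread " + t.getName() + " died unexpectedly", e);
        }
      };

  public static void main(String[] args) throws InterruptedException {
    Thread worker = new Thread(new Runnable() {
      @Override
      public void run() {
        throw new IllegalStateException("boom"); // simulated unexpected failure
      }
    }, "worker-1");
    worker.setUncaughtExceptionHandler(LOGGING_HANDLER); // same call the test threads use
    worker.start();
    worker.join(); // main keeps running; the failure is visible in the log output
  }
}

Without a handler, an uncaught exception is only printed to the dying thread's System.err, which is easy to miss in a long-running load test such as the multi-threaded reader/writer utilities patched here.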