Index: hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java (revision 1585756)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java (working copy)
@@ -561,7 +561,9 @@
break;
}
}
- if(offset == -1) throw new IOException("Invalid regionName format");
+ if (offset == -1) {
+ throw new IOException("Invalid regionName format: " + Bytes.toStringBinary(regionName));
+ }
byte[] tableName = new byte[offset];
System.arraycopy(regionName, 0, tableName, 0, offset);
offset = -1;
@@ -590,7 +592,9 @@
break;
}
}
- if(offset == -1) throw new IOException("Invalid regionName format");
+ if (offset == -1) {
+ throw new IOException("Invalid regionName format: " + Bytes.toStringBinary(regionName));
+ }
byte [] startKey = HConstants.EMPTY_BYTE_ARRAY;
if(offset != tableName.length + 1) {
startKey = new byte[offset - tableName.length - 1];
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java (revision 1585756)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java (working copy)
@@ -21,7 +21,6 @@
import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -34,27 +33,42 @@
public class RegionLocations {
private final int numNonNullElements;
+
+ // locations array contains the HRL objects for known region replicas indexed by replicaId.
+ // Elements can be null if the location of the region replica is not known at all. A null
+ // value indicates that there is a region replica with the index as replicaId, but its
+ // location is not known in the cache.
private final HRegionLocation[] locations; // replicaId -> HRegionLocation.
/**
* Constructs the region location list. The locations array should
* contain all the locations for known replicas for the region, and should be
- * sorted in replicaId ascending order.
+ * sorted in ascending replicaId order, although it may contain nulls for replicaIds whose
+ * locations are not known.
* @param locations an array of HRegionLocations for the same region range
*/
public RegionLocations(HRegionLocation... locations) {
int numNonNullElements = 0;
int maxReplicaId = -1;
+ int maxReplicaIdIndex = -1;
+ int index = 0;
for (HRegionLocation loc : locations) {
if (loc != null) {
- numNonNullElements++;
- if (loc.getRegionInfo().getReplicaId() > maxReplicaId) {
+ if (loc.getServerName() != null) {
+ numNonNullElements++;
+ }
+ if (loc.getRegionInfo().getReplicaId() >= maxReplicaId) {
maxReplicaId = loc.getRegionInfo().getReplicaId();
+ maxReplicaIdIndex = index;
}
}
+ index++;
}
this.numNonNullElements = numNonNullElements;
+ // account for the null elements in the array after maxReplicaIdIndex
+ maxReplicaId = maxReplicaId + (locations.length - (maxReplicaIdIndex + 1));
+
if (maxReplicaId + 1 == locations.length) {
this.locations = locations;
} else {
@@ -97,10 +111,10 @@
}
/**
- * Returns a new HRegionLocationList with the locations removed (set to null)
+ * Returns a new RegionLocations with the locations removed (set to null)
* which have the destination server as given.
* @param serverName the serverName to remove locations of
- * @return an HRegionLocationList object with removed locations or the same object
+ * @return a RegionLocations object with removed locations or the same object
* if nothing is removed
*/
public RegionLocations removeByServer(ServerName serverName) {
@@ -123,36 +137,58 @@
/**
* Removes the given location from the list
* @param location the location to remove
- * @return an HRegionLocationList object with removed locations or the same object
+ * @return a RegionLocations object with removed locations or the same object
* if nothing is removed
*/
public RegionLocations remove(HRegionLocation location) {
- HRegionLocation[] newLocations = null;
- for (int i = 0; i < locations.length; i++) {
- // check whether something to remove. HRL.compareTo() compares ONLY the
- // serverName. We want to compare the HRI's as well.
- if (locations[i] != null
- && location.getRegionInfo().equals(locations[i].getRegionInfo())
- && location.equals(locations[i])) {
- if (newLocations == null) { //first time
- newLocations = new HRegionLocation[locations.length];
- System.arraycopy(locations, 0, newLocations, 0, i);
- }
- newLocations[i] = null;
- } else if (newLocations != null) {
- newLocations[i] = locations[i];
- }
+ if (location == null) return this;
+ if (location.getRegionInfo() == null) return this;
+ int replicaId = location.getRegionInfo().getReplicaId();
+ if (replicaId >= locations.length) return this;
+
+ // check whether something to remove. HRL.compareTo() compares ONLY the
+ // serverName. We want to compare the HRI's as well.
+ if (locations[replicaId] == null
+ || !location.getRegionInfo().equals(locations[replicaId].getRegionInfo())
+ || !location.equals(locations[replicaId])) {
+ return this;
}
- return newLocations == null ? this : new RegionLocations(newLocations);
+
+ HRegionLocation[] newLocations = new HRegionLocation[locations.length];
+ System.arraycopy(locations, 0, newLocations, 0, locations.length);
+ newLocations[replicaId] = null;
+
+ return new RegionLocations(newLocations);
}
/**
- * Merges this HRegionLocation list with the given list assuming
+ * Removes the location with the given replicaId from the list
+ * @param replicaId the replicaId of the location to remove
+ * @return a RegionLocations object with removed locations or the same object
+ * if nothing is removed
+ */
+ public RegionLocations remove(int replicaId) {
+ if (getRegionLocation(replicaId) == null) {
+ return this;
+ }
+
+ HRegionLocation[] newLocations = new HRegionLocation[locations.length];
+
+ System.arraycopy(locations, 0, newLocations, 0, locations.length);
+ if (replicaId < newLocations.length) {
+ newLocations[replicaId] = null;
+ }
+
+ return new RegionLocations(newLocations);
+ }
+
+ /**
+ * Merges this RegionLocations list with the given list assuming
* same range, and keeping the most up to date version of the
* HRegionLocation entries from either list according to seqNum. If seqNums
* are equal, the location from the argument (other) is taken.
* @param other the locations to merge with
- * @return an HRegionLocationList object with merged locations or the same object
+ * @return a RegionLocations object with merged locations or the same object
* if nothing is merged
*/
public RegionLocations mergeLocations(RegionLocations other) {
@@ -160,7 +196,9 @@
HRegionLocation[] newLocations = null;
- int max = Math.max(this.locations.length, other.locations.length);
+ // Use the length from other, since it comes from meta. Otherwise, if the region
+ // replication count has been decreased, stale replica entries might leak here.
+ int max = other.locations.length;
for (int i = 0; i < max; i++) {
HRegionLocation thisLoc = this.getRegionLocation(i);
@@ -207,7 +245,7 @@
* @param checkForEquals whether to update the location if seqNums for the
* HRegionLocations for the old and new location are the same
* @param force whether to force update
- * @return an HRegionLocationList object with updated locations or the same object
+ * @return a RegionLocations object with updated locations or the same object
* if nothing is updated
*/
public RegionLocations updateLocation(HRegionLocation location,
@@ -282,12 +320,10 @@
public String toString() {
StringBuilder builder = new StringBuilder("[");
for (HRegionLocation loc : locations) {
- if (loc != null) {
- if (builder.length() > 1) {
- builder.append(", ");
- }
- builder.append(loc);
+ if (builder.length() > 1) {
+ builder.append(", ");
}
+ builder.append(loc == null ? "null" : loc);
}
builder.append("]");
return builder.toString();
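
Reviewer note: the sizing rule in the new RegionLocations constructor (largest replicaId seen, plus any trailing null slots) is the part of this hunk that is easiest to misread. Below is a minimal standalone sketch, in plain Java with invented names, that mirrors only that computation; the printed values match the expectations added to TestRegionLocations further down in this patch.

public class PaddedLengthDemo {
  // Mirrors the patched constructor's sizing rule: element i holds the replicaId of
  // location i, or null when that slot's location is not known.
  static int paddedLength(Integer... replicaIds) {
    int maxReplicaId = -1;
    int maxReplicaIdIndex = -1;
    int index = 0;
    for (Integer id : replicaIds) {
      if (id != null && id >= maxReplicaId) {
        maxReplicaId = id;
        maxReplicaIdIndex = index;
      }
      index++;
    }
    // trailing null slots after the largest replicaId still count towards the size
    maxReplicaId = maxReplicaId + (replicaIds.length - (maxReplicaIdIndex + 1));
    return maxReplicaId + 1;
  }

  public static void main(String[] args) {
    System.out.println(paddedLength((Integer) null));          // 1
    System.out.println(paddedLength(null, 2, null, 9));        // 10
    System.out.println(paddedLength(null, 2, null, 9, null));  // 11
  }
}
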
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java (revision 1585756)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java (working copy)
@@ -237,7 +237,7 @@
parsedInfo = parseRegionInfoFromRegionName(regionName);
row = getMetaKeyForRegion(parsedInfo);
} catch (Exception parseEx) {
- LOG.warn("Received parse exception:" + parseEx);
+ // Ignore. This is used with tableName passed as regionName.
}
Get get = new Get(row);
get.addFamily(HConstants.CATALOG_FAMILY);
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java (revision 1585756)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java (working copy)
@@ -38,6 +38,7 @@
interface ClusterConnection extends HConnection {
/** @return - true if the master server is running */
+ @Override
boolean isMasterRunning()
throws MasterNotRunningException, ZooKeeperConnectionException;
@@ -53,9 +54,10 @@
* @throws IOException
* if a remote or network exception occurs
*/
+ @Override
boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws
IOException;
-
+
/**
* Find the location of the region of tableName that row
* lives in.
@@ -65,6 +67,7 @@
* question
* @throws IOException if a remote or network exception occurs
*/
+ @Override
public HRegionLocation locateRegion(final TableName tableName,
final byte [] row) throws IOException;
@@ -71,6 +74,7 @@
/**
* Allows flushing the region cache.
*/
+ @Override
void clearRegionCache();
/**
@@ -79,6 +83,7 @@
* @param tableName Name of the table whose regions we are to remove from
* cache.
*/
+ @Override
void clearRegionCache(final TableName tableName);
/**
@@ -85,6 +90,7 @@
* Deletes cached locations for the specific region.
* @param location The location object for the region, to be purged from cache.
*/
+ @Override
void deleteCachedRegionLocation(final HRegionLocation location);
/**
@@ -96,10 +102,24 @@
* question
* @throws IOException if a remote or network exception occurs
*/
+ @Override
HRegionLocation relocateRegion(final TableName tableName,
final byte [] row) throws IOException;
/**
+ * Find the location of the region of tableName that row
+ * lives in, ignoring any value that might be in the cache.
+ * @param tableName name of the table row is in
+ * @param row row key you're trying to find the region of
+ * @param replicaId the replicaId of the region
+ * @return HRegionLocation that describes where to find the region in
+ * question
+ * @throws IOException if a remote or network exception occurs
+ */
+ HRegionLocation relocateRegion(final TableName tableName,
+ final byte [] row, int replicaId) throws IOException;
+
+ /**
* Update the location cache. This is used internally by HBase, in most cases it should not be
* used by the client application.
* @param tableName the table name
@@ -119,6 +139,7 @@
* question
* @throws IOException if a remote or network exception occurs
*/
+ @Override
HRegionLocation locateRegion(final byte[] regionName)
throws IOException;
@@ -128,6 +149,7 @@
* @return list of region locations for all regions of table
* @throws IOException
*/
+ @Override
List<HRegionLocation> locateRegions(final TableName tableName) throws IOException;
/**
@@ -139,6 +161,7 @@
* @return list of region locations for all regions of table
* @throws IOException
*/
+ @Override
List<HRegionLocation> locateRegions(final TableName tableName,
final boolean useCache,
final boolean offlined) throws IOException;
@@ -154,9 +177,24 @@
*/
RegionLocations locateRegion(TableName tableName,
byte[] row, boolean useCache, boolean retry) throws IOException;
+
/**
+ * Gets the locations of the region of tableName that row lives in.
+ * @param tableName table to get regions of
+ * @param row the row
+ * @param useCache Should we use the cache to retrieve the region information.
+ * @param retry do we retry
+ * @param replicaId the replicaId for the region
+ * @return region locations for this row.
+ * @throws IOException
+ */
+ RegionLocations locateRegion(TableName tableName,
+ byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException;
+
+ /**
* Returns a {@link MasterKeepAliveConnection} to the active master
*/
+ @Override
MasterService.BlockingInterface getMaster() throws IOException;
@@ -166,6 +204,7 @@
* @return proxy for HRegionServer
* @throws IOException if a remote or network exception occurs
*/
+ @Override
AdminService.BlockingInterface getAdmin(final ServerName serverName) throws IOException;
/**
@@ -177,6 +216,7 @@
* @throws IOException if a remote or network exception occurs
*
*/
+ @Override
ClientService.BlockingInterface getClient(final ServerName serverName) throws IOException;
/**
@@ -187,6 +227,7 @@
* @return Location of row.
* @throws IOException if a remote or network exception occurs
*/
+ @Override
HRegionLocation getRegionLocation(TableName tableName, byte [] row,
boolean reload)
throws IOException;
@@ -195,6 +236,7 @@
* Clear any caches that pertain to server name sn.
* @param sn A server name
*/
+ @Override
void clearCaches(final ServerName sn);
/**
@@ -203,6 +245,7 @@
* @return The shared instance. Never returns null.
* @throws MasterNotRunningException
*/
+ @Override
@Deprecated
MasterKeepAliveConnection getKeepAliveMasterService()
throws MasterNotRunningException;
@@ -211,6 +254,7 @@
* @param serverName
* @return true if the server is known as dead, false otherwise.
* @deprecated internal method, do not use thru HConnection */
+ @Override
@Deprecated
boolean isDeadServer(ServerName serverName);
@@ -217,6 +261,7 @@
/**
* @return Nonce generator for this HConnection; may be null if disabled in configuration.
*/
+ @Override
public NonceGenerator getNonceGenerator();
/**
@@ -228,4 +273,5 @@
* @return All locations for a particular region.
*/
RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException;
+
}
\ No newline at end of file
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java (revision 1585756)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java (working copy)
@@ -35,9 +35,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -48,7 +45,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -79,7 +75,87 @@
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.*;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
@@ -354,6 +430,7 @@
* @param conf configuration whose identity is used to find {@link HConnection} instance.
* @deprecated
*/
+ @Deprecated
public static void deleteConnection(Configuration conf) {
deleteConnection(new HConnectionKey(conf), false);
}
@@ -365,6 +442,7 @@
* @param connection
* @deprecated
*/
+ @Deprecated
public static void deleteStaleConnection(HConnection connection) {
deleteConnection(connection, true);
}
@@ -375,6 +453,7 @@
* staleConnection to true.
* @deprecated
*/
+ @Deprecated
public static void deleteAllConnections(boolean staleConnection) {
synchronized (CONNECTION_INSTANCES) {
Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
@@ -1003,6 +1082,12 @@
@Override
public HRegionLocation relocateRegion(final TableName tableName,
final byte [] row) throws IOException{
+ return relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
+ }
+
+ @Override
+ public HRegionLocation relocateRegion(final TableName tableName,
+ final byte [] row, int replicaId) throws IOException{
// Since this is an explicit request not to use any caching, finding
// disabled tables should not be desirable. This will ensure that an exception is thrown when
// the first time a disabled table is interacted with.
@@ -1010,8 +1095,8 @@
throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
}
- RegionLocations locations = locateRegion(tableName, row, false, true);
- return locations == null ? null : locations.getRegionLocation();
+ RegionLocations locations = locateRegion(tableName, row, false, true, replicaId);
+ return locations == null ? null : locations.getRegionLocation(replicaId);
}
@Override
@@ -1020,11 +1105,17 @@
return relocateRegion(TableName.valueOf(tableName), row);
}
-
@Override
public RegionLocations locateRegion(final TableName tableName,
final byte [] row, boolean useCache, boolean retry)
throws IOException {
+ return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
+ }
+
+ @Override
+ public RegionLocations locateRegion(final TableName tableName,
+ final byte [] row, boolean useCache, boolean retry, int replicaId)
+ throws IOException {
if (this.closed) throw new IOException(toString() + " closed");
if (tableName== null || tableName.getName().length == 0) {
throw new IllegalArgumentException(
@@ -1036,7 +1127,7 @@
} else {
// Region not in the cache - have to go to the meta RS
return locateRegionInMeta(TableName.META_TABLE_NAME, tableName, row,
- useCache, userRegionLock, retry);
+ useCache, userRegionLock, retry, replicaId);
}
}
@@ -1100,15 +1191,15 @@
*/
private RegionLocations locateRegionInMeta(final TableName parentTable,
final TableName tableName, final byte [] row, boolean useCache,
- Object regionLockObject, boolean retry)
+ Object regionLockObject, boolean retry, int replicaId)
throws IOException {
- RegionLocations location;
+ RegionLocations locations;
// If we are supposed to be using the cache, look in the cache to see if
// we already have the region.
if (useCache) {
- location = getCachedLocation(tableName, row);
- if (location != null) {
- return location;
+ locations = getCachedLocation(tableName, row);
+ if (locations != null && locations.getRegionLocation(replicaId) != null) {
+ return locations;
}
}
int localNumRetries = retry ? numTries : 1;
@@ -1127,7 +1218,7 @@
try {
// locate the meta region
RegionLocations metaLocations = locateRegion(parentTable, metaKey, true, false);
- metaLocation = metaLocations == null ? null : metaLocations.getRegionLocation();
+ metaLocation = metaLocations == null ? null : metaLocations.getDefaultRegionLocation();
// If null still, go around again.
if (metaLocation == null) continue;
ClientService.BlockingInterface service = getClient(metaLocation.getServerName());
@@ -1142,9 +1233,9 @@
synchronized (regionLockObject) {
// Check the cache again for a hit in case some other thread made the
// same query while we were waiting on the lock.
- location = getCachedLocation(tableName, row);
- if (location != null) {
- return location;
+ locations = getCachedLocation(tableName, row);
+ if (locations != null && locations.getRegionLocation(replicaId) != null) {
+ return locations;
}
// If the parent table is META, we may want to pre-fetch some
// region info into the global region cache for this table.
@@ -1151,14 +1242,14 @@
prefetchRegionCache(tableName, row);
}
}
- location = getCachedLocation(tableName, row);
- if (location != null) {
- return location;
+ locations = getCachedLocation(tableName, row);
+ if (locations != null && locations.getRegionLocation(replicaId) != null) {
+ return locations;
}
} else {
// If we are not supposed to be using the cache, delete any existing cached location
// so it won't interfere.
- metaCache.clearCache(tableName, row);
+ metaCache.clearCache(tableName, row, replicaId);
}
// Query the meta region for the location of the meta region
@@ -1171,12 +1262,12 @@
}
// convert the row result into the HRegionLocation we need!
- location = MetaReader.getRegionLocations(regionInfoRow);
- if (location == null || location.getRegionLocation() == null) {
+ locations = MetaReader.getRegionLocations(regionInfoRow);
+ if (locations == null || locations.getRegionLocation(replicaId) == null) {
throw new IOException("HRegionInfo was null in " +
parentTable + ", row=" + regionInfoRow);
}
- HRegionInfo regionInfo = location.getRegionLocation().getRegionInfo();
+ HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo();
if (regionInfo == null) {
throw new IOException("HRegionInfo was null or empty in " +
parentTable + ", row=" + regionInfoRow);
@@ -1200,7 +1291,7 @@
regionInfo.getRegionNameAsString());
}
- ServerName serverName = location.getRegionLocation().getServerName();
+ ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
if (serverName == null) {
throw new NoServerForRegionException("No server address listed " +
"in " + parentTable + " for region " +
@@ -1214,8 +1305,8 @@
", but it is dead.");
}
- cacheLocation(tableName, location);
- return location;
+ cacheLocation(tableName, locations);
+ return locations;
} catch (TableNotFoundException e) {
// if we got this error, probably means the table just plain doesn't
// exist. rethrow the error immediately. this should always be coming
@@ -1242,7 +1333,7 @@
// Only relocate the parent region if necessary
if(!(e instanceof RegionOfflineException ||
e instanceof NoServerForRegionException)) {
- relocateRegion(parentTable, metaKey);
+ relocateRegion(parentTable, metaKey, replicaId);
}
}
try{
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java (revision 1585756)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java (working copy)
@@ -114,6 +114,9 @@
RegionLocations oldLocations = tableLocations.putIfAbsent(startKey, locations);
boolean isNewCacheEntry = (oldLocations == null);
if (isNewCacheEntry) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Cached location: " + location);
+ }
addToCachedServers(locations);
return;
}
@@ -131,7 +134,10 @@
// an additional counter on top of seqNum would be necessary to handle them all.
RegionLocations updatedLocations = oldLocations.updateLocation(location, false, force);
if (oldLocations != updatedLocations) {
- tableLocations.replace(startKey, oldLocations, updatedLocations);
+ boolean replaced = tableLocations.replace(startKey, oldLocations, updatedLocations);
+ if (replaced && LOG.isTraceEnabled()) {
+ LOG.trace("Changed cached location to: " + location);
+ }
addToCachedServers(updatedLocations);
}
}
@@ -139,15 +145,18 @@
/**
* Put a newly discovered HRegionLocation into the cache.
* @param tableName The table name.
- * @param location the new location
+ * @param locations the new locations
*/
- public void cacheLocation(final TableName tableName, final RegionLocations location) {
- byte [] startKey = location.getRegionLocation().getRegionInfo().getStartKey();
+ public void cacheLocation(final TableName tableName, final RegionLocations locations) {
+ byte [] startKey = locations.getRegionLocation().getRegionInfo().getStartKey();
ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
- RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, location);
+ RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, locations);
boolean isNewCacheEntry = (oldLocation == null);
if (isNewCacheEntry) {
- addToCachedServers(location);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Cached location: " + locations);
+ }
+ addToCachedServers(locations);
return;
}
@@ -154,9 +163,12 @@
// merge old and new locations and add it to the cache
// Meta record might be stale - some (probably the same) server has closed the region
// with later seqNum and told us about the new location.
- RegionLocations mergedLocation = oldLocation.mergeLocations(location);
- tableLocations.replace(startKey, oldLocation, mergedLocation);
- addToCachedServers(location);
+ RegionLocations mergedLocation = oldLocation.mergeLocations(locations);
+ boolean replaced = tableLocations.replace(startKey, oldLocation, mergedLocation);
+ if (replaced && LOG.isTraceEnabled()) {
+ LOG.trace("Merged cached locations: " + mergedLocation);
+ }
+ addToCachedServers(locations);
}
private void addToCachedServers(RegionLocations locations) {
@@ -245,12 +257,11 @@
RegionLocations regionLocations = e.getValue();
if (regionLocations != null) {
RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
- deletedSomething |= regionLocations == updatedLocations;
if (updatedLocations != regionLocations) {
if (updatedLocations.isEmpty()) {
- tableLocations.remove(e.getKey(), regionLocations);
+ deletedSomething |= tableLocations.remove(e.getKey(), regionLocations);
} else {
- tableLocations.replace(e.getKey(), regionLocations, updatedLocations);
+ deletedSomething |= tableLocations.replace(e.getKey(), regionLocations, updatedLocations);
}
}
}
@@ -258,8 +269,8 @@
}
this.cachedServers.remove(serverName);
}
- if (deletedSomething && LOG.isDebugEnabled()) {
- LOG.debug("Removed all cached region locations that map to " + serverName);
+ if (deletedSomething && LOG.isTraceEnabled()) {
+ LOG.trace("Removed all cached region locations that map to " + serverName);
}
}
@@ -267,6 +278,9 @@
* Delete all cached entries of a table.
*/
public void clearCache(final TableName tableName) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Removed all cached region locations for table " + tableName);
+ }
this.cachedRegionLocations.remove(tableName);
}
@@ -275,6 +289,34 @@
* @param tableName tableName
* @param row
*/
+ public void clearCache(final TableName tableName, final byte [] row, int replicaId) {
+ ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
+
+ boolean removed = false;
+ RegionLocations regionLocations = getCachedLocation(tableName, row);
+ if (regionLocations != null) {
+ HRegionLocation toBeRemoved = regionLocations.getRegionLocation(replicaId);
+ RegionLocations updatedLocations = regionLocations.remove(replicaId);
+ if (updatedLocations != regionLocations) {
+ byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
+ if (updatedLocations.isEmpty()) {
+ removed = tableLocations.remove(startKey, regionLocations);
+ } else {
+ removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
+ }
+ }
+
+ if (removed && LOG.isTraceEnabled() && toBeRemoved != null) {
+ LOG.trace("Removed " + toBeRemoved + " from cache");
+ }
+ }
+ }
+
+ /**
+ * Delete a cached location, no matter what it is. Called when we were told to not use cache.
+ * @param tableName tableName
+ * @param row
+ */
public void clearCache(final TableName tableName, final byte [] row) {
ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
@@ -282,8 +324,8 @@
if (regionLocations != null) {
byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
boolean removed = tableLocations.remove(startKey, regionLocations);
- if (removed && LOG.isDebugEnabled()) {
- LOG.debug("Removed " + regionLocations + " from cache");
+ if (removed && LOG.isTraceEnabled()) {
+ LOG.trace("Removed " + regionLocations + " from cache");
}
}
}
@@ -299,11 +341,16 @@
RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
if (updatedLocations != regionLocations) {
byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
+ boolean removed = false;
if (updatedLocations.isEmpty()) {
- tableLocations.remove(startKey, regionLocations);
+ removed = tableLocations.remove(startKey, regionLocations);
} else {
- tableLocations.replace(startKey, regionLocations, updatedLocations);
+ removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
}
+ if (removed && LOG.isTraceEnabled()) {
+ LOG.trace("Removed locations of table: " + tableName + ", row: " + Bytes.toString(row)
+ + " mapping to server: " + serverName + " from cache");
+ }
}
}
}
@@ -317,13 +364,18 @@
RegionLocations regionLocations = tableLocations.get(hri.getStartKey());
if (regionLocations != null) {
HRegionLocation oldLocation = regionLocations.getRegionLocation(hri.getReplicaId());
+ if (oldLocation == null) return;
RegionLocations updatedLocations = regionLocations.remove(oldLocation);
+ boolean removed = false;
if (updatedLocations != regionLocations) {
if (updatedLocations.isEmpty()) {
- tableLocations.remove(hri.getStartKey(), regionLocations);
+ removed = tableLocations.remove(hri.getStartKey(), regionLocations);
} else {
- tableLocations.replace(hri.getStartKey(), regionLocations, updatedLocations);
+ removed = tableLocations.replace(hri.getStartKey(), regionLocations, updatedLocations);
}
+ if (removed && LOG.isTraceEnabled()) {
+ LOG.trace("Removed " + oldLocation + " from cache");
+ }
}
}
}
@@ -332,23 +384,23 @@
if (location == null) {
return;
}
-
TableName tableName = location.getRegionInfo().getTable();
ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
- RegionLocations rll = tableLocations.get(location.getRegionInfo().getStartKey());
- if (rll == null) {
- return;
+ RegionLocations regionLocations = tableLocations.get(location.getRegionInfo().getStartKey());
+ if (regionLocations != null) {
+ RegionLocations updatedLocations = regionLocations.remove(location);
+ boolean removed = false;
+ if (updatedLocations != regionLocations) {
+ if (updatedLocations.isEmpty()) {
+ removed = tableLocations.remove(location.getRegionInfo().getStartKey(), regionLocations);
+ } else {
+ removed = tableLocations.replace(location.getRegionInfo().getStartKey(), regionLocations, updatedLocations);
+ }
+ if (removed && LOG.isTraceEnabled()) {
+ LOG.trace("Removed " + location + " from cache");
+ }
+ }
}
- RegionLocations updatedLocations = rll.remove(location);
- if (updatedLocations.isEmpty()) {
- tableLocations.remove(location.getRegionInfo().getStartKey(), rll);
- }
- if (LOG.isDebugEnabled() && (rll == updatedLocations)) {
- LOG.debug("Removed " +
- location.getRegionInfo().getRegionNameAsString() +
- " for tableName=" + tableName +
- " from cache");
- }
}
public void setRegionCachePrefetch(final TableName tableName, final boolean enable) {
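
Reviewer note: the MetaCache changes keep the existing optimistic update scheme (putIfAbsent, then replace(key, oldValue, newValue)) and mainly start checking whether the compare-and-swap actually succeeded before logging at TRACE. A self-contained sketch of that pattern, with plain strings standing in for RegionLocations and invented names throughout:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class CasCacheDemo {
  private final ConcurrentMap<String, String> cache = new ConcurrentHashMap<String, String>();

  // Insert if absent; otherwise merge with the existing value and swap it in only if
  // no other thread has changed the entry in the meantime.
  void cacheLocation(String startKey, String newLocation) {
    String old = cache.putIfAbsent(startKey, newLocation);
    if (old == null) {
      return; // new cache entry, nothing to merge
    }
    String merged = old + "," + newLocation;          // stand-in for RegionLocations.mergeLocations()
    boolean replaced = cache.replace(startKey, old, merged);
    // the patch only logs when the swap actually happened; a false return means we lost the race
    System.out.println(replaced ? "merged to: " + merged : "lost race, kept concurrent update");
  }

  public static void main(String[] args) {
    CasCacheDemo demo = new CasCacheDemo();
    demo.cacheLocation("startKey-a", "replica0@server1");
    demo.cacheLocation("startKey-a", "replica1@server2");
  }
}
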
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java (revision 1585756)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java (working copy)
@@ -21,6 +21,18 @@
package org.apache.hadoop.hbase.client;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -34,17 +46,6 @@
import org.apache.hadoop.hbase.util.BoundedCompletionService;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
/**
* Caller that goes to replica if the primary region does not answer within a configurable
* timeout. If the timeout is reached, it calls all the secondary replicas, and returns
@@ -104,11 +105,11 @@
}
if (reload || location == null) {
- RegionLocations rl = getRegionLocations(false);
+ RegionLocations rl = getRegionLocations(false, id);
location = id < rl.size() ? rl.getRegionLocation(id) : null;
}
- if (location == null) {
+ if (location == null || location.getServerName() == null) {
// With this exception, there will be a retry. The location can be null for a replica
// when the table is created or after a split.
throw new HBaseIOException("There is no location for replica id #" + id);
@@ -170,30 +171,61 @@
*/
public synchronized Result call()
throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
- RegionLocations rl = getRegionLocations(true);
+ RegionLocations rl = getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
BoundedCompletionService<Result> cs = new BoundedCompletionService<Result>(pool, rl.size());
- addCallsForReplica(cs, rl, 0, 0); // primary.
-
+ List<ExecutionException> exceptions = null;
+ int submitted = 0, completed = 0;
+ // submit call for the primary replica.
+ submitted += addCallsForReplica(cs, rl, 0, 0);
try {
+ // wait for the timeout to see whether the primary responds back
Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
- if (f == null) {
- addCallsForReplica(cs, rl, 1, rl.size() - 1); // secondaries
- f = cs.take();
+ if (f != null) {
+ return f.get(); //great we got a response
}
- return f.get();
} catch (ExecutionException e) {
- throwEnrichedException(e);
- return null; // unreachable
+ // the primary call failed with RetriesExhaustedException or DoNotRetryIOException
+ // but the secondaries might still succeed. Continue on the replica RPCs.
+ exceptions = new ArrayList<ExecutionException>(rl.size());
+ exceptions.add(e);
+ completed++;
} catch (CancellationException e) {
throw new InterruptedIOException();
} catch (InterruptedException e) {
throw new InterruptedIOException();
+ }
+
+ // submit calls for all of the secondaries at once
+ // TODO: this may be overkill for large region replication
+ submitted += addCallsForReplica(cs, rl, 1, rl.size() - 1);
+ try {
+ while (completed < submitted) {
+ try {
+ Future<Result> f = cs.take();
+ return f.get(); // great we got an answer
+ } catch (ExecutionException e) {
+ // one of the tasks failed. Save the exception for later; unless we are
+ // cancelled or interrupted, keep waiting until all RPCs are done.
+ if (exceptions == null) exceptions = new ArrayList<ExecutionException>(rl.size());
+ exceptions.add(e);
+ completed++;
+ }
+ }
+ } catch (CancellationException e) {
+ throw new InterruptedIOException();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
} finally {
// We get there because we were interrupted or because one or more of the
- // calls succeeded or failed. In all case, we stop all our tasks.
+ // calls succeeded or failed. In all cases, we stop all our tasks.
cs.cancelAll(true);
}
+
+ if (exceptions != null && !exceptions.isEmpty()) {
+ throwEnrichedException(exceptions.get(0)); // just rethrow the first exception for now.
+ }
+ return null; // unreachable
}
/**
@@ -230,8 +262,9 @@
* @param rl - the region locations
* @param min - the id of the first replica, inclusive
* @param max - the id of the last replica, inclusive.
+ * @return the number of submitted calls
*/
- private void addCallsForReplica(BoundedCompletionService cs,
+ private int addCallsForReplica(BoundedCompletionService cs,
RegionLocations rl, int min, int max) {
for (int id = min; id <= max; id++) {
HRegionLocation hrl = rl.getRegionLocation(id);
@@ -239,21 +272,22 @@
RetryingRPC retryingOnReplica = new RetryingRPC(callOnReplica);
cs.submit(retryingOnReplica);
}
+ return max - min + 1;
}
- private RegionLocations getRegionLocations(boolean useCache)
- throws RetriesExhaustedException, DoNotRetryIOException {
+ private RegionLocations getRegionLocations(boolean useCache, int replicaId)
+ throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
RegionLocations rl;
try {
- rl = cConnection.locateRegion(tableName, get.getRow(), useCache, true);
+ rl = cConnection.locateRegion(tableName, get.getRow(), useCache, true, replicaId);
+ } catch (DoNotRetryIOException e) {
+ throw e;
+ } catch (RetriesExhaustedException e) {
+ throw e;
+ } catch (InterruptedIOException e) {
+ throw e;
} catch (IOException e) {
- if (e instanceof DoNotRetryIOException) {
- throw (DoNotRetryIOException) e;
- } else if (e instanceof RetriesExhaustedException) {
- throw (RetriesExhaustedException) e;
- } else {
- throw new RetriesExhaustedException("Can't get the location", e);
- }
+ throw new RetriesExhaustedException("Can't get the location", e);
}
if (rl == null) {
throw new RetriesExhaustedException("Can't get the locations");
@@ -261,4 +295,4 @@
return rl;
}
-}
\ No newline at end of file
+}
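
Reviewer note: the reworked call() above is the core of the change: give the primary replica a grace period, then race all secondaries, return the first successful answer, and only surface an exception once every submitted RPC has failed. The sketch below shows the same flow using only JDK classes; the class, method and replica names are invented, and the real code uses BoundedCompletionService with RegionServerCallable rather than ExecutorCompletionService with raw Callables.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class ReplicaFallbackDemo {
  // Ask the primary first, give it a grace period, then fan out to the secondaries
  // and return whichever replica answers first.
  static String call(ExecutorService pool, List<Callable<String>> replicas,
                     long primaryTimeoutMs) throws Exception {
    CompletionService<String> cs = new ExecutorCompletionService<String>(pool);
    List<Future<String>> submitted = new ArrayList<Future<String>>();
    List<ExecutionException> exceptions = new ArrayList<ExecutionException>();
    int completed = 0;

    submitted.add(cs.submit(replicas.get(0)));                  // primary
    try {
      Future<String> f = cs.poll(primaryTimeoutMs, TimeUnit.MILLISECONDS);
      if (f != null) {
        return f.get();                                         // primary answered in time
      }
    } catch (ExecutionException e) {
      exceptions.add(e);                                        // primary failed; try secondaries
      completed++;
    }

    for (int i = 1; i < replicas.size(); i++) {                 // all secondaries at once
      submitted.add(cs.submit(replicas.get(i)));
    }
    try {
      while (completed < submitted.size()) {
        try {
          return cs.take().get();                               // first successful answer wins
        } catch (ExecutionException e) {
          exceptions.add(e);
          completed++;
        }
      }
    } finally {
      for (Future<String> f : submitted) {
        f.cancel(true);                                         // stop everything still running
      }
    }
    throw exceptions.get(0);                                    // every replica failed
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newCachedThreadPool();
    List<Callable<String>> replicas = new ArrayList<Callable<String>>();
    replicas.add(new Callable<String>() {                       // slow primary
      public String call() throws Exception { Thread.sleep(500); return "primary"; }
    });
    replicas.add(new Callable<String>() {                       // fast secondary
      public String call() { return "secondary"; }
    });
    System.out.println(call(pool, replicas, 100));              // prints "secondary"
    pool.shutdownNow();
  }
}
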
Index: hbase-client/src/test/java/org/apache/hadoop/hbase/TestRegionLocations.java
===================================================================
--- hbase-client/src/test/java/org/apache/hadoop/hbase/TestRegionLocations.java (revision 1585756)
+++ hbase-client/src/test/java/org/apache/hadoop/hbase/TestRegionLocations.java (working copy)
@@ -46,25 +46,25 @@
list = hrll((HRegionLocation)null);
assertTrue(list.isEmpty());
- assertEquals(0, list.size());
+ assertEquals(1, list.size());
assertEquals(0, list.numNonNullElements());
HRegionInfo info0 = hri(0);
list = hrll(hrl(info0, null));
- assertFalse(list.isEmpty());
+ assertTrue(list.isEmpty());
assertEquals(1, list.size());
- assertEquals(1, list.numNonNullElements());
+ assertEquals(0, list.numNonNullElements());
HRegionInfo info9 = hri(9);
list = hrll(hrl(info9, null));
- assertFalse(list.isEmpty());
+ assertTrue(list.isEmpty());
assertEquals(10, list.size());
- assertEquals(1, list.numNonNullElements());
+ assertEquals(0, list.numNonNullElements());
list = hrll(hrl(info0, null), hrl(info9, null));
- assertFalse(list.isEmpty());
+ assertTrue(list.isEmpty());
assertEquals(10, list.size());
- assertEquals(2, list.numNonNullElements());
+ assertEquals(0, list.numNonNullElements());
}
private HRegionInfo hri(int replicaId) {
@@ -100,7 +100,7 @@
list = hrll(hrl(info0, sn0));
assertTrue(list == list.removeByServer(sn1));
list = list.removeByServer(sn0);
- assertTrue(list.isEmpty());
+ assertEquals(0, list.numNonNullElements());
// test remove from multi element list
list = hrll(hrl(info0, sn0), hrl(info1, sn1), hrl(info2, sn2), hrl(info9, sn2));
@@ -226,7 +226,7 @@
list1 = list2.mergeLocations(list1);
assertEquals(sn0, list1.getRegionLocation(0).getServerName());
assertEquals(sn1, list1.getRegionLocation(1).getServerName());
- assertEquals(sn2, list1.getRegionLocation(2).getServerName());
+ assertEquals(2, list1.size()); // the size is taken from the argument list to merge
// do the other way merge as well
list1 = hrll(hrl(info0, sn0), hrl(info1, sn1));
@@ -240,10 +240,9 @@
list1 = hrll(hrl(info0, sn0), hrl(info1, sn1));
list2 = hrll(hrl(info0, sn2), hrl(info1, sn2), hrl(info9, sn3));
list1 = list2.mergeLocations(list1); // list1 should override
- assertEquals(10, list1.size());
+ assertEquals(2, list1.size());
assertEquals(sn0, list1.getRegionLocation(0).getServerName());
assertEquals(sn1, list1.getRegionLocation(1).getServerName());
- assertEquals(sn3, list1.getRegionLocation(9).getServerName());
// do the other way
list1 = hrll(hrl(info0, sn0), hrl(info1, sn1));
@@ -272,4 +271,35 @@
assertEquals(sn2, list1.getRegionLocation(1).getServerName());
assertEquals(sn3, list1.getRegionLocation(9).getServerName());
}
+
+ @Test
+ public void testConstructWithNullElements() {
+ // RegionLocations can contain null elements as well. These null elements can be
+ // used as padding so that the index of each element matches its replicaId.
+
+ RegionLocations list = new RegionLocations((HRegionLocation)null);
+ assertTrue(list.isEmpty());
+ assertEquals(1, list.size());
+ assertEquals(0, list.numNonNullElements());
+
+ list = new RegionLocations(null, hrl(info1, sn0));
+ assertFalse(list.isEmpty());
+ assertEquals(2, list.size());
+ assertEquals(1, list.numNonNullElements());
+
+ list = new RegionLocations(hrl(info0, sn0), null);
+ assertEquals(2, list.size());
+ assertEquals(1, list.numNonNullElements());
+
+ list = new RegionLocations(null, hrl(info2, sn0), null, hrl(info9, sn0));
+ assertEquals(10, list.size());
+ assertEquals(2, list.numNonNullElements());
+
+ list = new RegionLocations(null, hrl(info2, sn0), null, hrl(info9, sn0), null);
+ assertEquals(11, list.size());
+ assertEquals(2, list.numNonNullElements());
+
+ list = new RegionLocations(null, hrl(info2, sn0), null, hrl(info9, sn0), null, null);
+ assertEquals(12, list.size());
+ assertEquals(2, list.numNonNullElements());
+ }
}
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java (revision 1585756)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java (working copy)
@@ -34,11 +34,12 @@
* A completion service, close to the one available in the JDK 1.7
* However, this one keeps the list of futures, and allows cancelling them all.
* This means as well that it can be used for a small set of tasks only.
+ * Implementation is not Thread safe.
*/
public class BoundedCompletionService<V> {
private final Executor executor;
- private final List<Future<V>> sent; // alls the call we sent
- private final BlockingQueue<Future<V>> completed; // all the results we got so far.
+ private final List<Future<V>> tasks; // all the tasks
+ private final BlockingQueue<Future<V>> completed; // all the tasks that are completed
class QueueingFuture extends FutureTask<V> {
@@ -46,6 +47,7 @@
super(callable);
}
+ @Override
protected void done() {
completed.add(QueueingFuture.this);
}
@@ -53,7 +55,7 @@
public BoundedCompletionService(Executor executor, int maxTasks) {
this.executor = executor;
- this.sent = new ArrayList<Future<V>>(maxTasks);
+ this.tasks = new ArrayList<Future<V>>(maxTasks);
this.completed = new ArrayBlockingQueue<Future<V>>(maxTasks);
}
@@ -61,7 +63,7 @@
public Future<V> submit(Callable<V> task) {
QueueingFuture newFuture = new QueueingFuture(task);
executor.execute(newFuture);
- sent.add(newFuture);
+ tasks.add(newFuture);
return newFuture;
}
@@ -74,7 +76,7 @@
}
public void cancelAll(boolean interrupt) {
- for (Future<V> future : sent) {
+ for (Future<V> future : tasks) {
future.cancel(interrupt);
}
}
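
Reviewer note: the issue tracker rendering has stripped the generic type parameters from the hunks above, so here is a compilable reconstruction of roughly what the class looks like after this patch; the class name is changed to make clear this is an illustration, not the HBase source itself.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

// Remembers every submitted future so they can all be cancelled, and has each task
// enqueue itself on completion via FutureTask.done().
public class CancellableCompletionService<V> {
  private final Executor executor;
  private final List<Future<V>> tasks;
  private final BlockingQueue<Future<V>> completed;

  class QueueingFuture extends FutureTask<V> {
    QueueingFuture(Callable<V> callable) { super(callable); }
    @Override
    protected void done() { completed.add(this); }   // runs on success, failure or cancellation
  }

  public CancellableCompletionService(Executor executor, int maxTasks) {
    this.executor = executor;
    this.tasks = new ArrayList<Future<V>>(maxTasks);
    this.completed = new ArrayBlockingQueue<Future<V>>(maxTasks);
  }

  public Future<V> submit(Callable<V> task) {
    QueueingFuture newFuture = new QueueingFuture(task);
    executor.execute(newFuture);
    tasks.add(newFuture);
    return newFuture;
  }

  public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
    return completed.poll(timeout, unit);
  }

  public Future<V> take() throws InterruptedException {
    return completed.take();
  }

  public void cancelAll(boolean interrupt) {
    for (Future<V> future : tasks) {
      future.cancel(interrupt);
    }
  }
}
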
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java (revision 1585756)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java (working copy)
@@ -30,6 +30,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
/**
* Thread Utility
@@ -38,6 +39,16 @@
public class Threads {
protected static final Log LOG = LogFactory.getLog(Threads.class);
private static final AtomicInteger poolNumber = new AtomicInteger(1);
+
+ private static UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER =
+ new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.warn("Thread:" + t + " exited with Exception:"
+ + StringUtils.stringifyException(e));
+ }
+ };
+
/**
* Utility method that sets name, daemon status and starts passed thread.
* @param t thread to run
@@ -160,15 +171,15 @@
}
/**
- * Create a new CachedThreadPool with a bounded number as the maximum
+ * Create a new CachedThreadPool with a bounded number as the maximum
* thread size in the pool.
- *
+ *
* @param maxCachedThread the maximum thread could be created in the pool
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @param threadFactory the factory to use when creating new threads
- * @return threadPoolExecutor the cachedThreadPool with a bounded number
- * as the maximum thread size in the pool.
+ * @return threadPoolExecutor the cachedThreadPool with a bounded number
+ * as the maximum thread size in the pool.
*/
public static ThreadPoolExecutor getBoundedCachedThreadPool(
int maxCachedThread, long timeout, TimeUnit unit,
@@ -180,8 +191,8 @@
boundedCachedThreadPool.allowCoreThreadTimeOut(true);
return boundedCachedThreadPool;
}
-
-
+
+
/**
* Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
* with a common prefix.
@@ -230,6 +241,8 @@
Thread t = namedFactory.newThread(r);
if (handler != null) {
t.setUncaughtExceptionHandler(handler);
+ } else {
+ t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
}
if (!t.isDaemon()) {
t.setDaemon(true);
@@ -242,4 +255,11 @@
};
}
+
+ /** Sets an UncaughtExceptionHandler for the thread which logs the
+ * Exception stack if the thread dies.
+ */
+ public static void setLoggingUncaughtExceptionHandler(Thread t) {
+ t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
+ }
}
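
Reviewer note: the Threads change is small but easy to miss: threads created through the named thread factory now fall back to a handler that logs uncaught exceptions instead of letting them vanish silently. The same idea with only JDK classes (invented names, System.err standing in for commons-logging):

public class LoggingHandlerDemo {
  // Default handler so threads that die from an uncaught exception leave a trace.
  private static final Thread.UncaughtExceptionHandler LOGGING_HANDLER =
      new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
          System.err.println("Thread:" + t.getName() + " exited with Exception:");
          e.printStackTrace();
        }
      };

  public static void main(String[] args) throws InterruptedException {
    Thread t = new Thread(new Runnable() {
      @Override
      public void run() { throw new IllegalStateException("boom"); }
    }, "demo-thread");
    t.setUncaughtExceptionHandler(LOGGING_HANDLER);  // what setLoggingUncaughtExceptionHandler(t) does
    t.start();
    t.join();  // the failure is logged by the handler instead of being swallowed
  }
}
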
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java (revision 1585756)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java (working copy)
@@ -274,6 +274,12 @@
}
@Override
+ public HRegionLocation relocateRegion(TableName tableName, byte[] row, int replicaId)
+ throws IOException {
+ return delegate.relocateRegion(tableName, row, replicaId);
+ }
+
+ @Override
public HRegionLocation relocateRegion(byte[] tableName, byte[] row) throws IOException {
return delegate.relocateRegion(tableName, row);
}
@@ -331,6 +337,12 @@
}
@Override
+ public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache,
+ boolean retry, int replicaId) throws IOException {
+ return delegate.locateRegion(tableName, row, useCache, retry, replicaId);
+ }
+
+ @Override
public List locateRegions(byte[] tableName, boolean useCache, boolean offlined)
throws IOException {
return delegate.locateRegions(tableName, useCache, offlined);
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1585756)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -637,25 +637,29 @@
status.setStatus("Writing region info on filesystem");
fs.checkRegionInfoOnFilesystem();
- // Remove temporary data left over from old regions
- status.setStatus("Cleaning up temporary data from old regions");
- fs.cleanupTempDir();
-
// Initialize all the HStores
status.setStatus("Initializing all the Stores");
long maxSeqId = initializeRegionStores(reporter, status);
- status.setStatus("Cleaning up detritus from prior splits");
- // Get rid of any splits or merges that were lost in-progress. Clean out
- // these directories here on open. We may be opening a region that was
- // being split but we crashed in the middle of it all.
- fs.cleanupAnySplitDetritus();
- fs.cleanupMergesDir();
-
this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
this.writestate.flushRequested = false;
this.writestate.compacting = 0;
+ if (this.writestate.writesEnabled) {
+ // Remove temporary data left over from old regions
+ status.setStatus("Cleaning up temporary data from old regions");
+ fs.cleanupTempDir();
+ }
+
+ if (this.writestate.writesEnabled) {
+ status.setStatus("Cleaning up detritus from prior splits");
+ // Get rid of any splits or merges that were lost in-progress. Clean out
+ // these directories here on open. We may be opening a region that was
+ // being split but we crashed in the middle of it all.
+ fs.cleanupAnySplitDetritus();
+ fs.cleanupMergesDir();
+ }
+
// Initialize split policy
this.splitPolicy = RegionSplitPolicy.create(this, conf);
@@ -753,9 +757,12 @@
}
}
mvcc.initialize(maxMemstoreTS + 1);
- // Recover any edits if available.
- maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
- this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
+
+ if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
+ // Recover any edits if available.
+ maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
+ this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
+ }
return maxSeqId;
}
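
Reviewer note: the HRegion hunks reorder initialization so that temp/split/merge cleanup and recovered-edits replay are skipped for regions that cannot take writes (secondary read replicas). A toy sketch of that control flow, with a single writesEnabled flag standing in for both writestate.writesEnabled and ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this); all names below are invented.

public class ReplicaOpenSketch {
  // Condensed view of the reordering above: cleanup and recovered-edits replay
  // only run when the region can take writes; secondary replicas skip both.
  static long initialize(boolean writesEnabled, long maxSeqIdFromStores) {
    if (writesEnabled) {
      cleanupTempDir();
      cleanupSplitAndMergeDetritus();
    }
    long maxSeqId = maxSeqIdFromStores;
    if (writesEnabled) {
      maxSeqId = Math.max(maxSeqId, replayRecoveredEdits());
    }
    return maxSeqId;
  }

  static void cleanupTempDir() { System.out.println("cleaning temp dir"); }
  static void cleanupSplitAndMergeDetritus() { System.out.println("cleaning split/merge leftovers"); }
  static long replayRecoveredEdits() { System.out.println("replaying recovered edits"); return 42L; }

  public static void main(String[] args) {
    System.out.println("primary   -> maxSeqId=" + initialize(true, 10));
    System.out.println("secondary -> maxSeqId=" + initialize(false, 10));
  }
}
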
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java (revision 1585756)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java (working copy)
@@ -192,8 +192,9 @@
ArrayList<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(files.length);
for (FileStatus status: files) {
if (!StoreFileInfo.isValid(status)) continue;
-
- storeFiles.add(new StoreFileInfo(this.conf, this.fs, status));
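+ // For a secondary region replica this resolves to a StoreFileInfo backed by an
+ // HFileLink to the primary region's file; for the primary it is the plain store file.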
+ StoreFileInfo info = ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo,
+ regionInfoForFs, familyName, status);
+ storeFiles.add(info);
}
return storeFiles;
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (revision 1585756)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (working copy)
@@ -497,7 +497,7 @@
completionService.submit(new Callable<StoreFile>() {
@Override
public StoreFile call() throws IOException {
- StoreFile storeFile = createStoreFileAndReader(storeFileInfo.getPath());
+ StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
return storeFile;
}
});
@@ -592,6 +592,10 @@
private StoreFile createStoreFileAndReader(final Path p) throws IOException {
StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
+ return createStoreFileAndReader(info);
+ }
+
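+ // Variant that takes an already constructed StoreFileInfo, for example a link-backed
+ // one built for a secondary region replica.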
+ private StoreFile createStoreFileAndReader(final StoreFileInfo info) throws IOException {
info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
this.family.getBloomFilterType());
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java (revision 1585756)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java (working copy)
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -35,7 +36,6 @@
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -133,6 +133,22 @@
}
/**
+ * Create a Store File Info from an HFileLink
+ * @param conf the {@link Configuration} to use
+ * @param fs The current file system to use.
+ * @param fileStatus The {@link FileStatus} of the file
+ * @param link The {@link HFileLink} pointing to the referenced file
+ */
+ public StoreFileInfo(final Configuration conf, final FileSystem fs, final FileStatus fileStatus,
+ final HFileLink link)
+ throws IOException {
+ this.conf = conf;
+ this.fileStatus = fileStatus;
+ // HFileLink
+ this.reference = null;
+ this.link = link;
+ }
+
+ /**
* Sets the region coprocessor env.
* @param coprocessorHost
*/
@@ -195,6 +211,8 @@
long length = status.getLen();
if (this.reference != null) {
hdfsBlocksDistribution = computeRefFileHDFSBlockDistribution(fs, reference, status);
+ } else if (this.link != null) {
+ hdfsBlocksDistribution = computeHDFSBlocksDistribution(fs);
} else {
hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, length);
}
@@ -226,8 +244,18 @@
FileStatus status;
if (this.reference != null) {
if (this.link != null) {
- // HFileLink Reference
- status = link.getFileStatus(fs);
+ FileNotFoundException exToThrow = null;
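+ // The file behind the link may be moved (for example archived by a compaction) while
+ // we are looking at it, so retry once per candidate location of the link.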
+ for (int i = 0; i < this.link.getLocations().length; i++) {
+ // HFileLink Reference
+ try {
+ status = link.getFileStatus(fs);
+ return computeRefFileHDFSBlockDistribution(fs, reference, status);
+ } catch (FileNotFoundException ex) {
+ // try the other location
+ exToThrow = ex;
+ }
+ }
+ throw exToThrow;
} else {
// HFile Reference
Path referencePath = getReferredToFile(this.getPath());
@@ -236,8 +264,18 @@
return computeRefFileHDFSBlockDistribution(fs, reference, status);
} else {
if (this.link != null) {
- // HFileLink
- status = link.getFileStatus(fs);
+ FileNotFoundException exToThrow = null;
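+ // Same retry-per-location logic as above: the link target may move while we read it.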
+ for (int i = 0; i < this.link.getLocations().length; i++) {
+ // HFileLink
+ try {
+ status = link.getFileStatus(fs);
+ return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+ } catch (FileNotFoundException ex) {
+ // try the other location
+ exToThrow = ex;
+ }
+ }
+ throw exToThrow;
} else {
status = this.fileStatus;
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java (revision 1585756)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java (working copy)
@@ -18,9 +18,16 @@
package org.apache.hadoop.hbase.util;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
/**
* Similar to {@link RegionReplicaUtil} but for the server side
@@ -48,5 +55,39 @@
|| !isDefaultReplica(region.getRegionInfo());
}
+ /**
+ * Returns whether the recovered edits should be replayed for the given region
+ * (the replay also flushes the results). Currently secondary region replicas do not
+ * replay recovered edits, since the resulting flushes might interfere with the primary
+ * region. Primary regions replay the edits even when opened in read-only mode.
+ * @param region the HRegion object
+ * @return whether recovered edits should be replayed.
+ */
+ public static boolean shouldReplayRecoveredEdits(HRegion region) {
+ return isDefaultReplica(region.getRegionInfo());
+ }
+ /**
+ * Returns a StoreFileInfo from the given FileStatus. Secondary replicas refer to the
+ * files of the primary region, so an HFileLink is used to construct the StoreFileInfo. This
+ * ensures that the secondary will be able to continue reading the store files even if
+ * they are moved to the archive after a compaction.
+ * @throws IOException
+ */
+ public static StoreFileInfo getStoreFileInfo(Configuration conf, FileSystem fs,
+ HRegionInfo regionInfo, HRegionInfo regionInfoForFs, String familyName, FileStatus status)
+ throws IOException {
+
+ // if this is a primary region, just return the StoreFileInfo constructed from path
+ if (regionInfo.equals(regionInfoForFs)) {
+ return new StoreFileInfo(conf, fs, status);
+ }
+
+ // else create a store file link. The link file does not exist on the filesystem though.
+ HFileLink link = new HFileLink(conf,
+ HFileLink.createPath(regionInfoForFs.getTable(), regionInfoForFs.getEncodedName(),
+ familyName, status.getPath().getName()));
+ return new StoreFileInfo(conf, fs, status, link);
+ }
+
}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1585756)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy)
@@ -3928,6 +3928,7 @@
// create a primary region, load some data and flush
// create a secondary region, and do a get against that
Path rootDir = new Path(DIR + "testRegionReplicaSecondary");
+ TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
byte[][] families = new byte[][] {
Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
@@ -3977,6 +3978,7 @@
// create a primary region, load some data and flush
// create a secondary region, and do a put against that
Path rootDir = new Path(DIR + "testRegionReplicaSecondary");
+ TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
byte[][] families = new byte[][] {
Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
@@ -4024,7 +4026,60 @@
HRegion.closeHRegion(secondaryRegion);
}
}
+ }
+ @Test
+ public void testCompactionFromPrimary() throws IOException {
+ Path rootDir = new Path(DIR + "testRegionReplicaSecondary");
+ TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
+
+ byte[][] families = new byte[][] {
+ Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
+ };
+ byte[] cq = Bytes.toBytes("cq");
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testRegionReplicaSecondary"));
+ for (byte[] family : families) {
+ htd.addFamily(new HColumnDescriptor(family));
+ }
+
+ long time = System.currentTimeMillis();
+ HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
+ false, time, 0);
+ HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
+ false, time, 1);
+
+ HRegion primaryRegion = null, secondaryRegion = null;
+
+ try {
+ primaryRegion = HRegion.createHRegion(primaryHri,
+ rootDir, TEST_UTIL.getConfiguration(), htd);
+
+ // load some data
+ putData(primaryRegion, 0, 1000, cq, families);
+
+ // flush region
+ primaryRegion.flushcache();
+
+ // open secondary region
+ secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, conf);
+
+ // move the file of the primary region to the archive, simulating a compaction
+ Collection<StoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
+ primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
+ Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem().getStoreFiles(families[0]);
+ Assert.assertTrue(storeFileInfos == null || storeFileInfos.size() == 0);
+
+ verifyData(secondaryRegion, 0, 1000, cq, families);
+ } finally {
+ if (primaryRegion != null) {
+ HRegion.closeHRegion(primaryRegion);
+ }
+ if (secondaryRegion != null) {
+ HRegion.closeHRegion(secondaryRegion);
+ }
+ }
}
private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java (revision 1585756)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java (working copy)
@@ -19,6 +19,12 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@@ -29,6 +35,7 @@
import org.apache.hadoop.hbase.catalog.TestMetaReaderEditor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -37,6 +44,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -43,6 +51,7 @@
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.mortbay.log.Log;
import com.google.protobuf.ServiceException;
@@ -296,4 +305,126 @@
closeRegion(hriSecondary);
}
}
+
+ @Test(timeout = 300000)
+ public void testFlushAndCompactionsInPrimary() throws Exception {
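+ // Run concurrent writes, flushes/compactions and secondary-replica reads for ~30 seconds
+ // and fail if any of the three worker threads records an exception.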
+
+ long runtime = 30 * 1000;
+ // enable store file refreshing
+ final int refreshPeriod = 100; // 100ms refresh is a lot
+ HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 3);
+ HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
+ refreshPeriod);
+ // restart the region server so that it starts the refresher chore
+ restartRegionServer();
+ final int startKey = 0, endKey = 1000;
+
+ try {
+ openRegion(hriSecondary);
+
+ // load some data to the primary so that the reader won't fail
+ HTU.loadNumericRows(table, f, startKey, endKey);
+ HTU.getHBaseAdmin().flush(table.getTableName());
+ // ensure that chore is run
+ Threads.sleep(2 * refreshPeriod);
+
+ final AtomicBoolean running = new AtomicBoolean(true);
+ @SuppressWarnings("unchecked")
+ final AtomicReference<Exception>[] exceptions = new AtomicReference[3];
+ for (int i = 0; i < exceptions.length; i++) {
+ exceptions[i] = new AtomicReference<Exception>();
+ }
+
+ Runnable writer = new Runnable() {
+ int key = startKey;
+ @Override
+ public void run() {
+ try {
+ while (running.get()) {
+ byte[] data = Bytes.toBytes(String.valueOf(key));
+ Put put = new Put(data);
+ put.add(f, null, data);
+ table.put(put);
+ key++;
+ if (key == endKey) key = startKey;
+ }
+ } catch (Exception ex) {
+ Log.warn(ex);
+ exceptions[0].compareAndSet(null, ex);
+ }
+ }
+ };
+
+ Runnable flusherCompactor = new Runnable() {
+ Random random = new Random();
+ @Override
+ public void run() {
+ try {
+ while (running.get()) {
+ // flush or compact
+ if (random.nextBoolean()) {
+ HTU.getHBaseAdmin().flush(table.getTableName());
+ } else {
+ HTU.compact(table.getName(), random.nextBoolean());
+ }
+ }
+ } catch (Exception ex) {
+ Log.warn(ex);
+ exceptions[1].compareAndSet(null, ex);
+ }
+ }
+ };
+
+ Runnable reader = new Runnable() {
+ Random random = new Random();
+ @Override
+ public void run() {
+ try {
+ while (running.get()) {
+ // whether to do a close and open
+ if (random.nextInt(10) == 0) {
+ try {
+ closeRegion(hriSecondary);
+ } catch (Exception ex) {
+ Log.warn("Failed closing the region " + hriSecondary + " " + StringUtils.stringifyException(ex));
+ exceptions[2].compareAndSet(null, ex);
+ }
+ try {
+ openRegion(hriSecondary);
+ } catch (Exception ex) {
+ Log.warn("Failed opening the region " + hriSecondary + " " + StringUtils.stringifyException(ex));
+ exceptions[2].compareAndSet(null, ex);
+ }
+ }
+
+ int key = random.nextInt(endKey - startKey) + startKey;
+ assertGetRpc(hriSecondary, key, true);
+ }
+ } catch (Exception ex) {
+ Log.warn("Failed getting the value in the region " + hriSecondary + " " + StringUtils.stringifyException(ex));
+ exceptions[2].compareAndSet(null, ex);
+ }
+ }
+ };
+
+ Log.info("Starting writer and reader");
+ ExecutorService executor = Executors.newFixedThreadPool(3);
+ executor.submit(writer);
+ executor.submit(flusherCompactor);
+ executor.submit(reader);
+
+ // wait for threads
+ Threads.sleep(runtime);
+ running.set(false);
+ executor.shutdown();
+ executor.awaitTermination(30, TimeUnit.SECONDS);
+
+ for (AtomicReference<Exception> exRef : exceptions) {
+ Assert.assertNull(exRef.get());
+ }
+
+ } finally {
+ HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, startKey, endKey);
+ closeRegion(hriSecondary);
+ }
+ }
}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (revision 1585756)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (working copy)
@@ -81,6 +81,7 @@
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.util.Progressable;
import org.junit.experimental.categories.Category;
+import org.junit.Test;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
@@ -917,6 +918,7 @@
store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf));
}
+ @Test
public void testRefreshStoreFiles() throws Exception {
init(this.getName());
@@ -963,6 +965,7 @@
}
@SuppressWarnings("unchecked")
+ @Test
public void testRefreshStoreFilesNotChanged() throws IOException {
init(this.getName());
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java (revision 1585756)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java (working copy)
@@ -46,6 +46,7 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.junit.Assert;
import org.junit.Before;
@@ -62,6 +63,7 @@
public void setUp() {
TEST_UTIL = new HBaseTestingUtility();
testDir = TEST_UTIL.getDataTestDir("TestStoreFileRefresherChore");
+ TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString());
}
private HTableDescriptor getTableDesc(TableName tableName, byte[]... families) {
@@ -92,7 +94,7 @@
private HRegion initHRegion(HTableDescriptor htd, byte[] startKey, byte[] stopKey, int replicaId) throws IOException {
Configuration conf = TEST_UTIL.getConfiguration();
- Path tableDir = new Path(testDir, htd.getTableName().getNameAsString());
+ Path tableDir = FSUtils.getTableDir(testDir, htd.getTableName());
HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false, 0, replicaId);
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java (revision 1585756)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java (working copy)
@@ -120,7 +120,9 @@
}
protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
- return new HBaseReaderThread(readerId);
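+ // Make sure uncaught exceptions from the reader thread are logged rather than lost.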
+ HBaseReaderThread reader = new HBaseReaderThread(readerId);
+ Threads.setLoggingUncaughtExceptionHandler(reader);
+ return reader;
}
public class HBaseReaderThread extends Thread {
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java (revision 1585756)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java (working copy)
@@ -73,6 +73,7 @@
protected void createWriterThreads(int numThreads) throws IOException {
for (int i = 0; i < numThreads; ++i) {
HBaseWriterThread writer = new HBaseWriterThread(i);
+ Threads.setLoggingUncaughtExceptionHandler(writer);
writers.add(writer);
}
}
@@ -89,6 +90,7 @@
return new HTable(conf, tableName);
}
+ @Override
public void run() {
try {
long rowKeyBase;
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java (revision 1585756)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java (working copy)
@@ -101,8 +101,8 @@
if (cached != null) {
result = "cached: " + cached.toString();
}
- if (real != null) {
- if (real.equals(cached)) {
+ if (real != null && real.getServerName() != null) {
+ if (cached != null && cached.getServerName() != null && real.equals(cached)) {
result += "; cache is up to date";
} else {
result = (cached != null) ? (result + "; ") : "";