diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index e5f5694..633a48c 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -826,7 +826,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } else { // If we are not supposed to be using the cache, delete any existing cached location // so it won't interfere. - metaCache.clearCache(tableName, row); + // We are only supposed to clean the cache for the specific replicaId + metaCache.clearCache(tableName, row, replicaId); } // Query the meta region diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java index 14e0afd..c3b00b5 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java @@ -313,6 +313,40 @@ public class MetaCache { } /** + * Delete a cached location with specific replicaId. + * @param tableName tableName + * @param row row key + * @param replicaId region replica id + */ + public void clearCache(final TableName tableName, final byte [] row, int replicaId) { + ConcurrentMap tableLocations = getTableLocations(tableName); + + RegionLocations regionLocations = getCachedLocation(tableName, row); + if (regionLocations != null) { + HRegionLocation toBeRemoved = regionLocations.getRegionLocation(replicaId); + if (toBeRemoved != null) { + RegionLocations updatedLocations = regionLocations.remove(replicaId); + byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey(); + boolean removed; + if (updatedLocations.isEmpty()) { + removed = tableLocations.remove(startKey, regionLocations); + } else { + removed = tableLocations.replace(startKey, regionLocations, updatedLocations); + } + + if (removed) { + if (metrics != null) { + metrics.incrMetaCacheNumClearRegion(); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Removed " + toBeRemoved + " from cache"); + } + } + } + } + } + + /** * Delete a cached location for a table, row and server */ public void clearCache(final TableName tableName, final byte [] row, ServerName serverName) { diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 91c6344..b5cddde 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -170,9 +170,36 @@ public class RpcRetryingCallerWithReadReplicas { throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException { boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0); - RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId() - : RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow()); - final ResultBoundedCompletionService cs = + RegionLocations rl = null; + boolean skipPrimary = false; + try { + rl = getRegionLocations(true, + (isTargetReplicaSpecified ? get.getReplicaId() : RegionReplicaUtil.DEFAULT_REPLICA_ID), + cConnection, tableName, get.getRow()); + } catch (RetriesExhaustedException | DoNotRetryIOException e) { + // When there is no specific replica id specified. It just needs to load all replicas. + if (isTargetReplicaSpecified) { + throw e; + } else { + // We cannot get the primary replica location, it is possible that the region + // server hosting meta is down, it needs to proceed to try cached replicas. + if (cConnection instanceof ConnectionImplementation) { + rl = ((ConnectionImplementation)cConnection).getCachedLocation(tableName, get.getRow()); + if (rl == null) { + // No cached locations + throw e; + } + + // Primary replica location is not known, skip primary replica + skipPrimary = true; + } else { + // For completeness + throw e; + } + } + } + + final ResultBoundedCompletionService cs = new ResultBoundedCompletionService<>(this.rpcRetryingCallerFactory, pool, rl.size()); int startIndex = 0; int endIndex = rl.size(); @@ -181,25 +208,30 @@ public class RpcRetryingCallerWithReadReplicas { addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId()); endIndex = 1; } else { - addCallsForReplica(cs, rl, 0, 0); - try { - // wait for the timeout to see whether the primary responds back - Future f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds - if (f != null) { - return f.get(); //great we got a response - } - } catch (ExecutionException e) { - // We ignore the ExecutionException and continue with the secondary replicas - if (LOG.isDebugEnabled()) { - LOG.debug("Primary replica returns " + e.getCause()); - } + if (!skipPrimary) { + addCallsForReplica(cs, rl, 0, 0); + try { + // wait for the timeout to see whether the primary responds back + Future f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds + if (f != null) { + return f.get(); //great we got a response + } + } catch (ExecutionException e) { + // We ignore the ExecutionException and continue with the secondary replicas + if (LOG.isDebugEnabled()) { + LOG.debug("Primary replica returns " + e.getCause()); + } - // Skip the result from the primary as we know that there is something wrong - startIndex = 1; - } catch (CancellationException e) { - throw new InterruptedIOException(); - } catch (InterruptedException e) { - throw new InterruptedIOException(); + // Skip the result from the primary as we know that there is something wrong + startIndex = 1; + } catch (CancellationException e) { + throw new InterruptedIOException(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } else { + // Since primary replica is skipped, the endIndex needs to be adjusted accordingly + endIndex --; } // submit call for the all of the secondaries at once @@ -288,10 +320,10 @@ public class RpcRetryingCallerWithReadReplicas { } catch (DoNotRetryIOException | InterruptedIOException | RetriesExhaustedException e) { throw e; } catch (IOException e) { - throw new RetriesExhaustedException("Can't get the location", e); + throw new RetriesExhaustedException("Can't get the location for replica " + replicaId, e); } if (rl == null) { - throw new RetriesExhaustedException("Can't get the locations"); + throw new RetriesExhaustedException("Can't get the location for replica " + replicaId); } return rl; diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 6b6acf0..bcd5d21 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; @@ -142,10 +143,25 @@ class ScannerCallableWithReplicas implements RetryingCallable { //2. We should close the "losing" scanners (scanners other than the ones we hear back // from first) // - RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, - RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, - currentScannerCallable.getRow()); - + RegionLocations rl = null; + try { + rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, + RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, + currentScannerCallable.getRow()); + } catch (RetriesExhaustedException | DoNotRetryIOException e) { + // We cannot get the primary replica region location, it is possible that the region server + // hosting meta table is down, it needs to proceed to try cached replicas directly. + if (cConnection instanceof ConnectionImplementation) { + rl = ((ConnectionImplementation) cConnection) + .getCachedLocation(tableName, currentScannerCallable.getRow()); + if (rl == null) { + throw e; + } + } else { + // For completeness + throw e; + } + } // allocate a boundedcompletion pool of some multiple of number of replicas. // We want to accomodate some RPCs for redundant replica scans (but are still in progress) ResultBoundedCompletionService> cs = diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java index 1ad980c..2a3c808 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java @@ -40,6 +40,8 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; @@ -162,8 +164,28 @@ public class TestReplicaWithCluster { /** * This copro is used to slow down the primary meta region scan a bit */ - public static class RegionServerHostingPrimayMetaRegionSlowCopro implements RegionObserver { + public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro implements RegionObserver { static boolean slowDownPrimaryMetaScan = false; + static boolean throwException = false; + + @Override + public void preGetOp(final ObserverContext e, + final Get get, final List results) throws IOException { + + int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId(); + + // Fail for the primary replica, but not for meta + if (throwException) { + if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) { + LOG.info("Get, throw Region Server Stopped Exceptoin for region " + e.getEnvironment() + .getRegion().getRegionInfo()); + throw new RegionServerStoppedException("Server " + + e.getEnvironment().getRegionServerServices().getServerName() + " not running"); + } + } else { + LOG.info("Get, We're replica region " + replicaId); + } + } @Override public RegionScanner preScannerOpen(final ObserverContext e, @@ -172,21 +194,34 @@ public class TestReplicaWithCluster { int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId(); // Slow down with the primary meta region scan - if (slowDownPrimaryMetaScan && (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() - && (replicaId == 0))) { - LOG.info("Scan with primary meta region, slow down a bit"); - try { - Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50); - } catch (InterruptedException ie) { - // Ingore + if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) { + if (slowDownPrimaryMetaScan) { + LOG.info("Scan with primary meta region, slow down a bit"); + try { + Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50); + } catch (InterruptedException ie) { + // Ingore + } } + // Fail for the primary replica + if (throwException) { + LOG.info("Scan, throw Region Server Stopped Exceptoin for replica " + e.getEnvironment() + .getRegion().getRegionInfo()); + + throw new RegionServerStoppedException("Server " + + e.getEnvironment().getRegionServerServices().getServerName() + " not running"); + } else { + LOG.info("Scan, We're replica region " + replicaId); + } + } else { + LOG.info("Scan, We're replica region " + replicaId); } + return null; } } - @BeforeClass public static void beforeClass() throws Exception { // enable store file refreshing @@ -216,7 +251,7 @@ public class TestReplicaWithCluster { // Set system coprocessor so it can be applied to meta regions HTU.getConfiguration().set("hbase.coprocessor.region.classes", - RegionServerHostingPrimayMetaRegionSlowCopro.class.getName()); + RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName()); HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_MEAT_REPLICA_SCAN_TIMEOUT, META_SCAN_TIMEOUT_IN_MILLISEC * 1000); @@ -630,18 +665,118 @@ public class TestReplicaWithCluster { HTU.createTable(hdt, new byte[][] { f }, null); - RegionServerHostingPrimayMetaRegionSlowCopro.slowDownPrimaryMetaScan = true; + RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = true; // Get user table location, always get it from the primary meta replica RegionLocations url = ((ClusterConnection) HTU.getConnection()) .locateRegion(hdt.getTableName(), row, false, false); } finally { - RegionServerHostingPrimayMetaRegionSlowCopro.slowDownPrimaryMetaScan = false; + RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = false; ((ConnectionImplementation) HTU.getAdmin().getConnection()).setUseMetaReplicas(false); HTU.getAdmin().setBalancerRunning(true, true); HTU.getAdmin().disableTable(hdt.getTableName()); HTU.deleteTable(hdt.getTableName()); } } + + + // This test is to simulate the case that the meta region and the primary user region are + // to access user replica regions and return stale data. + // Meta replica is enabled to show the case that the meta replica region could be out of sync + // with the primary meta region. + @Test + public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, InterruptedException { + HTU.getAdmin().setBalancerRunning(false, true); + + ((ConnectionImplementation)HTU.getAdmin().getConnection()).setUseMetaReplicas(true); + + // Create table then get the single region for our new table. + HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithPrimaryAndMetaDown"); + hdt.setRegionReplication(2); + try { + + Table table = HTU.createTable(hdt, new byte[][] { f }, null); + + // Get Meta location + RegionLocations mrl = ((ClusterConnection) HTU.getConnection()) + .locateRegion(TableName.META_TABLE_NAME, + HConstants.EMPTY_START_ROW, false, false); + + // Get user table location + RegionLocations url = ((ClusterConnection) HTU.getConnection()) + .locateRegion(hdt.getTableName(), row, false, false); + + // Make sure that user primary region is co-hosted with the meta region + if (!url.getDefaultRegionLocation().getServerName().equals( + mrl.getDefaultRegionLocation().getServerName())) { + HTU.moveRegionAndWait(url.getDefaultRegionLocation().getRegionInfo(), + mrl.getDefaultRegionLocation().getServerName()); + } + + // Make sure that the user replica region is not hosted by the same region server with + // primary + if (url.getRegionLocation(1).getServerName().equals(mrl.getDefaultRegionLocation() + .getServerName())) { + HTU.moveRegionAndWait(url.getRegionLocation(1).getRegionInfo(), + url.getDefaultRegionLocation().getServerName()); + } + + // Wait until the meta table is updated with new location info + while (true) { + mrl = ((ClusterConnection) HTU.getConnection()) + .locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, false, false); + + // Get user table location + url = ((ClusterConnection) HTU.getConnection()) + .locateRegion(hdt.getTableName(), row, false, true); + + LOG.info("meta locations " + mrl); + LOG.info("table locations " + url); + ServerName a = url.getDefaultRegionLocation().getServerName(); + ServerName b = mrl.getDefaultRegionLocation().getServerName(); + if(a.equals(b)) { + break; + } else { + LOG.info("Waiting for new region info to be updated in meta table"); + Thread.sleep(100); + } + } + + Put p = new Put(row); + p.addColumn(f, row, row); + table.put(p); + + // Flush so it can be picked by the replica refresher thread + HTU.flush(table.getName()); + + // Sleep for some time until data is picked up by replicas + try { + Thread.sleep(2 * REFRESH_PERIOD); + } catch (InterruptedException e1) { + LOG.error(e1); + } + + // Simulating the RS down + RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = true; + + // The first Get is supposed to succeed + Get g = new Get(row); + g.setConsistency(Consistency.TIMELINE); + Result r = table.get(g); + Assert.assertTrue(r.isStale()); + + // The second Get will succeed as well + r = table.get(g); + Assert.assertTrue(r.isStale()); + + } finally { + LOG.error("Damn, finally!"); + ((ConnectionImplementation)HTU.getAdmin().getConnection()).setUseMetaReplicas(false); + RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = false; + HTU.getAdmin().setBalancerRunning(true, true); + HTU.getAdmin().disableTable(hdt.getTableName()); + HTU.deleteTable(hdt.getTableName()); + } + } }