diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 6859cb3..b518238 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -139,7 +139,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { private final boolean hostnamesCanChange; private final long pause; private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified - private final boolean useMetaReplicas; + private boolean useMetaReplicas; private final int numTries; final int rpcTimeout; @@ -306,6 +306,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } /** + * @param useMetaReplicas + */ + @VisibleForTesting + void setUseMetaReplicas(final boolean useMetaReplicas) { + this.useMetaReplicas = useMetaReplicas; + } + + /** * @param conn The connection for which to replace the generator. * @param cnm Replaces the nonce generator used, for testing. * @return old nonce generator. @@ -810,7 +818,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } else { // If we are not supposed to be using the cache, delete any existing cached location // so it won't interfere. - metaCache.clearCache(tableName, row); + // We are only supposed to clean the cache for the specific replicaId + metaCache.clearCache(tableName, row, replicaId); } // Query the meta region diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java index 14e0afd..c3b00b5 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java @@ -313,6 +313,40 @@ public class MetaCache { } /** + * Delete a cached location with specific replicaId. + * @param tableName tableName + * @param row row key + * @param replicaId region replica id + */ + public void clearCache(final TableName tableName, final byte [] row, int replicaId) { + ConcurrentMap tableLocations = getTableLocations(tableName); + + RegionLocations regionLocations = getCachedLocation(tableName, row); + if (regionLocations != null) { + HRegionLocation toBeRemoved = regionLocations.getRegionLocation(replicaId); + if (toBeRemoved != null) { + RegionLocations updatedLocations = regionLocations.remove(replicaId); + byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey(); + boolean removed; + if (updatedLocations.isEmpty()) { + removed = tableLocations.remove(startKey, regionLocations); + } else { + removed = tableLocations.replace(startKey, regionLocations, updatedLocations); + } + + if (removed) { + if (metrics != null) { + metrics.incrMetaCacheNumClearRegion(); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Removed " + toBeRemoved + " from cache"); + } + } + } + } + } + + /** * Delete a cached location for a table, row and server */ public void clearCache(final TableName tableName, final byte [] row, ServerName serverName) { diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 0050269..60ecc03 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -170,9 +170,36 @@ public class RpcRetryingCallerWithReadReplicas { throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException { boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0); - RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId() - : RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow()); - final ResultBoundedCompletionService cs = + RegionLocations rl = null; + boolean skipPrimary = false; + try { + rl = getRegionLocations(true, + (isTargetReplicaSpecified ? get.getReplicaId() : RegionReplicaUtil.DEFAULT_REPLICA_ID), + cConnection, tableName, get.getRow()); + } catch (RetriesExhaustedException e) { + // When there is no specific replica id specified. It just needs to load all replicas. + if (isTargetReplicaSpecified) { + throw e; + } else { + // We cannot get the primary replica location, it is possible that the region + // server hosting meta is down, it needs to proceed to try cached replicas. + if (cConnection instanceof ConnectionImplementation) { + rl = ((ConnectionImplementation)cConnection).getCachedLocation(tableName, get.getRow()); + if (rl == null) { + // No cached locations + throw e; + } + + // Primary replica location is not known, skip primary replica + skipPrimary = true; + } else { + // For completeness + throw e; + } + } + } + + final ResultBoundedCompletionService cs = new ResultBoundedCompletionService<>(this.rpcRetryingCallerFactory, pool, rl.size()); int startIndex = 0; int endIndex = rl.size(); @@ -181,25 +208,30 @@ public class RpcRetryingCallerWithReadReplicas { addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId()); endIndex = 1; } else { - addCallsForReplica(cs, rl, 0, 0); - try { - // wait for the timeout to see whether the primary responds back - Future f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds - if (f != null) { - return f.get(); //great we got a response - } - } catch (ExecutionException e) { - // We ignore the ExecutionException and continue with the secondary replicas - if (LOG.isDebugEnabled()) { - LOG.debug("Primary replica returns " + e.getCause()); - } + if (!skipPrimary) { + addCallsForReplica(cs, rl, 0, 0); + try { + // wait for the timeout to see whether the primary responds back + Future f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds + if (f != null) { + return f.get(); //great we got a response + } + } catch (ExecutionException e) { + // We ignore the ExecutionException and continue with the secondary replicas + if (LOG.isDebugEnabled()) { + LOG.debug("Primary replica returns " + e.getCause()); + } - // Skip the result from the primary as we know that there is something wrong - startIndex = 1; - } catch (CancellationException e) { - throw new InterruptedIOException(); - } catch (InterruptedException e) { - throw new InterruptedIOException(); + // Skip the result from the primary as we know that there is something wrong + startIndex = 1; + } catch (CancellationException e) { + throw new InterruptedIOException(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } else { + // Since primary replica is skipped, the endIndex needs to be adjusted accordingly + endIndex --; } // submit call for the all of the secondaries at once @@ -286,10 +318,10 @@ public class RpcRetryingCallerWithReadReplicas { } catch (DoNotRetryIOException | InterruptedIOException | RetriesExhaustedException e) { throw e; } catch (IOException e) { - throw new RetriesExhaustedException("Can't get the location", e); + throw new RetriesExhaustedException("Can't get the location for replica " + replicaId, e); } if (rl == null) { - throw new RetriesExhaustedException("Can't get the locations"); + throw new RetriesExhaustedException("Can't get the location for replica " + replicaId); } return rl; diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 6b6acf0..9849f99 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -142,10 +142,25 @@ class ScannerCallableWithReplicas implements RetryingCallable { //2. We should close the "losing" scanners (scanners other than the ones we hear back // from first) // - RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, - RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, - currentScannerCallable.getRow()); - + RegionLocations rl = null; + try { + rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, + RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, + currentScannerCallable.getRow()); + } catch (RetriesExhaustedException e) { + // We cannot get the primary replica region location, it is possible that the region server + // hosting meta table is down, it needs to proceed to try cached replicas directly. + if (cConnection instanceof ConnectionImplementation) { + rl = ((ConnectionImplementation) cConnection) + .getCachedLocation(tableName, currentScannerCallable.getRow()); + if (rl == null) { + throw e; + } + } else { + // For completeness + throw e; + } + } // allocate a boundedcompletion pool of some multiple of number of replicas. // We want to accomodate some RPCs for redundant replica scans (but are still in progress) ResultBoundedCompletionService> cs = diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java index 2c77541..d68ed68 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java @@ -39,6 +39,9 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; @@ -88,7 +91,7 @@ public class TestReplicaWithCluster { @Override public void preGetOp(final ObserverContext e, - final Get get, final List results) throws IOException { + final Get get, final List results) throws IOException { if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) { CountDownLatch latch = cdl.get(); @@ -107,7 +110,7 @@ public class TestReplicaWithCluster { LOG.error(e1); } } else { - LOG.info("We're not the primary replicas."); + LOG.info("Get, We're not the primary replicas."); } } } @@ -130,10 +133,9 @@ public class TestReplicaWithCluster { if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) { LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId); throw new RegionServerStoppedException("Server " + - e.getEnvironment().getRegionServerServices().getServerName() - + " not running"); + e.getEnvironment().getRegionServerServices().getServerName() + " not running"); } else { - LOG.info("We're replica region " + replicaId); + LOG.info("Get, We're replica region " + replicaId); } } @@ -147,10 +149,57 @@ public class TestReplicaWithCluster { if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) { LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId); throw new RegionServerStoppedException("Server " + - e.getEnvironment().getRegionServerServices().getServerName() - + " not running"); + e.getEnvironment().getRegionServerServices().getServerName() + " not running"); + } else { + LOG.info("Scan, We're replica region " + replicaId); + } + + return null; + } + } + + /** + * This copro is used to simulate region server hosting meta down exception for Get and Scan + */ + public static class RegionServerHostingMetaStoppedCopro implements RegionObserver { + static boolean throwException = false; + + public RegionServerHostingMetaStoppedCopro() { + } + + @Override + public void preGetOp(final ObserverContext e, + final Get get, final List results) throws IOException { + + int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId(); + + // Fail for the primary replica, but not for meta + if (throwException) { + if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) { + LOG.info("Get, throw Region Server Stopped Exceptoin for region " + + e.getEnvironment().getRegion().getRegionInfo()); + throw new RegionServerStoppedException("Server " + + e.getEnvironment().getRegionServerServices().getServerName() + " not running"); + } else { + LOG.info("Get, We're replica region " + replicaId); + } + } + } + + @Override + public RegionScanner preScannerOpen(final ObserverContext e, + final Scan scan, final RegionScanner s) throws IOException { + + int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId(); + + // Fail for the primary replica + if (throwException && (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0)) { + LOG.info("Scan, throw Region Server Stopped Exceptoin for replica " + + e.getEnvironment().getRegion().getRegionInfo()); + throw new RegionServerStoppedException("Server " + + e.getEnvironment().getRegionServerServices().getServerName() + " not running"); } else { - LOG.info("We're replica region " + replicaId); + LOG.info("Scan, We're replica region " + replicaId); } return null; @@ -160,8 +209,8 @@ public class TestReplicaWithCluster { @BeforeClass public static void beforeClass() throws Exception { // enable store file refreshing - HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, - REFRESH_PERIOD); + HTU.getConfiguration() + .setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD); HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f); HTU.getConfiguration().setInt("replication.source.size.capacity", 10240); @@ -175,27 +224,36 @@ public class TestReplicaWithCluster { HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000); HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000); + // Enable meta replica at server side + HTU.getConfiguration().setInt("hbase.meta.replica.count", 2); + // Retry less so it can fail faster HTU.getConfiguration().setInt("hbase.client.retries.number", 1); + // Make sure master does not host system tables. + HTU.getConfiguration().set("hbase.balancer.tablesOnMaster", "none"); + + // Set system coprocessor so it can be applied to meta regions + HTU.getConfiguration().set("hbase.coprocessor.region.classes", + RegionServerHostingMetaStoppedCopro.class.getName()); + HTU.startMiniCluster(NB_SERVERS); HTU.getHBaseCluster().startMaster(); } @AfterClass public static void afterClass() throws Exception { - if (HTU2 != null) - HTU2.shutdownMiniCluster(); + if (HTU2 != null) HTU2.shutdownMiniCluster(); HTU.shutdownMiniCluster(); } - @Test (timeout=30000) + @Test(timeout = 30000) public void testCreateDeleteTable() throws IOException { // Create table then get the single region for our new table. HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable"); hdt.setRegionReplication(NB_SERVERS); hdt.addCoprocessor(SlowMeCopro.class.getName()); - Table table = HTU.createTable(hdt, new byte[][]{f}, null); + Table table = HTU.createTable(hdt, new byte[][] { f }, null); Put p = new Put(row); p.addColumn(f, row, row); @@ -222,12 +280,12 @@ public class TestReplicaWithCluster { HTU.deleteTable(hdt.getTableName()); } - @Test (timeout=120000) + @Test(timeout = 120000) public void testChangeTable() throws Exception { HTableDescriptor hdt = HTU.createTableDescriptor("testChangeTable"); hdt.setRegionReplication(NB_SERVERS); hdt.addCoprocessor(SlowMeCopro.class.getName()); - Table table = HTU.createTable(hdt, new byte[][]{f}, null); + Table table = HTU.createTable(hdt, new byte[][] { f }, null); // basic test: it should work. Put p = new Put(row); @@ -269,7 +327,7 @@ public class TestReplicaWithCluster { } Admin admin = HTU.getAdmin(); - nHdt =admin.getTableDescriptor(hdt.getTableName()); + nHdt = admin.getTableDescriptor(hdt.getTableName()); Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()), bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount()); @@ -279,8 +337,9 @@ public class TestReplicaWithCluster { } @SuppressWarnings("deprecation") - @Test (timeout=300000) - public void testReplicaAndReplication() throws Exception { + @Test(timeout = 300000) + public void testReplicaAndReplication() + throws Exception { HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaAndReplication"); hdt.setRegionReplication(NB_SERVERS); @@ -363,26 +422,26 @@ public class TestReplicaWithCluster { // the minicluster has negative impact of deleting all HConnections in JVM. } - @Test (timeout=30000) + @Test(timeout = 30000) public void testBulkLoad() throws IOException { // Create table then get the single region for our new table. LOG.debug("Creating test table"); HTableDescriptor hdt = HTU.createTableDescriptor("testBulkLoad"); hdt.setRegionReplication(NB_SERVERS); hdt.addCoprocessor(SlowMeCopro.class.getName()); - Table table = HTU.createTable(hdt, new byte[][]{f}, null); + Table table = HTU.createTable(hdt, new byte[][] { f }, null); // create hfiles to load. LOG.debug("Creating test data"); Path dir = HTU.getDataTestDirOnTestFS("testBulkLoad"); final int numRows = 10; final byte[] qual = Bytes.toBytes("qual"); - final byte[] val = Bytes.toBytes("val"); + final byte[] val = Bytes.toBytes("val"); final List> famPaths = new ArrayList<>(); for (HColumnDescriptor col : hdt.getColumnFamilies()) { Path hfile = new Path(dir, col.getNameAsString()); - TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(), - qual, val, numRows); + TestHRegionServerBulkLoad + .createHFile(HTU.getTestFileSystem(), hfile, col.getName(), qual, val, numRows); famPaths.add(new Pair<>(col.getName(), hfile.toString())); } @@ -392,19 +451,17 @@ public class TestReplicaWithCluster { table = conn.getTable(hdt.getTableName()); final String bulkToken = new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn); - ClientServiceCallable callable = new ClientServiceCallable(conn, - hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0), + ClientServiceCallable callable = new ClientServiceCallable(conn, hdt.getTableName(), + TestHRegionServerBulkLoad.rowkey(0), new RpcControllerFactory(HTU.getConfiguration()).newController()) { - @Override - protected Void rpcCall() throws Exception { - LOG.debug("Going to connect to server " + getLocation() + " for row " - + Bytes.toStringBinary(getRow())); + @Override protected Void rpcCall() throws Exception { + LOG.debug("Going to connect to server " + getLocation() + " for row " + Bytes + .toStringBinary(getRow())); SecureBulkLoadClient secureClient = null; byte[] regionName = getLocation().getRegionInfo().getRegionName(); try (Table table = conn.getTable(getTableName())) { secureClient = new SecureBulkLoadClient(HTU.getConfiguration(), table); - secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, - true, null, bulkToken); + secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, true, null, bulkToken); } return null; } @@ -570,4 +627,104 @@ public class TestReplicaWithCluster { HTU.deleteTable(hdt.getTableName()); } } -} + + // This test is to simulate the case that the meta region and the primary user region are + // hosted by the same region server, and this server is down. In this case, Get is able + // to access user replica regions and return stale data. + // Meta replica is enabled to show the case that the meta replica region could be out of sync + // with the primary meta region. + @Test + public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, InterruptedException { + HTU.getAdmin().setBalancerRunning(false, true); + + // TODO:Enable the client to use meta replica, this is disabled due to HBASE-18035 + ((ConnectionImplementation)HTU.getAdmin().getConnection()).setUseMetaReplicas(false); + + // Create table then get the single region for our new table. + HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithPrimaryAndMetaDown"); + hdt.setRegionReplication(2); + try { + + Table table = HTU.createTable(hdt, new byte[][] { f }, null); + + // Get Meta location + RegionLocations mrl = ((ClusterConnection) HTU.getConnection()) + .locateRegion(TableName.META_TABLE_NAME, + HConstants.EMPTY_START_ROW, false, false); + + // Get user table location + RegionLocations url = ((ClusterConnection) HTU.getConnection()) + .locateRegion(hdt.getTableName(), row, false, false); + + // Make sure that user primary region is co-hosted with the meta region + if (!url.getDefaultRegionLocation().getServerName().equals( + mrl.getDefaultRegionLocation().getServerName())) { + HTU.moveRegionAndWait(url.getDefaultRegionLocation().getRegionInfo(), + mrl.getDefaultRegionLocation().getServerName()); + } + + // Make sure that the user replica region is not hosted by the same region server with + // primary + if (url.getRegionLocation(1).getServerName().equals(mrl.getDefaultRegionLocation() + .getServerName())) { + HTU.moveRegionAndWait(url.getRegionLocation(1).getRegionInfo(), + url.getDefaultRegionLocation().getServerName()); + } + + // Wait until the meta table is updated with new location info + while (true) { + mrl = ((ClusterConnection) HTU.getConnection()) + .locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, false, false); + + // Get user table location + url = ((ClusterConnection) HTU.getConnection()) + .locateRegion(hdt.getTableName(), row, false, true); + + LOG.info("meta locations " + mrl); + LOG.info("table locations " + url); + ServerName a = url.getDefaultRegionLocation().getServerName(); + ServerName b = mrl.getDefaultRegionLocation().getServerName(); + if(a.equals(b)) { + break; + } else { + LOG.info("Waiting for new region info to be updated in meta table"); + Thread.sleep(100); + } + } + + Put p = new Put(row); + p.addColumn(f, row, row); + table.put(p); + + // Flush so it can be picked by the replica refresher thread + HTU.flush(table.getName()); + + // Sleep for some time until data is picked up by replicas + try { + Thread.sleep(2 * REFRESH_PERIOD); + } catch (InterruptedException e1) { + LOG.error(e1); + } + + // Simulating the RS down + RegionServerHostingMetaStoppedCopro.throwException = true; + + // The first Get is supposed to succeed + Get g = new Get(row); + g.setConsistency(Consistency.TIMELINE); + Result r = table.get(g); + Assert.assertTrue(r.isStale()); + + // The second Get will succeed as well + r = table.get(g); + Assert.assertTrue(r.isStale()); + + } finally { + ((ConnectionImplementation)HTU.getAdmin().getConnection()).setUseMetaReplicas(false); + RegionServerHostingMetaStoppedCopro.throwException = false; + HTU.getAdmin().setBalancerRunning(true, true); + HTU.getAdmin().disableTable(hdt.getTableName()); + HTU.deleteTable(hdt.getTableName()); + } + } +} \ No newline at end of file