diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java index fe141c9..f1ef5e4 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java @@ -37,6 +37,7 @@ import com.google.common.collect.ListMultimap; @InterfaceStability.Evolving public abstract class Query extends OperationWithAttributes { protected Filter filter = null; + protected int targetReplicaId = -1; /** * @return Filter @@ -103,4 +104,13 @@ public abstract class Query extends OperationWithAttributes { setAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL, ProtobufUtil.toUsersAndPermissions(permMap).toByteArray()); } + + // Specify region replica where Query will fetch data from + public void setReplicaId(int Id) { + this.targetReplicaId = Id; + } + + public int getReplicaId() { + return this.targetReplicaId; + } } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 1c733b6..297b584 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -185,27 +185,34 @@ public class RpcRetryingCallerWithReadReplicas { */ public synchronized Result call() throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException { - RegionLocations rl = getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID, - cConnection, tableName, get.getRow()); + boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0); + + RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId() + : RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow()); ResultBoundedCompletionService cs = new ResultBoundedCompletionService(pool, rl.size()); - addCallsForReplica(cs, rl, 0, 0); - try { - // wait for the timeout to see whether the primary responds back - Future f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds - if (f != null) { - return f.get(); //great we got a response + if(isTargetReplicaSpecified) { + addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId()); + } else { + addCallsForReplica(cs, rl, 0, 0); + try { + // wait for the timeout to see whether the primary responds back + Future f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds + if (f != null) { + return f.get(); //great we got a response + } + } catch (ExecutionException e) { + throwEnrichedException(e, retries); + } catch (CancellationException e) { + throw new InterruptedIOException(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); } - } catch (ExecutionException e) { - throwEnrichedException(e, retries); - } catch (CancellationException e) { - throw new InterruptedIOException(); - } catch (InterruptedException e) { - throw new InterruptedIOException(); + + // submit call for the all of the secondaries at once + addCallsForReplica(cs, rl, 1, rl.size() - 1); } - // submit call for the all of the secondaries at once - addCallsForReplica(cs, rl, 1, rl.size() - 1); try { try { Future f = cs.take(); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java index a7067f1..9fbaa05 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TestMetaTableAccessor; +import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; @@ -197,6 +198,31 @@ public class TestRegionReplicas { closeRegion(hriSecondary); } } + + @Test(timeout = 60000) + public void testGetOnTargetRegionReplica() throws Exception { + try { + //load some data to primary + HTU.loadNumericRows(table, f, 0, 1000); + // assert that we can read back from primary + Assert.assertEquals(1000, HTU.countRows(table)); + // flush so that region replica can read + getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache(); + + openRegion(hriSecondary); + + // try directly Get against region replica + byte[] row = Bytes.toBytes(String.valueOf(42)); + Get get = new Get(row); + get.setConsistency(Consistency.TIMELINE); + get.setReplicaId(1); + Result result = table.get(get); + Assert.assertArrayEquals(row, result.getValue(f, null)); + } finally { + HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); + closeRegion(hriSecondary); + } + } private void assertGet(HRegion region, int value, boolean expect) throws IOException { byte[] row = Bytes.toBytes(String.valueOf(value));