Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java (revision 1389711) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcChannel.java (working copy) @@ -18,23 +18,30 @@ package org.apache.hadoop.hbase.ipc; -import com.google.protobuf.*; +import java.io.IOException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.ServerCallable; -import org.apache.hadoop.hbase.client.coprocessor.Exec; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; import org.apache.hadoop.hbase.util.Bytes; -import java.io.IOException; +import com.google.protobuf.BlockingRpcChannel; +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcChannel; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; -import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; - /** * Provides clients with an RPC connection to call coprocessor endpoint {@link Service}s * against a given table region. An instance of this class may be obtained @@ -51,6 +58,7 @@ private final byte[] table; private final byte[] row; private byte[] lastRegion; + private HRegionInfo actualExecutedReGion; public CoprocessorRpcChannel(HConnection conn, byte[] table, byte[] row) { this.connection = conn; @@ -107,7 +115,8 @@ ServerCallable callable = new ServerCallable(connection, table, row) { public CoprocessorServiceResponse call() throws Exception { - byte[] regionName = location.getRegionInfo().getRegionName(); + actualExecutedReGion = location.getRegionInfo(); + byte[] regionName = actualExecutedReGion.getRegionName(); return ProtobufUtil.execService(server, call, regionName); } }; @@ -126,6 +135,10 @@ return response; } + public HRegionInfo getActualExecutedRegion() { + return actualExecutedReGion; + } + public byte[] getLastRegion() { return lastRegion; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1389711) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -2138,6 +2138,7 @@ * for each result * @param the protocol interface type * @param the callable's return type + * @param endKey ending row in range for the callable, inclusive * @throws IOException */ @Deprecated @@ -2147,12 +2148,17 @@ final byte[] tableName, ExecutorService pool, final Batch.Call callable, - final Batch.Callback callback) + final Batch.Callback callback, + final byte[] endKey) throws IOException, Throwable { - + + final List> missedRows = new ArrayList>(); Map> futures = new TreeMap>(Bytes.BYTES_COMPARATOR); - for (final byte[] r : rows) { + int rowsSize = rows.size(); + for (int i = 0; i < rowsSize; i++) { + final byte[] r = rows.get(i); + final byte[] nextRow = (i + 1 < rowsSize) ? rows.get(i + 1) : endKey; final ExecRPCInvoker invoker = new ExecRPCInvoker(conf, this, protocol, tableName, r); Future future = pool.submit( @@ -2163,9 +2169,17 @@ invoker); R result = callable.call(instance); byte[] region = invoker.getRegionName(); + HRegionInfo actualExecutedRegion = invoker.getActualExecutedRegion(); if (callback != null) { callback.update(region, r, result); } + if ((Bytes.compareTo(actualExecutedRegion.getEndKey(), nextRow) < 0 + || Bytes.equals(nextRow, HConstants.EMPTY_END_ROW)) && + !Bytes.equals(actualExecutedRegion.getEndKey(),HConstants.EMPTY_END_ROW)) { + synchronized(missedRows){ + missedRows.add(new Pair(actualExecutedRegion.getEndKey(),nextRow)); + } + } return result; } }); @@ -2183,6 +2197,23 @@ Bytes.toStringBinary(e.getKey()), ie); } } + + if (!missedRows.isEmpty()) { + for (Pair lostRow : missedRows) { + List redoRows = new ArrayList(); + byte[] currentKey = lostRow.getFirst(); + do { + HRegionLocation regionLocation = getRegionLocation(tableName, + currentKey, false); + redoRows.add(currentKey); + currentKey = regionLocation.getRegionInfo().getEndKey(); + } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) + && (Bytes.equals(lostRow.getSecond(), HConstants.EMPTY_END_ROW) || Bytes + .compareTo(currentKey, lostRow.getSecond()) < 0)); + processExecs(protocol, redoRows, tableName, pool, callable, callback, + lostRow.getSecond()); + } + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java (revision 1389711) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java (working copy) @@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.ServerCallable; import org.apache.hadoop.hbase.client.coprocessor.Exec; @@ -51,6 +52,7 @@ private final byte[] table; private final byte[] row; private byte[] regionName; + private HRegionInfo actualExecutedReGion; public ExecRPCInvoker(Configuration conf, HConnection connection, @@ -76,8 +78,8 @@ ServerCallable callable = new ServerCallable(connection, table, row) { public ExecResult call() throws Exception { - byte[] regionName = location.getRegionInfo().getRegionName(); - return ProtobufUtil.execCoprocessor(server, exec, regionName); + actualExecutedReGion = location.getRegionInfo(); + return ProtobufUtil.execCoprocessor(server, exec, actualExecutedReGion.getRegionName()); } }; ExecResult result = callable.withRetries(); @@ -92,6 +94,10 @@ return null; } + public HRegionInfo getActualExecutedRegion() { + return actualExecutedReGion; + } + public byte[] getRegionName() { return regionName; } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java (revision 1389711) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java (working copy) @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; @@ -128,9 +129,7 @@ table.close(); } - @Test - public void testAggregation() throws Throwable { - HTable table = new HTable(util.getConfiguration(), TEST_TABLE); + private void internalAggregation(HTable table) throws Throwable { Map results; // scan: for all regions @@ -180,6 +179,54 @@ } @Test + public void testAggregation() throws Throwable { + HTable table = new HTable(util.getConfiguration(), TEST_TABLE); + internalAggregation(table); + } + + @Test + public void testAggregationWithRegionSplit() throws Throwable { + TestHTable table = new TestHTable(util.getConfiguration(), TEST_TABLE); + // load region location to the cache + table.getRegionsInRange(HConstants.EMPTY_BYTE_ARRAY, + HConstants.EMPTY_BYTE_ARRAY); + table.hackForGetStartKeysInRange = true; + internalAggregation(table); + } + + public static class TestHTable extends HTable { + + boolean hackForGetStartKeysInRange = false; + + public TestHTable(Configuration conf, byte[] tableName) throws IOException { + super(conf, tableName); + } + + protected List getStartKeysInRange(byte[] start, byte[] end) + throws IOException { + List result = super.getStartKeysInRange(start, end); + if (hackForGetStartKeysInRange) { + HBaseAdmin hba = new HBaseAdmin(super.getConfiguration()); + try { + hba.split(super.getTableName(), ROWS[rowSeperator1 - 2]); + hba.split(super.getTableName(), ROWS[rowSeperator1 + 2]); + hba.split(super.getTableName(), ROWS[rowSeperator2 + 2]); + + long timeout = System.currentTimeMillis() + (15 * 1000); + while ((System.currentTimeMillis() < timeout) + && (super.getRegionLocations().size() != 6)) { + Thread.sleep(250); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + hba.close(); + } + return result; + } + } + + @Test public void testCoprocessorService() throws Throwable { HTable table = new HTable(util.getConfiguration(), TEST_TABLE); NavigableMap regions = table.getRegionLocations(); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnection.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (revision 1389711) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (working copy) @@ -324,6 +324,7 @@ * for each result * @param the protocol interface type * @param the callable's return type + * @param endKey ending row in range for the callable, inclusive * @throws IOException */ public void processExecs( @@ -332,7 +333,8 @@ final byte[] tableName, ExecutorService pool, final Batch.Call call, - final Batch.Callback callback) throws IOException, Throwable; + final Batch.Callback callback, + final byte[] endKey) throws IOException, Throwable; /** * Enable or disable region cache prefetch for the table. It will be Index: hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1389711) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -1378,7 +1378,7 @@ // get regions covered by the row range List keys = getStartKeysInRange(startKey, endKey); connection.processExecs(protocol, keys, tableName, pool, callable, - callback); + callback, endKey); } /** @@ -1415,9 +1415,13 @@ // get regions covered by the row range List keys = getStartKeysInRange(startKey, endKey); + final List> missedRows = new ArrayList>(); Map> futures = new TreeMap>(Bytes.BYTES_COMPARATOR); - for (final byte[] r : keys) { + int rowsSize = keys.size(); + for (int i = 0; i < rowsSize; i++) { + final byte[] r = keys.get(i); + final byte[] nextRow = (i + 1 < rowsSize) ? keys.get(i + 1) : endKey; final CoprocessorRpcChannel channel = new CoprocessorRpcChannel(connection, tableName, r); Future future = pool.submit( @@ -1426,9 +1430,17 @@ T instance = ProtobufUtil.newServiceStub(service, channel); R result = callable.call(instance); byte[] region = channel.getLastRegion(); + HRegionInfo actualExecutedRegion = channel.getActualExecutedRegion(); if (callback != null) { callback.update(region, r, result); } + if ((Bytes.compareTo(actualExecutedRegion.getEndKey(), nextRow) < 0 + || Bytes.equals(nextRow, HConstants.EMPTY_END_ROW)) && + !Bytes.equals(actualExecutedRegion.getEndKey(),HConstants.EMPTY_END_ROW)) { + synchronized(missedRows){ + missedRows.add(new Pair(actualExecutedRegion.getEndKey(),nextRow)); + } + } return result; } }); @@ -1448,9 +1460,16 @@ .initCause(ie); } } + + if (!missedRows.isEmpty()) { + for (Pair lostRow : missedRows) { + coprocessorService(service, lostRow.getFirst(), lostRow.getSecond(), + callable, callback); + } + } } - private List getStartKeysInRange(byte[] start, byte[] end) + protected List getStartKeysInRange(byte[] start, byte[] end) throws IOException { Pair startEndKeys = getStartEndKeys(); byte[][] startKeys = startEndKeys.getFirst();