.../hadoop/hbase/regionserver/RSRpcServices.java | 28 ++++++++++++----------
1 file changed, 15 insertions(+), 13 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 9868b49..7c29b3c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -639,31 +639,24 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @param builder
* @param cellsToReturn Could be null. May be allocated in this method. This is what this
* method returns as a 'result'.
+ * @param closeCallBack the callback to be used with multigets
+ * @param context the current RpcCallContext
* @return Return the cellScanner passed
*/
private List doNonAtomicRegionMutation(final Region region,
final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
- final RegionActionResult.Builder builder, List cellsToReturn, long nonceGroup) {
+ final RegionActionResult.Builder builder, List cellsToReturn, long nonceGroup,
+ final RegionScannersCloseCallBack closeCallBack, RpcCallContext context) {
// Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
// one at a time, we instead pass them in batch. Be aware that the corresponding
// ResultOrException instance that matches each Put or Delete is then added down in the
// doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched
List mutations = null;
- RpcCallContext context = RpcServer.getCurrentCall();
- // An RpcCallBack that creates a list of scanners that needs to perform callBack
- // operation on completion of multiGets.
- RegionScannersCloseCallBack closeCallBack = null;
for (ClientProtos.Action action : actions.getActionList()) {
ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
try {
Result r = null;
if (action.hasGet()) {
- if (closeCallBack == null) {
- // Initialize only once
- closeCallBack = new RegionScannersCloseCallBack();
- // Set the call back here itself.
- context.setCallBack(closeCallBack);
- }
Get get = ProtobufUtil.toGet(action.getGet());
r = get(get, ((HRegion) region), closeCallBack, context);
} else if (action.hasServiceCall()) {
@@ -2084,7 +2077,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
Boolean processed = null;
-
+ RegionScannersCloseCallBack closeCallBack = null;
+ RpcCallContext context = null;
for (RegionAction regionAction : request.getRegionActionList()) {
this.requestCount.add(regionAction.getActionCount());
OperationQuota quota;
@@ -2131,8 +2125,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
} else {
// doNonAtomicRegionMutation manages the exception internally
+ if (closeCallBack == null) {
+ // An RpcCallBack that creates a list of scanners that needs to perform callBack
+ // operation on completion of multiGets.
+ // Set this only once
+ closeCallBack = new RegionScannersCloseCallBack();
+ context = RpcServer.getCurrentCall();
+ context.setCallBack(closeCallBack);
+ }
cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
- regionActionResultBuilder, cellsToReturn, nonceGroup);
+ regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context);
}
responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
quota.close();