.../hadoop/hbase/regionserver/RSRpcServices.java | 28 ++++++++++++---------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 9868b49..7c29b3c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -639,31 +639,24 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @param builder * @param cellsToReturn Could be null. May be allocated in this method. This is what this * method returns as a 'result'. + * @param closeCallBack the callback to be used with multigets + * @param context the current RpcCallContext * @return Return the cellScanner passed */ private List doNonAtomicRegionMutation(final Region region, final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner, - final RegionActionResult.Builder builder, List cellsToReturn, long nonceGroup) { + final RegionActionResult.Builder builder, List cellsToReturn, long nonceGroup, + final RegionScannersCloseCallBack closeCallBack, RpcCallContext context) { // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do // one at a time, we instead pass them in batch. Be aware that the corresponding // ResultOrException instance that matches each Put or Delete is then added down in the // doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched List mutations = null; - RpcCallContext context = RpcServer.getCurrentCall(); - // An RpcCallBack that creates a list of scanners that needs to perform callBack - // operation on completion of multiGets. - RegionScannersCloseCallBack closeCallBack = null; for (ClientProtos.Action action : actions.getActionList()) { ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null; try { Result r = null; if (action.hasGet()) { - if (closeCallBack == null) { - // Initialize only once - closeCallBack = new RegionScannersCloseCallBack(); - // Set the call back here itself. - context.setCallBack(closeCallBack); - } Get get = ProtobufUtil.toGet(action.getGet()); r = get(get, ((HRegion) region), closeCallBack, context); } else if (action.hasServiceCall()) { @@ -2084,7 +2077,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, MultiResponse.Builder responseBuilder = MultiResponse.newBuilder(); RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder(); Boolean processed = null; - + RegionScannersCloseCallBack closeCallBack = null; + RpcCallContext context = null; for (RegionAction regionAction : request.getRegionActionList()) { this.requestCount.add(regionAction.getActionCount()); OperationQuota quota; @@ -2131,8 +2125,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } else { // doNonAtomicRegionMutation manages the exception internally + if (closeCallBack == null) { + // An RpcCallBack that creates a list of scanners that needs to perform callBack + // operation on completion of multiGets. + // Set this only once + closeCallBack = new RegionScannersCloseCallBack(); + context = RpcServer.getCurrentCall(); + context.setCallBack(closeCallBack); + } cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner, - regionActionResultBuilder, cellsToReturn, nonceGroup); + regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context); } responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); quota.close();