diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java index 0fcd419..ab2556c 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Table; @@ -54,33 +55,29 @@ public class SecureBulkLoadClient { public String prepareBulkLoad(final TableName tableName) throws IOException { try { - return - table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class, - EMPTY_START_ROW, - LAST_ROW, - new Batch.Call() { - @Override - public String call(SecureBulkLoadProtos.SecureBulkLoadService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - - BlockingRpcCallback rpcCallback = - new BlockingRpcCallback(); - - SecureBulkLoadProtos.PrepareBulkLoadRequest request = - SecureBulkLoadProtos.PrepareBulkLoadRequest.newBuilder() - .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); - - instance.prepareBulkLoad(controller, - request, - rpcCallback); - - SecureBulkLoadProtos.PrepareBulkLoadResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - return response.getBulkToken(); - } - }).entrySet().iterator().next().getValue(); + CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW); + SecureBulkLoadProtos.SecureBulkLoadService instance = + ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); + + ServerRpcController controller = new ServerRpcController(); + + BlockingRpcCallback rpcCallback = + new BlockingRpcCallback(); + + SecureBulkLoadProtos.PrepareBulkLoadRequest request = + SecureBulkLoadProtos.PrepareBulkLoadRequest.newBuilder() + .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); + + instance.prepareBulkLoad(controller, + request, + rpcCallback); + + SecureBulkLoadProtos.PrepareBulkLoadResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + + return response.getBulkToken(); } catch (Throwable throwable) { throw new IOException(throwable); } @@ -88,32 +85,26 @@ public class SecureBulkLoadClient { public void cleanupBulkLoad(final String bulkToken) throws IOException { try { - table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class, - EMPTY_START_ROW, - LAST_ROW, - new Batch.Call() { - - @Override - public String call(SecureBulkLoadProtos.SecureBulkLoadService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - - BlockingRpcCallback rpcCallback = - new BlockingRpcCallback(); - - SecureBulkLoadProtos.CleanupBulkLoadRequest request = - SecureBulkLoadProtos.CleanupBulkLoadRequest.newBuilder() - .setBulkToken(bulkToken).build(); - - instance.cleanupBulkLoad(controller, - request, - rpcCallback); - - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - return null; - } - }); + CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW); + SecureBulkLoadProtos.SecureBulkLoadService instance = + ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); + + ServerRpcController controller = new ServerRpcController(); + + BlockingRpcCallback rpcCallback = + new BlockingRpcCallback(); + + SecureBulkLoadProtos.CleanupBulkLoadRequest request = + SecureBulkLoadProtos.CleanupBulkLoadRequest.newBuilder() + .setBulkToken(bulkToken).build(); + + instance.cleanupBulkLoad(controller, + request, + rpcCallback); + + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } } catch (Throwable throwable) { throw new IOException(throwable); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java index c4d1b04..058992f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java @@ -199,10 +199,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService } } - fs.delete(createStagingDir(baseStagingDir, - getActiveUser(), - new Path(request.getBulkToken()).getName()), - true); + fs.delete(new Path(request.getBulkToken()), true); done.run(CleanupBulkLoadResponse.newBuilder().build()); } catch (IOException e) { ResponseConverter.setControllerException(controller, e);