diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java index 0fcd419..ab2556c 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Table; @@ -54,33 +55,29 @@ public class SecureBulkLoadClient { public String prepareBulkLoad(final TableName tableName) throws IOException { try { - return - table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class, - EMPTY_START_ROW, - LAST_ROW, - new Batch.Call() { - @Override - public String call(SecureBulkLoadProtos.SecureBulkLoadService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - - BlockingRpcCallback rpcCallback = - new BlockingRpcCallback(); - - SecureBulkLoadProtos.PrepareBulkLoadRequest request = - SecureBulkLoadProtos.PrepareBulkLoadRequest.newBuilder() - .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); - - instance.prepareBulkLoad(controller, - request, - rpcCallback); - - SecureBulkLoadProtos.PrepareBulkLoadResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - return response.getBulkToken(); - } - }).entrySet().iterator().next().getValue(); + CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW); + SecureBulkLoadProtos.SecureBulkLoadService instance = + ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); + + ServerRpcController 
controller = new ServerRpcController(); + + BlockingRpcCallback rpcCallback = + new BlockingRpcCallback(); + + SecureBulkLoadProtos.PrepareBulkLoadRequest request = + SecureBulkLoadProtos.PrepareBulkLoadRequest.newBuilder() + .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); + + instance.prepareBulkLoad(controller, + request, + rpcCallback); + + SecureBulkLoadProtos.PrepareBulkLoadResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + + return response.getBulkToken(); } catch (Throwable throwable) { throw new IOException(throwable); } @@ -88,32 +85,26 @@ public class SecureBulkLoadClient { public void cleanupBulkLoad(final String bulkToken) throws IOException { try { - table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class, - EMPTY_START_ROW, - LAST_ROW, - new Batch.Call() { - - @Override - public String call(SecureBulkLoadProtos.SecureBulkLoadService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - - BlockingRpcCallback rpcCallback = - new BlockingRpcCallback(); - - SecureBulkLoadProtos.CleanupBulkLoadRequest request = - SecureBulkLoadProtos.CleanupBulkLoadRequest.newBuilder() - .setBulkToken(bulkToken).build(); - - instance.cleanupBulkLoad(controller, - request, - rpcCallback); - - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - return null; - } - }); + CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW); + SecureBulkLoadProtos.SecureBulkLoadService instance = + ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); + + ServerRpcController controller = new ServerRpcController(); + + BlockingRpcCallback rpcCallback = + new BlockingRpcCallback(); + + SecureBulkLoadProtos.CleanupBulkLoadRequest request = + SecureBulkLoadProtos.CleanupBulkLoadRequest.newBuilder() + .setBulkToken(bulkToken).build(); + + 
instance.cleanupBulkLoad(controller, + request, + rpcCallback); + + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } } catch (Throwable throwable) { throw new IOException(throwable); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java index 764a12c..aec200d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java @@ -197,10 +197,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService } } - fs.delete(createStagingDir(baseStagingDir, - getActiveUser(), - new Path(request.getBulkToken()).getName()), - true); + fs.delete(new Path(request.getBulkToken()), true); done.run(CleanupBulkLoadResponse.newBuilder().build()); } catch (IOException e) { ResponseConverter.setControllerException(controller, e); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java index a9a75c8..fff0200 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -50,6 +51,7 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hadoop.hbase.security.SecureBulkLoadUtil; /** * Test cases for the "load" half of 
the HFileOutputFormat bulk load @@ -260,6 +262,16 @@ public class TestLoadIncrementalHFiles { table.close(); } + // verify staging folder has been cleaned up + Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration()); + if(fs.exists(stagingBasePath)) { + FileStatus[] files = fs.listStatus(stagingBasePath); + for(FileStatus file : files) { + assertTrue("Folder=" + file.getPath() + " is not cleaned up.", + !file.getPath().getName().equals("DONOTERASE")); + } + } + util.deleteTable(tableName); }