diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java index 3882163..783cb6e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java @@ -18,12 +18,15 @@ package org.apache.hadoop.hbase.client.coprocessor; +import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; +import static org.apache.hadoop.hbase.HConstants.LAST_ROW; + import com.google.protobuf.ByteString; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; @@ -51,8 +54,8 @@ public class SecureBulkLoadClient { try { return table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class, - HConstants.EMPTY_START_ROW, - HConstants.EMPTY_START_ROW, + EMPTY_START_ROW, + LAST_ROW, new Batch.Call() { @Override public String call(SecureBulkLoadProtos.SecureBulkLoadService instance) throws IOException { @@ -84,8 +87,8 @@ public class SecureBulkLoadClient { public void cleanupBulkLoad(final String bulkToken) throws IOException { try { table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class, - HConstants.EMPTY_START_ROW, - HConstants.EMPTY_START_ROW, + EMPTY_START_ROW, + LAST_ROW, new Batch.Call() { @Override @@ -118,54 +121,50 @@ public class SecureBulkLoadClient { final Token userToken, final String bulkToken, final byte[] startRow) throws IOException { + // we never want to send a 
batch of HFiles to all regions, thus cannot call + // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639 try { - return - table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class, - startRow, - startRow, - new Batch.Call() { - - @Override - public Boolean call(SecureBulkLoadProtos.SecureBulkLoadService instance) throws IOException { - SecureBulkLoadProtos.DelegationTokenProto protoDT = - SecureBulkLoadProtos.DelegationTokenProto.newBuilder().build(); - if(userToken != null) { - protoDT = - SecureBulkLoadProtos.DelegationTokenProto.newBuilder() - .setIdentifier(ByteString.copyFrom(userToken.getIdentifier())) - .setPassword(ByteString.copyFrom(userToken.getPassword())) - .setKind(userToken.getKind().toString()) - .setService(userToken.getService().toString()).build(); - } - - List protoFamilyPaths = - new ArrayList(); - for(Pair el: familyPaths) { - protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder() - .setFamily(ByteString.copyFrom(el.getFirst())) - .setPath(el.getSecond()).build()); - } - - SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request = - SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder() - .setFsToken(protoDT) - .addAllFamilyPath(protoFamilyPaths) - .setBulkToken(bulkToken).build(); - - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback rpcCallback = - new BlockingRpcCallback(); - instance.secureBulkLoadHFiles(controller, - request, - rpcCallback); - - SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - return response.getLoaded(); - } - }).entrySet().iterator().next().getValue(); + CoprocessorRpcChannel channel = table.coprocessorService(startRow); + SecureBulkLoadProtos.SecureBulkLoadService instance = + ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); + + 
SecureBulkLoadProtos.DelegationTokenProto protoDT = + SecureBulkLoadProtos.DelegationTokenProto.newBuilder().build(); + if(userToken != null) { + protoDT = + SecureBulkLoadProtos.DelegationTokenProto.newBuilder() + .setIdentifier(ByteString.copyFrom(userToken.getIdentifier())) + .setPassword(ByteString.copyFrom(userToken.getPassword())) + .setKind(userToken.getKind().toString()) + .setService(userToken.getService().toString()).build(); + } + + List protoFamilyPaths = + new ArrayList(); + for(Pair el: familyPaths) { + protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder() + .setFamily(ByteString.copyFrom(el.getFirst())) + .setPath(el.getSecond()).build()); + } + + SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request = + SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder() + .setFsToken(protoDT) + .addAllFamilyPath(protoFamilyPaths) + .setBulkToken(bulkToken).build(); + + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback rpcCallback = + new BlockingRpcCallback(); + instance.secureBulkLoadHFiles(controller, + request, + rpcCallback); + + SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + return response.getLoaded(); } catch (Throwable throwable) { throw new IOException(throwable); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 076218a..7e41153 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -549,7 +549,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { try { LOG.debug("Going to connect to server " + getLocation() + " for row " - + Bytes.toStringBinary(getRow())); + + 
Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths); byte[] regionName = getLocation().getRegionInfo().getRegionName(); if(!useSecure) { success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);