From 375cdd8ed44ea66dd36cb1cc4850dffa6bce6d0f Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Tue, 24 Sep 2013 15:21:44 -0700 Subject: [PATCH] HBASE-9639 SecureBulkLoad dispatches file load requests to all Regions --- .../client/coprocessor/SecureBulkLoadClient.java | 103 ++++++++++----------- .../hbase/mapreduce/LoadIncrementalHFiles.java | 2 +- 2 files changed, 52 insertions(+), 53 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java index b988661..c533d26 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java @@ -18,14 +18,17 @@ package org.apache.hadoop.hbase.client.coprocessor; +import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; +import static org.apache.hadoop.hbase.HConstants.LAST_ROW; + import com.google.protobuf.ByteString; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; @@ -54,8 +57,8 @@ public class SecureBulkLoadClient { try { return table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class, - HConstants.EMPTY_START_ROW, - HConstants.EMPTY_START_ROW, + EMPTY_START_ROW, + LAST_ROW, new Batch.Call() { @Override public String call(SecureBulkLoadProtos.SecureBulkLoadService instance) throws IOException { @@ -87,8 +90,8 @@ public class SecureBulkLoadClient { public void cleanupBulkLoad(final String bulkToken) throws IOException { try { table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class, - HConstants.EMPTY_START_ROW, - HConstants.EMPTY_START_ROW, + EMPTY_START_ROW, + LAST_ROW, new Batch.Call() { @Override @@ -121,54 +124,50 @@ public class SecureBulkLoadClient { final Token userToken, final String bulkToken, final byte[] startRow) throws IOException { + // we never want to send a batch of HFiles to all regions, thus cannot call + // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639 try { - return - table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class, - startRow, - startRow, - new Batch.Call() { - - @Override - public Boolean call(SecureBulkLoadProtos.SecureBulkLoadService instance) throws IOException { - SecureBulkLoadProtos.DelegationToken protoDT = - SecureBulkLoadProtos.DelegationToken.newBuilder().build(); - if(userToken != null) { - protoDT = - SecureBulkLoadProtos.DelegationToken.newBuilder() - .setIdentifier(ByteString.copyFrom(userToken.getIdentifier())) - .setPassword(ByteString.copyFrom(userToken.getPassword())) - .setKind(userToken.getKind().toString()) - .setService(userToken.getService().toString()).build(); - } - - List protoFamilyPaths = - new ArrayList(); - for(Pair el: familyPaths) { - protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder() - .setFamily(ByteString.copyFrom(el.getFirst())) - .setPath(el.getSecond()).build()); - } - - SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request = - SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder() - .setFsToken(protoDT) - .addAllFamilyPath(protoFamilyPaths) - .setBulkToken(bulkToken).build(); - - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback rpcCallback = - new BlockingRpcCallback(); - instance.secureBulkLoadHFiles(controller, - request, - rpcCallback); - - SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - return response.getLoaded(); - } - }).entrySet().iterator().next().getValue(); + CoprocessorRpcChannel channel = table.coprocessorService(startRow); + SecureBulkLoadProtos.SecureBulkLoadService instance = + ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); + + SecureBulkLoadProtos.DelegationToken protoDT = + SecureBulkLoadProtos.DelegationToken.newBuilder().build(); + if(userToken != null) { + protoDT = + SecureBulkLoadProtos.DelegationToken.newBuilder() + .setIdentifier(ByteString.copyFrom(userToken.getIdentifier())) + .setPassword(ByteString.copyFrom(userToken.getPassword())) + .setKind(userToken.getKind().toString()) + .setService(userToken.getService().toString()).build(); + } + + List protoFamilyPaths = + new ArrayList(); + for(Pair el: familyPaths) { + protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder() + .setFamily(ByteString.copyFrom(el.getFirst())) + .setPath(el.getSecond()).build()); + } + + SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request = + SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder() + .setFsToken(protoDT) + .addAllFamilyPath(protoFamilyPaths) + .setBulkToken(bulkToken).build(); + + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback rpcCallback = + new BlockingRpcCallback(); + instance.secureBulkLoadHFiles(controller, + request, + rpcCallback); + + SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + return response.getLoaded(); } catch (Throwable throwable) { throw new IOException(throwable); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index ec48858..14411d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -558,7 +558,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { try { LOG.debug("Going to connect to server " + getLocation() + " for row " - + Bytes.toStringBinary(getRow())); + + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths); byte[] regionName = getLocation().getRegionInfo().getRegionName(); if(!useSecure) { success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds); -- 1.8.3.4