From 4a13569e64c13803794abd454b2645872d901be6 Mon Sep 17 00:00:00 2001
From: Wei-Chiu Chuang
Date: Fri, 21 Dec 2018 12:02:16 -0800
Subject: [PATCH] HBASE-20900. Improve FsDelegationToken to support KMS delegation tokens

Change-Id: Ibcd80caa34786f3493afbe8160599e5902d232d1
---
 .../hadoop/hbase/client/SecureBulkLoadClient.java  | 26 ++++++-
 .../hbase/shaded/protobuf/RequestConverter.java    | 41 ++++++++---
 .../apache/hadoop/hbase/coprocessor/Export.java    | 86 ++++++++++++++--------
 .../security/access/SecureBulkLoadEndpoint.java    |  8 +-
 hbase-endpoint/src/main/protobuf/Export.proto      |  1 +
 .../src/main/protobuf/SecureBulkLoad.proto         |  1 +
 .../regionserver/SecureBulkLoadEndpointClient.java | 20 +++--
 .../src/main/protobuf/Client.proto                 |  1 +
 hbase-protocol/src/main/protobuf/Client.proto      |  1 +
 .../hbase/regionserver/SecureBulkLoadManager.java  | 43 ++++++++---
 .../hbase/security/token/FsDelegationToken.java    | 45 ++++++++---
 .../hadoop/hbase/tool/LoadIncrementalHFiles.java   |  2 +-
 12 files changed, 200 insertions(+), 75 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index 2186271..ede97d6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -115,13 +115,14 @@ public class SecureBulkLoadClient {
       final List<Pair<byte[], String>> familyPaths,
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken) throws IOException {
-    return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, bulkToken,
+    return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, new Token[]{userToken}, bulkToken,
         false);
   }
 
   /**
    * Securely bulk load a list of HFiles using client protocol.
    *
+   * @deprecated use {@link #secureBulkLoadHFiles(ClientService.BlockingInterface, List, byte[], boolean, Token[], String, boolean)} instead.
    * @param client
    * @param familyPaths
    * @param regionName
@@ -132,13 +133,34 @@ public class SecureBulkLoadClient {
    * @return true if all are loaded
    * @throws IOException
    */
+  @Deprecated
   public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
       final List<Pair<byte[], String>> familyPaths,
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken, boolean copyFiles) throws IOException {
+    return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, new Token[]{userToken}, bulkToken, copyFiles);
+  }
+
+  /**
+   * Securely bulk load a list of HFiles using client protocol.
+   *
+   * @param client
+   * @param familyPaths
+   * @param regionName
+   * @param assignSeqNum
+   * @param userTokens
+   * @param bulkToken
+   * @param copyFiles
+   * @return true if all are loaded
+   * @throws IOException
+   */
+  public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
+      final List<Pair<byte[], String>> familyPaths,
+      final byte[] regionName, boolean assignSeqNum,
+      final Token<?>[] userTokens, final String bulkToken, boolean copyFiles) throws IOException {
     BulkLoadHFileRequest request =
         RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum,
-          userToken, bulkToken, copyFiles);
+          userTokens, bulkToken, copyFiles);
 
     try {
       BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 36c8fab..1b34f29 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -570,6 +570,7 @@ public final class RequestConverter {
   /**
    * Create a protocol buffer bulk load request
    *
+   * @deprecated Use {@link #buildBulkLoadHFileRequest(List, byte[], boolean, Token[], String, boolean)}
    * @param familyPaths
    * @param regionName
   * @param assignSeqNum
@@ -578,22 +579,32 @@ public final class RequestConverter {
    * @param copyFiles
    * @return a bulk load request
    */
+  @Deprecated
   public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
       final List<Pair<byte[], String>> familyPaths,
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken, boolean copyFiles) {
+    return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, new Token[]{userToken}, bulkToken, copyFiles);
+  }
+
+  /**
+   * Create a protocol buffer bulk load request
+   *
+   * @param familyPaths
+   * @param regionName
+   * @param assignSeqNum
+   * @param userTokens
+   * @param bulkToken
+   * @param copyFiles
+   * @return a bulk load request
+   */
+  public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
+      final List<Pair<byte[], String>> familyPaths,
+      final byte[] regionName, boolean assignSeqNum,
+      final Token<?>[] userTokens, final String bulkToken, boolean copyFiles) {
     RegionSpecifier region = RequestConverter.buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
-    ClientProtos.DelegationToken protoDT = null;
-    if (userToken != null) {
-      protoDT =
-          ClientProtos.DelegationToken.newBuilder()
-            .setIdentifier(UnsafeByteOperations.unsafeWrap(userToken.getIdentifier()))
-            .setPassword(UnsafeByteOperations.unsafeWrap(userToken.getPassword()))
-            .setKind(userToken.getKind().toString())
-            .setService(userToken.getService().toString()).build();
-    }
 
     List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
         new ArrayList<>(familyPaths.size());
     if (!familyPaths.isEmpty()) {
@@ -612,8 +623,16 @@ public final class RequestConverter {
       .setRegion(region)
       .setAssignSeqNum(assignSeqNum)
       .addAllFamilyPath(protoFamilyPaths);
-    if (userToken != null) {
-      request.setFsToken(protoDT);
+    if (userTokens != null) {
+      ClientProtos.DelegationToken protoDT;
+      for (Token<?> userToken : userTokens) {
+        protoDT = ClientProtos.DelegationToken.newBuilder()
+            .setIdentifier(UnsafeByteOperations.unsafeWrap(userToken.getIdentifier()))
+            .setPassword(UnsafeByteOperations.unsafeWrap(userToken.getPassword()))
+            .setKind(userToken.getKind().toString())
+            .setService(userToken.getService().toString()).build();
+        request.addFsTokens(protoDT);
+      }
     }
     if (bulkToken != null) {
       request.setBulkToken(bulkToken);
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java
index b21d5c3..681179c 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/Export.java
@@ -26,6 +26,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.mapreduce.ExportUtils;
 import org.apache.hadoop.hbase.mapreduce.Import;
 import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken;
 import org.apache.hadoop.hbase.protobuf.generated.ExportProtos;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -124,7 +126,7 @@ public class Export extends ExportProtos.ExportService implements RegionCoprocessor {
       fsDelegationToken.acquireDelegationToken(fs);
       try {
         final ExportProtos.ExportRequest request = getConfiguredRequest(conf, dir,
-          scan, fsDelegationToken.getUserToken());
+          scan, fsDelegationToken.getUserTokens());
         try (Connection con = ConnectionFactory.createConnection(conf);
             Table table = con.getTable(tableName)) {
           Map<byte[], ExportProtos.ExportResponse> result = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -213,11 +215,12 @@ public class Export extends ExportProtos.ExportService implements RegionCoprocessor {
 
   private static ExportProtos.ExportResponse processData(final Region region,
       final Configuration conf, final UserProvider userProvider, final Scan scan,
-      final Token userToken, final List<SequenceFile.Writer.Option> opts) throws IOException {
+      final List<Token> userTokens, final List<SequenceFile.Writer.Option> opts)
+      throws IOException {
     ScanCoprocessor cp = new ScanCoprocessor(region);
     RegionScanner scanner = null;
     try (RegionOp regionOp = new RegionOp(region);
-        SecureWriter out = new SecureWriter(conf, userProvider, userToken, opts)) {
+        SecureWriter out = new SecureWriter(conf, userProvider, userTokens, opts)) {
       scanner = cp.checkScannerOpen(scan);
       ImmutableBytesWritable key = new ImmutableBytesWritable();
       long rowCount = 0;
@@ -278,33 +281,36 @@ public class Export extends ExportProtos.ExportService implements RegionCoprocessor {
   }
 
   private static ExportProtos.ExportRequest getConfiguredRequest(Configuration conf,
-      Path dir, final Scan scan, final Token<?> userToken) throws IOException {
+      Path dir, final Scan scan, final Token<?>[] userTokens) throws IOException {
     boolean compressed = conf.getBoolean(FileOutputFormat.COMPRESS, false);
     String compressionType = conf.get(FileOutputFormat.COMPRESS_TYPE,
       DEFAULT_TYPE.toString());
     String compressionCodec = conf.get(FileOutputFormat.COMPRESS_CODEC,
       DEFAULT_CODEC.getName());
     DelegationToken protoToken = null;
-    if (userToken != null) {
-      protoToken = DelegationToken.newBuilder()
-        .setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
-        .setPassword(ByteStringer.wrap(userToken.getPassword()))
-        .setKind(userToken.getKind().toString())
-        .setService(userToken.getService().toString()).build();
-    }
-    LOG.info("compressed=" + compressed
-      + ", compression type=" + compressionType
-      + ", compression codec=" + compressionCodec
-      + ", userToken=" + userToken);
+
     ExportProtos.ExportRequest.Builder builder = ExportProtos.ExportRequest.newBuilder()
             .setScan(ProtobufUtil.toScan(scan))
             .setOutputPath(dir.toString())
             .setCompressed(compressed)
             .setCompressCodec(compressionCodec)
             .setCompressType(compressionType);
-    if (protoToken != null) {
-      builder.setFsToken(protoToken);
+
+    if (userTokens != null) {
+      for (Token userToken : userTokens) {
+        protoToken = DelegationToken.newBuilder()
+          .setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
+          .setPassword(ByteStringer.wrap(userToken.getPassword()))
+          .setKind(userToken.getKind().toString())
+          .setService(userToken.getService().toString()).build();
+        builder.addFsTokens(protoToken);
+      }
     }
+
+    LOG.info("compressed=" + compressed
+      + ", compression type=" + compressionType
+      + ", compression codec=" + compressionCodec
+      + ", userToken=" + Arrays.toString(userTokens));
     return builder.build();
   }
 
@@ -337,16 +343,30 @@ public class Export extends ExportProtos.ExportService implements RegionCoprocessor {
     try {
       Scan scan = validateKey(region.getRegionInfo(), request);
       Token userToken = null;
-      if (userProvider.isHadoopSecurityEnabled() && !request.hasFsToken()) {
+      List<Token> userTokens = new ArrayList<>();
+      if (userProvider.isHadoopSecurityEnabled() && !request.hasFsToken() &&
+          request.getFsTokensCount() == 0) {
         LOG.warn("Hadoop security is enable, but no found of user token");
       } else if (userProvider.isHadoopSecurityEnabled()) {
-        userToken = new Token(request.getFsToken().getIdentifier().toByteArray(),
-            request.getFsToken().getPassword().toByteArray(),
-            new Text(request.getFsToken().getKind()),
-            new Text(request.getFsToken().getService()));
+        if (request.getFsTokensCount() > 0) {
+          for (ClientProtos.DelegationToken delegationToken : request.getFsTokensList()) {
+            userToken = new Token(delegationToken.getIdentifier().toByteArray(),
+                delegationToken.getPassword().toByteArray(), new Text(delegationToken.getKind()),
+                new Text(delegationToken.getService()));
+            userTokens.add(userToken);
+          }
+        }
+        // Backward compatibility: support pre-HBASE-20900 clients
+        if (request.hasFsToken()) {
+          ClientProtos.DelegationToken delegationToken = request.getFsToken();
+          userToken = new Token(delegationToken.getIdentifier().toByteArray(),
+              delegationToken.getPassword().toByteArray(), new Text(delegationToken.getKind()),
+              new Text(delegationToken.getService()));
+          userTokens.add(userToken);
+        }
       }
       ExportProtos.ExportResponse response = processData(region, conf, userProvider,
-        scan, userToken, getWriterOptions(conf, region.getRegionInfo(), request));
+        scan, userTokens, getWriterOptions(conf, region.getRegionInfo(), request));
       done.run(response);
     } catch (IOException e) {
       CoprocessorRpcUtils.setControllerException(controller, e);
@@ -449,9 +469,9 @@ public class Export extends ExportProtos.ExportService implements RegionCoprocessor {
     private final PrivilegedWriter privilegedWriter;
 
     SecureWriter(final Configuration conf, final UserProvider userProvider,
-        final Token userToken, final List<SequenceFile.Writer.Option> opts)
+        final List<Token> userTokens, final List<SequenceFile.Writer.Option> opts)
         throws IOException {
-      User user = getActiveUser(userProvider, userToken);
+      User user = getActiveUser(userProvider, userTokens);
       try {
         SequenceFile.Writer sequenceFileWriter =
             user.runAs((PrivilegedExceptionAction<SequenceFile.Writer>) () ->
@@ -467,14 +487,20 @@ public class Export extends ExportProtos.ExportService implements RegionCoprocessor {
       privilegedWriter.append(key, value);
     }
 
-    private static User getActiveUser(final UserProvider userProvider, final Token userToken)
+    private static User getActiveUser(final UserProvider userProvider,
+        final List<Token> userTokens)
         throws IOException {
       User user = RpcServer.getRequestUser().orElse(userProvider.getCurrent());
-      if (user == null && userToken != null) {
-        LOG.warn("No found of user credentials, but a token was got from user request");
-      } else if (user != null && userToken != null) {
-        user.addToken(userToken);
+      if (userTokens != null && !userTokens.isEmpty()) {
+        if (user == null) {
+          LOG.warn("No user credentials found, but tokens were present in the request");
+        } else {
+          for (Token userToken : userTokens) {
+            user.addToken(userToken);
+          }
+        }
       }
+
       return user;
     }
 
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
index fb161d9..3ff79f8 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
@@ -176,9 +176,15 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements RegionCoprocessor {
     RegionSpecifier region =
         ProtobufUtil.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, this.env
             .getRegionInfo().getRegionName());
-    bulkLoadHFileRequest.setRegion(region).setFsToken(request.getFsToken())
+    bulkLoadHFileRequest.setRegion(region)
       .setBulkToken(request.getBulkToken()).setAssignSeqNum(request.getAssignSeqNum())
       .addAllFamilyPath(request.getFamilyPathList());
+    if (request.hasFsToken()) {
+      bulkLoadHFileRequest.setFsToken(request.getFsToken());
+    }
+    if (request.getFsTokensCount() > 0) {
+      bulkLoadHFileRequest.addAllFsTokens(request.getFsTokensList());
+    }
     return bulkLoadHFileRequest.build();
   }
 
diff --git a/hbase-endpoint/src/main/protobuf/Export.proto b/hbase-endpoint/src/main/protobuf/Export.proto
index 5e6c262..ff08a18 100644
--- a/hbase-endpoint/src/main/protobuf/Export.proto
+++ b/hbase-endpoint/src/main/protobuf/Export.proto
@@ -37,6 +37,7 @@ message ExportRequest {
   optional string compressType = 4;
   optional string compressCodec = 5;
   optional DelegationToken fsToken = 6;
+  repeated DelegationToken fsTokens = 7;
 }
 
 message ExportResponse {
   required uint64 rowCount = 1;
diff --git a/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto b/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto
index d86d162..c6d5166 100644
--- a/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto
+++ b/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto
@@ -30,6 +30,7 @@ message SecureBulkLoadHFilesRequest {
   optional bool assign_seq_num = 2;
   required DelegationToken fs_token = 3;
   required string bulk_token = 4;
+  repeated DelegationToken fs_tokens = 5;
 }
 
 message SecureBulkLoadHFilesResponse {
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java
index 0d15f93..7367cbf 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java
@@ -110,7 +110,7 @@ public class SecureBulkLoadEndpointClient {
   }
 
   public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
-      final Token userToken, final String bulkToken, final byte[] startRow)
+      final List<Token> userTokens, final String bulkToken, final byte[] startRow)
       throws IOException {
     // we never want to send a batch of HFiles to all regions, thus cannot call
     // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
@@ -119,15 +119,18 @@ public class SecureBulkLoadEndpointClient {
       SecureBulkLoadProtos.SecureBulkLoadService instance =
           ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
 
-      DelegationToken protoDT =
-          DelegationToken.newBuilder().build();
-      if(userToken != null) {
-        protoDT =
-            DelegationToken.newBuilder()
+      DelegationToken protoDT;
+
+      List<DelegationToken> protoDTs = new ArrayList<>();
+      if(userTokens != null) {
+        for (Token userToken : userTokens) {
+          protoDT = DelegationToken.newBuilder()
             .setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
             .setPassword(ByteStringer.wrap(userToken.getPassword()))
             .setKind(userToken.getKind().toString())
             .setService(userToken.getService().toString()).build();
+          protoDTs.add(protoDT);
+        }
       }
 
       List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
@@ -140,9 +143,10 @@ public class SecureBulkLoadEndpointClient {
 
       SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request =
           SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
-            .setFsToken(protoDT)
             .addAllFamilyPath(protoFamilyPaths)
-            .setBulkToken(bulkToken).build();
+            .setBulkToken(bulkToken)
+            .addAllFsTokens(protoDTs)
+            .build();
 
       ServerRpcController controller = new ServerRpcController();
       CoprocessorRpcUtils.BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 14abb08..3b94f89 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -378,6 +378,7 @@ message BulkLoadHFileRequest {
   optional DelegationToken fs_token = 4;
   optional string bulk_token = 5;
   optional bool copy_file = 6 [default = false];
+  repeated DelegationToken fs_tokens = 7;
 
   message FamilyPath {
     required bytes family = 1;
diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto
index 5fd20c8..df0518c 100644
--- a/hbase-protocol/src/main/protobuf/Client.proto
+++ b/hbase-protocol/src/main/protobuf/Client.proto
@@ -377,6 +377,7 @@ message BulkLoadHFileRequest {
   optional DelegationToken fs_token = 4;
   optional string bulk_token = 5;
   optional bool copy_file = 6 [default = false];
+  repeated DelegationToken fs_tokens = 7;
 
   message FamilyPath {
     required bytes family = 1;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index 566a6b6..92f253f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -212,12 +212,6 @@ public class SecureBulkLoadManager {
       familyPaths.add(new Pair<>(el.getFamily().toByteArray(), el.getPath()));
     }
 
-    Token userToken = null;
-    if (userProvider.isHadoopSecurityEnabled()) {
-      userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken()
-              .getPassword().toByteArray(), new Text(request.getFsToken().getKind()), new Text(
-              request.getFsToken().getService()));
-    }
     final String bulkToken = request.getBulkToken();
     User user = getActiveUser();
     final UserGroupInformation ugi = user.getUGI();
@@ -232,8 +226,32 @@ public class SecureBulkLoadManager {
           LOG.warn("unable to add token", ioe);
         }
       }
-    if (userToken != null) {
-      ugi.addToken(userToken);
+
+    List<Token> userTokens = new ArrayList<>();
+    if (userProvider.isHadoopSecurityEnabled()) {
+      Token userToken;
+      if (request.getFsTokensCount() > 0) {
+        for (ClientProtos.DelegationToken delegationToken : request.getFsTokensList()) {
+          userToken = new Token(delegationToken.getIdentifier().toByteArray(),
+              delegationToken.getPassword().toByteArray(), new Text(delegationToken.getKind()),
+              new Text(delegationToken.getService()));
+          userTokens.add(userToken);
+        }
+      }
+
+      // Backward compatibility: support pre-HBASE-20900 clients
+      if (request.hasFsToken()) {
+        ClientProtos.DelegationToken tok = request.getFsToken();
+        userToken = new Token(tok.getIdentifier().toByteArray(),
+            tok.getPassword().toByteArray(), new Text(tok.getKind()), new Text(tok.getService()));
+        userTokens.add(userToken);
+      }
+    }
+
+    if (!userTokens.isEmpty()) {
+      for (Token userToken : userTokens) {
+        ugi.addToken(userToken);
+      }
     } else if (userProvider.isHadoopSecurityEnabled()) {
       //we allow this to pass through in "simple" security mode
       //for mini cluster testing
@@ -256,10 +274,11 @@ public class SecureBulkLoadManager {
           FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider,"renewer");
           targetfsDelegationToken.acquireDelegationToken(fs);
 
-          Token<?> targetFsToken = targetfsDelegationToken.getUserToken();
-          if (targetFsToken != null
-              && (userToken == null || !targetFsToken.getService().equals(userToken.getService()))){
-            ugi.addToken(targetFsToken);
+          Token<?>[] targetFsToken = targetfsDelegationToken.getUserTokens();
+          if (targetFsToken != null){
+            for (Token token : targetFsToken) {
+              ugi.addToken(token);
+            }
           }
         }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java
index 389bcc6..8abfc72 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.security.token;
 
 import java.io.IOException;
 
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -42,9 +44,10 @@ public class FsDelegationToken {
   private final String renewer;
 
   private boolean hasForwardedToken = false;
-  private Token<?> userToken = null;
+  private Token<?>[] userTokens = null;
   private FileSystem fs = null;
 
+  private static final Text HDFS_KIND_TEXT = new Text("HDFS_DELEGATION_TOKEN");
   /*
    * @param renewer the account name that is allowed to renew the token.
    */
@@ -64,12 +67,13 @@ public class FsDelegationToken {
       throws IOException {
     if (userProvider.isHadoopSecurityEnabled()) {
       this.fs = fs;
-      userToken = userProvider.getCurrent().getToken("HDFS_DELEGATION_TOKEN",
+      Token<?> userToken = userProvider.getCurrent().getToken("HDFS_DELEGATION_TOKEN",
           fs.getCanonicalServiceName());
       if (userToken == null) {
         hasForwardedToken = false;
         try {
-          userToken = fs.getDelegationToken(renewer);
+          Credentials credentials = new Credentials();
+          userTokens = fs.addDelegationTokens(renewer, credentials);
         } catch (NullPointerException npe) {
           // we need to handle NullPointerException in case HADOOP-10009 is missing
           LOG.error("Failed to get token for " + renewer);
@@ -86,14 +90,16 @@ public class FsDelegationToken {
    */
   public void releaseDelegationToken() {
     if (userProvider.isHadoopSecurityEnabled()) {
-      if (userToken != null && !hasForwardedToken) {
-        try {
-          userToken.cancel(this.fs.getConf());
-        } catch (Exception e) {
-          LOG.warn("Failed to cancel HDFS delegation token: " + userToken, e);
+      if (userTokens != null && !hasForwardedToken) {
+        for (Token<?> userToken : userTokens) {
+          try {
+            userToken.cancel(this.fs.getConf());
+          } catch (Exception e) {
+            LOG.warn("Failed to cancel file system delegation token: " + userToken, e);
+          }
         }
       }
-      this.userToken = null;
+      this.userTokens = null;
       this.fs = null;
     }
   }
@@ -110,10 +116,29 @@ public class FsDelegationToken {
   }
 
   /**
+   * @deprecated Use {@link #getUserTokens()} instead.
    * @return the delegation token acquired, or null in case it was not acquired
    */
+  @Deprecated
   public Token<?> getUserToken() {
-    return userToken;
+    if (userTokens == null) {
+      return null;
+    }
+    // Return one HDFS delegation token to be consistent with old behavior.
+
+    for (Token<?> token : userTokens) {
+      if (token.getKind().equals(HDFS_KIND_TEXT)) {
+        return token;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * @return the delegation tokens acquired, or null in case none were acquired
+   */
+  public Token<?>[] getUserTokens() {
+    return userTokens;
   }
 
   public FileSystem getFileSystem() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index 3320b1f..837f875 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -534,7 +534,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       try (Table table = conn.getTable(getTableName())) {
         secureClient = new SecureBulkLoadClient(getConf(), table);
         success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
-          assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
+          assignSeqIds, fsDelegationToken.getUserTokens(), bulkToken, copyFile);
       }
       return success ? regionName : null;
     } finally {
--
2.5.3
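
Reviewer note (not part of the patch): the reason for the plural API is that on a cluster with HDFS transparent encryption, FileSystem#addDelegationTokens() hands back both the HDFS_DELEGATION_TOKEN and the KMS delegation token (kind "kms-dt"), while the old getUserToken()/fs.getDelegationToken(renewer) path only ever carried the HDFS token, so secure bulk loads into encrypted tables could not authenticate to the KMS. A minimal usage sketch of the new accessor follows; the class name, renewer string, and printed fields are illustrative assumptions, not code from this patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.security.UserProvider;
    import org.apache.hadoop.hbase.security.token.FsDelegationToken;
    import org.apache.hadoop.security.token.Token;

    public class MultiTokenSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);

        // Acquire every delegation token the filesystem hands out; on an
        // encrypted cluster this includes both the HDFS and KMS tokens.
        FsDelegationToken fsDelegationToken =
            new FsDelegationToken(UserProvider.instantiate(conf), "renewer");
        fsDelegationToken.acquireDelegationToken(fs);

        // New plural accessor introduced by this patch; the deprecated
        // getUserToken() surfaces only the HDFS_DELEGATION_TOKEN.
        Token<?>[] tokens = fsDelegationToken.getUserTokens();
        if (tokens != null) {
          for (Token<?> token : tokens) {
            System.out.println("kind=" + token.getKind()
                + " service=" + token.getService());
          }
        }

        fsDelegationToken.releaseDelegationToken();
      }
    }

Callers such as LoadIncrementalHFiles then pass the whole array through secureBulkLoadHFiles(), which serializes each token into the repeated fs_tokens protobuf field, so the region server can add all of them to the bulk-load UGI.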