From 8bfe220eb76f3eca73c7d7a9b963e9756c5bee6d Mon Sep 17 00:00:00 2001 From: VicoWu <583424568@qq.com> Date: Wed, 20 May 2020 13:13:12 +0800 Subject: [PATCH 1/4] cache token for current ugi --- .../apache/hadoop/hbase/security/token/FsDelegationToken.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java index 9a58006343e..f86d60b6a10 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java @@ -104,6 +104,7 @@ public void acquireDelegationToken(final String tokenKind, final FileSystem fs) hasForwardedToken = false; try { userToken = fs.getDelegationToken(renewer); + userProvider.getCurrent().addToken(userToken); } catch (NullPointerException npe) { // we need to handle NullPointerException in case HADOOP-10009 is missing LOG.error("Failed to get token for " + renewer); From 96aa214011a2e5be1c8fabed2c4cd044f7ae9792 Mon Sep 17 00:00:00 2001 From: VicoWu <583424568@qq.com> Date: Thu, 21 May 2020 16:53:52 +0800 Subject: [PATCH 2/4] add some test case for this new feature --- .../security/token/TestFsDelegationToken.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestFsDelegationToken.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestFsDelegationToken.java index 81347c74115..06cfe7926fd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestFsDelegationToken.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestFsDelegationToken.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.web.WebHdfsConstants.SWEBHDFS_TOKEN_KIND; import static org.apache.hadoop.hdfs.web.WebHdfsConstants.WEBHDFS_TOKEN_KIND; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.when; import java.io.IOException; @@ -88,6 +89,10 @@ public void acquireDelegationToken_defaults_to_hdfsFileSystem() throws IOExcepti fsDelegationToken.acquireDelegationToken(fileSystem); assertEquals( fsDelegationToken.getUserToken().getKind(), HDFS_DELEGATION_KIND); + assertNotNull( + "HDFS Token should exist in cache after acquired", + userProvider.getCurrent() + .getToken(HDFS_DELEGATION_KIND.toString(), fileSystem.getCanonicalServiceName())); } @Test @@ -95,6 +100,10 @@ public void acquireDelegationToken_webhdfsFileSystem() throws IOException { fsDelegationToken.acquireDelegationToken(webHdfsFileSystem); assertEquals( fsDelegationToken.getUserToken().getKind(), WEBHDFS_TOKEN_KIND); + assertNotNull( + "Webhdfs token should exist in cache after acquired", + userProvider.getCurrent() + .getToken(WEBHDFS_TOKEN_KIND.toString(), webHdfsFileSystem.getCanonicalServiceName())); } @Test @@ -102,6 +111,10 @@ public void acquireDelegationToken_swebhdfsFileSystem() throws IOException { fsDelegationToken.acquireDelegationToken(swebHdfsFileSystem); assertEquals( fsDelegationToken.getUserToken().getKind(), SWEBHDFS_TOKEN_KIND); + assertNotNull( + "Swebhdfs token should exist in cache after acquired", + userProvider.getCurrent() + .getToken(SWEBHDFS_TOKEN_KIND.toString(), swebHdfsFileSystem.getCanonicalServiceName())); } @Test(expected = NullPointerException.class) @@ -114,6 +127,10 @@ public void acquireDelegationTokenByTokenKind_webhdfsFileSystem() throws IOExcep fsDelegationToken .acquireDelegationToken(WEBHDFS_TOKEN_KIND.toString(), webHdfsFileSystem); assertEquals(fsDelegationToken.getUserToken().getKind(), WEBHDFS_TOKEN_KIND); + assertNotNull( + "Webhdfs token should exist in cache after acquired", + userProvider.getCurrent() + .getToken(WEBHDFS_TOKEN_KIND.toString(), webHdfsFileSystem.getCanonicalServiceName())); } @Test @@ -122,5 +139,9 @@ public void acquireDelegationTokenByTokenKind_swebhdfsFileSystem() throws IOExce .acquireDelegationToken( SWEBHDFS_TOKEN_KIND.toString(), swebHdfsFileSystem); assertEquals(fsDelegationToken.getUserToken().getKind(), SWEBHDFS_TOKEN_KIND); + assertNotNull( + "Swebhdfs token should exist in cache after acquired", + userProvider.getCurrent() + .getToken(SWEBHDFS_TOKEN_KIND.toString(), swebHdfsFileSystem.getCanonicalServiceName())); } } From 277ec84fa2a5dd556a41807a14304e8ec5124785 Mon Sep 17 00:00:00 2001 From: VicoWu <583424568@qq.com> Date: Sat, 23 May 2020 10:19:15 +0800 Subject: [PATCH 3/4] add retry logic for token expiration --- .../security/token/FsDelegationToken.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java index f86d60b6a10..d7cac239db2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java @@ -50,6 +50,7 @@ private boolean hasForwardedToken = false; private Token userToken = null; private FileSystem fs = null; + private long renewTimePoint = -1L; /* * @param renewer the account name that is allowed to renew the token. @@ -103,6 +104,7 @@ public void acquireDelegationToken(final String tokenKind, final FileSystem fs) if (userToken == null) { hasForwardedToken = false; try { + userToken = fs.getDelegationToken(renewer); userProvider.getCurrent().addToken(userToken); } catch (NullPointerException npe) { @@ -133,6 +135,25 @@ public void releaseDelegationToken() { } } + private boolean tokenValid(final String tokenKind, final FileSystem fs) + throws IOException + { + userToken = userProvider.getCurrent().getToken(tokenKind, fs.getCanonicalServiceName()); + if(userToken != null && System.currentTimeMillis() < renewTimePoint){ + return true; + } + if (userToken != null && System.currentTimeMillis() < renewTimePoint) { + try { + long expirationTime = userToken.renew(fs.getConf()); + renewTimePoint = (long)(System.currentTimeMillis() + (expirationTime - System.currentTimeMillis()) * 0.75); + } + catch (Exception e) { + return false; + } + } + return false; + } + public UserProvider getUserProvider() { return userProvider; } From 94f1546d1872da9cc5d1892b3317ac1d366334ed Mon Sep 17 00:00:00 2001 From: VicoWu <583424568@qq.com> Date: Thu, 28 May 2020 10:04:23 +0800 Subject: [PATCH 4/4] add token expiration process logic --- .../security/token/FsDelegationToken.java | 43 +++++++++---------- .../hadoop/hbase/tool/BulkLoadHFilesTool.java | 7 ++- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java index d7cac239db2..8de7a3b197a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java @@ -50,7 +50,8 @@ private boolean hasForwardedToken = false; private Token userToken = null; private FileSystem fs = null; - private long renewTimePoint = -1L; + private long tokenExpireTime = -1L; + private long renewAheadTime = Long.MAX_VALUE; /* * @param renewer the account name that is allowed to renew the token. @@ -60,6 +61,16 @@ public FsDelegationToken(final UserProvider userProvider, final String renewer) this.renewer = renewer; } + /** + * @param renewer the account name that is allowed to renew the token. + * @param renewAheadTime how long in millis + */ + public FsDelegationToken(final UserProvider userProvider, final String renewer, long renewAheadTime) { + this.userProvider = userProvider; + this.renewer = renewer; + this.renewAheadTime = renewAheadTime; + } + /** * Acquire the delegation token for the specified filesystem. * Before requesting a new delegation token, tries to find one already available. @@ -101,13 +112,18 @@ public void acquireDelegationToken(final String tokenKind, final FileSystem fs) if (userProvider.isHadoopSecurityEnabled()) { this.fs = fs; userToken = userProvider.getCurrent().getToken(tokenKind, fs.getCanonicalServiceName()); - if (userToken == null) { + //We should acquire token when never acquired before or token is expiring or already expired + if (userToken == null || tokenExpireTime <= 0 + || System.currentTimeMillis() > tokenExpireTime - renewAheadTime) { hasForwardedToken = false; try { - userToken = fs.getDelegationToken(renewer); + //After acquired the new token,we quickly renew it to get the token expiration + //time to confirm to renew it before expiration + tokenExpireTime = userToken.renew(fs.getConf()); + LOG.debug("Acquired new token " + userToken + ". Expiration time: " + tokenExpireTime); userProvider.getCurrent().addToken(userToken); - } catch (NullPointerException npe) { + } catch (InterruptedException | NullPointerException e) { // we need to handle NullPointerException in case HADOOP-10009 is missing LOG.error("Failed to get token for " + renewer); } @@ -135,25 +151,6 @@ public void releaseDelegationToken() { } } - private boolean tokenValid(final String tokenKind, final FileSystem fs) - throws IOException - { - userToken = userProvider.getCurrent().getToken(tokenKind, fs.getCanonicalServiceName()); - if(userToken != null && System.currentTimeMillis() < renewTimePoint){ - return true; - } - if (userToken != null && System.currentTimeMillis() < renewTimePoint) { - try { - long expirationTime = userToken.renew(fs.getConf()); - renewTimePoint = (long)(System.currentTimeMillis() + (expirationTime - System.currentTimeMillis()) * 0.75); - } - catch (Exception e) { - return false; - } - } - return false; - } - public UserProvider getUserProvider() { return userProvider; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java index e8b701b7ae3..3eefdc3ce1b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java @@ -125,6 +125,9 @@ */ public static final String BULK_LOAD_HFILES_BY_FAMILY = "hbase.mapreduce.bulkload.by.family"; + //HDFS DelegationToken is cached and should be renewed before token expiration + public static final String BULK_LOAD_RENEW_TOKEN_TIME_BUFFER = "hbase.bulkload.renew.token.time.buffer"; + // We use a '.' prefix which is ignored when walking directory trees // above. It is invalid family name. static final String TMP_DIR = ".tmp"; @@ -142,6 +145,7 @@ private List clusterIds = new ArrayList<>(); private boolean replicate = true; + private final long retryAheadTime; public BulkLoadHFilesTool(Configuration conf) { // make a copy, just to be sure we're not overriding someone else's config @@ -149,7 +153,8 @@ public BulkLoadHFilesTool(Configuration conf) { // disable blockcache for tool invocation, see HBASE-10500 conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0); userProvider = UserProvider.instantiate(conf); - fsDelegationToken = new FsDelegationToken(userProvider, "renewer"); + retryAheadTime = conf.getLong(BULK_LOAD_RENEW_TOKEN_TIME_BUFFER, 60000L); + fsDelegationToken = new FsDelegationToken(userProvider, "renewer", retryAheadTime); assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true); maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32); nrThreads = conf.getInt("hbase.loadincremental.threads.max",