diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java index fd86fedf96..7e3fe8b789 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java @@ -19,6 +19,12 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; + import java.io.IOException; import java.net.InetAddress; import java.net.Socket; @@ -26,6 +32,8 @@ import java.security.PrivilegedExceptionAction; import java.util.Locale; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; @@ -537,11 +545,36 @@ public void handle(Callback[] callbacks) throws InvalidToken, final TProcessor wrapped; DelegationTokenSecretManager secretManager; boolean useProxy; + LoadingCache ugiCache; + TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager, boolean useProxy) { this.wrapped = wrapped; this.secretManager = secretManager; this.useProxy = useProxy; + + RemovalListener removalListener = new RemovalListener() { + public void onRemoval(RemovalNotification removal) { + UserGroupInformation ugi = removal.getValue(); + try { + FileSystem.closeAllForUGI(ugi); + } catch (IOException exception) { + LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception); + } + } + }; + + ugiCache = CacheBuilder.newBuilder() + .expireAfterAccess(1, TimeUnit.DAYS) + .removalListener(removalListener) + .build( + new CacheLoader() { + @Override + public UserGroupInformation load(String user) throws Exception { + return UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser()); + } + } + ); } @@ -582,8 +615,7 @@ public boolean process(final TProtocol inProt, final TProtocol outProt) throws T UserGroupInformation clientUgi = null; try { if (useProxy) { - clientUgi = UserGroupInformation.createProxyUser( - endUser, UserGroupInformation.getLoginUser()); + clientUgi = ugiCache.get(endUser); remoteUser.set(clientUgi.getShortUserName()); LOG.debug("Set remoteUser :" + remoteUser.get()); return clientUgi.doAs(new PrivilegedExceptionAction() { @@ -604,6 +636,8 @@ public Boolean run() { LOG.debug("Set remoteUser :" + remoteUser.get() + ", from endUser :" + endUser); return wrapped.process(inProt, outProt); } + } catch (ExecutionException ee) { + throw new RuntimeException(ee); } catch (RuntimeException rte) { if (rte.getCause() instanceof TException) { throw (TException)rte.getCause(); @@ -614,14 +648,6 @@ public Boolean run() { } catch (IOException ioe) { throw new RuntimeException(ioe); // unexpected! } - finally { - if (clientUgi != null) { - try { FileSystem.closeAllForUGI(clientUgi); } - catch(IOException exception) { - LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception); - } - } - } } }