Index: shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (revision 1362268) +++ shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (revision ) @@ -22,6 +22,12 @@ import java.net.Socket; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; @@ -39,6 +45,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; @@ -312,7 +319,121 @@ } } + /** + * Cache to store UGI instances. Thus, new UGI objects aren't created for each connection. + * (This prevents DistributedFileSystem.CACHE from being flooded with essentially identical instances.) + */ + private static class UGICache { + + /** + * UGI, with age-tracking. For implementing an aging-cache. + */ + private static class AgeTrackedUGI { + + public static final long AGE_THRESHOLD = 5 * 60 * 1000; // 5 Minutes, in millis. + + private final UserGroupInformation ugi; + private volatile long lastAccessTime; + + private AgeTrackedUGI(UserGroupInformation ugi) { + this.ugi = ugi; + this.lastAccessTime = System.currentTimeMillis(); + } + + static AgeTrackedUGI create(UserGroupInformation ugi) { + return new AgeTrackedUGI(ugi); + } + + UserGroupInformation get() { + lastAccessTime = System.currentTimeMillis(); + return ugi; + } + + boolean expired(long currentTimeMillis) { + return (currentTimeMillis - lastAccessTime) > AGE_THRESHOLD; + } + } + + private static ConcurrentHashMap ugiCache = new ConcurrentHashMap(); + private static List trashedUGIs = new ArrayList(); + + // Custom UGI.toString(), to drop auth-method. + private static String toString(UserGroupInformation ugi) { + return ugi.getRealUser() == null? + ugi.getUserName() : + ugi.getUserName() + " via " + toString(ugi.getRealUser()); // Ad infinitum. + } + + private static String getKey(String user, UserGroupInformation realUgi) { + return user + " via " + toString(realUgi); + } + + private static UserGroupInformation getProxyUser(String user, UserGroupInformation realUgi) { + + String key = getKey(user, realUgi); + AgeTrackedUGI ugi = ugiCache.get(key); + + if (ugi == null) { // Cache miss. + AgeTrackedUGI newUgi = AgeTrackedUGI.create(UserGroupInformation.createProxyUser(user, realUgi)); + ugi = ugiCache.putIfAbsent(key, newUgi); + if (ugi == null) // New entry. + ugi = newUgi; + } + + return ugi.get(); + + } + + private static UserGroupInformation getRemoteUser(String user) { + + AgeTrackedUGI ugi = ugiCache.get(user); + + if (ugi == null) { // Cache miss. + AgeTrackedUGI newUgi = AgeTrackedUGI.create(UserGroupInformation.createRemoteUser(user)); + ugi = ugiCache.putIfAbsent(user, newUgi); + if (ugi == null) // New entry. + ugi = newUgi; + } + + return ugi.get(); + } + + private static ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(1); + + static { + + threadPool.scheduleWithFixedDelay(new Runnable() { + - @Override + @Override + public void run() { + + LOG.info("UGI Cleanup thread running."); + // Take out the trash. + for (UserGroupInformation ugi : trashedUGIs) { + try { + LOG.debug("Cleaning up file-system handles for: " + ugi); + FileSystem.closeAllForUGI(ugi); + } + catch (IOException exception) { + LOG.warn("Could not close file-system handles for : " + ugi); + } + } + trashedUGIs.clear(); + + // New sweep. + long currentTimeMillis = System.currentTimeMillis(); // Avoid too many calls to currentTimeMillis(). + for (Map.Entry entry : ugiCache.entrySet()) + if (entry.getValue().expired(currentTimeMillis)) { + // Evict. + trashedUGIs.add(ugiCache.remove(entry.getKey()).get()); + LOG.info("Evicted UGI for: " + entry.getKey()); + } + } + }, 5, 5, TimeUnit.MINUTES); + } // /static. + } + + @Override public void startDelegationTokenSecretManager(Configuration conf) throws IOException{ long secretKeyInterval = @@ -345,13 +466,12 @@ //authorized to get a delegation token. //Do all checks on short names UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); - UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner); + UserGroupInformation ownerUgi = UGICache.getRemoteUser(owner); if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) { //in the case of proxy users, the getCurrentUser will return the //real user (for e.g. oozie) due to the doAs that happened just before the //server started executing the method getDelegationToken in the MetaStore - ownerUgi = UserGroupInformation.createProxyUser(owner, - UserGroupInformation.getCurrentUser()); + ownerUgi = UGICache.getProxyUser(owner, UserGroupInformation.getCurrentUser()); InetAddress remoteAddr = getRemoteAddress(); ProxyUsers.authorize(ownerUgi,remoteAddr.getHostAddress(), null); } @@ -511,8 +631,7 @@ Socket socket = ((TSocket)(saslTrans.getUnderlyingTransport())).getSocket(); remoteAddress.set(socket.getInetAddress()); try { - UserGroupInformation clientUgi = UserGroupInformation.createProxyUser( - endUser, UserGroupInformation.getLoginUser()); + UserGroupInformation clientUgi = UGICache.getProxyUser(endUser, UserGroupInformation.getLoginUser()); return clientUgi.doAs(new PrivilegedExceptionAction() { public Boolean run() { try {