diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 6a13b55e69..3d0e2f9646 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -15,7 +15,6 @@ package org.apache.hadoop.hive.llap.daemon.impl; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Arrays; @@ -23,9 +22,18 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.UgiFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -102,6 +110,7 @@ private static final Logger LOG = LoggerFactory.getLogger(ContainerRunnerImpl.class); public static final String THREAD_NAME_FORMAT_PREFIX = "ContainerExecutor "; + private UgiPool ugiPool; private final AMReporter amReporter; private final QueryTracker queryTracker; private final Scheduler executorService; @@ -130,6 +139,7 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, AtomicReference super("ContainerRunnerImpl"); Preconditions.checkState(numExecutors > 0, "Invalid number of executors: " + numExecutors + ". 
Must be > 0");
+    this.ugiPool = new UgiPool();
     this.localAddress = localAddress;
     this.localShufflePort = localShufflePort;
     this.amReporter = amReporter;
@@ -269,7 +279,7 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
           queryIdentifier, qIdProto.getApplicationIdString(),
           dagId, vertex.getDagName(), vertex.getHiveQueryId(), dagIdentifier,
           vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(),
-          vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, amNodeId);
+          vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, amNodeId, ugiPool);
 
       String[] localDirs = fragmentInfo.getLocalDirs();
       Preconditions.checkNotNull(localDirs);
@@ -596,4 +606,72 @@ public int getNumActive() {
     return executorService.getNumActiveForReporting();
   }
 
+  /**
+   * Pool of umbilical UGIs keyed by the AM's app token identifier.
+   * An entry is dropped once it has not been accessed for 10 minutes.
+   */
+  static class UgiPool {
+    // Idle UGIs for a given appTokenIdentifier (AM). Expires after 10 minutes of last access
+    private final Cache<String, BlockingQueue<UserGroupInformation>> ugiPool =
+        CacheBuilder
+            .newBuilder().removalListener(new RemovalListener<String, BlockingQueue<UserGroupInformation>>() {
+              @Override
+              public void onRemoval(
+                  RemovalNotification<String, BlockingQueue<UserGroupInformation>> notification) {
+                LOG.info("Removing " + notification.getValue() + " from pool");
+              }
+            }).expireAfterAccess(10, TimeUnit.MINUTES).build();
+
+    /**
+     * Get UGI for a given AM and appToken. It is possible to have more than one
+     * UGI per AM. The returned UGI is taken out of the pool, so it is not handed
+     * to another caller until it is given back via
+     * {@link #returnUmbilicalUgi(String, UserGroupInformation)}.
+     *
+     * @param appTokenIdentifier AM identifier; used as the pool key and as the remote user name
+     * @param appToken token added to every UGI created for this AM
+     * @return an idle UGI from the pool, or a newly created one if none is idle
+     * @throws ExecutionException if creating the pool entry for this AM fails
+     */
+    public UserGroupInformation getUmbilicalUgi(String appTokenIdentifier,
+        Token<?> appToken) throws ExecutionException {
+      BlockingQueue<UserGroupInformation> queue = ugiPool.get(appTokenIdentifier,
+          new Callable<BlockingQueue<UserGroupInformation>>() {
+            @Override
+            public BlockingQueue<UserGroupInformation> call() throws Exception {
+              UserGroupInformation ugi = UserGroupInformation.createRemoteUser(appTokenIdentifier);
+              ugi.addToken(appToken);
+              BlockingQueue<UserGroupInformation> queue = new LinkedBlockingQueue<>();
+              queue.add(ugi);
+              LOG.info("Added new ugi pool for " + appTokenIdentifier);
+              return queue;
+            }
+          });
+
+      UserGroupInformation ugi = queue.poll();
+      if (ugi == null) {
+        // Do not enqueue the new UGI here: it is in use by the caller and only becomes
+        // available to others once returnUmbilicalUgi() puts it into the pool.
+        ugi = UserGroupInformation.createRemoteUser(appTokenIdentifier);
+        ugi.addToken(appToken);
+        LOG.info("Added new ugi for " + appTokenIdentifier + ", pool size:" + queue.size());
+      }
+      return ugi;
+    }
+
+    /**
+     * Return UGI back to pool. The UGI is discarded if the pool no longer holds
+     * an entry for the given AM.
+     *
+     * @param appTokenIdentifier AM identifier
+     * @param ugi UGI previously obtained from {@link #getUmbilicalUgi(String, Token)}
+     */
+    public void returnUmbilicalUgi(String appTokenIdentifier, UserGroupInformation ugi) {
+      BlockingQueue<UserGroupInformation> ugiQueue = ugiPool.getIfPresent(appTokenIdentifier);
+      if (ugiQueue != null) {
+        ugiQueue.offer(ugi);
+      }
+    }
+  }
+
 }
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 00fed15d2b..c464c2f8fa 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -24,10 +24,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.ReentrantLock;
 
 import
com.google.common.base.Preconditions; @@ -70,6 +69,7 @@ private final FinishableStateTracker finishableStateTracker = new FinishableStateTracker(); private final String tokenUserName, appId; + private final ContainerRunnerImpl.UgiPool ugiPool; public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagIdString, String dagName, String hiveQueryIdString, @@ -79,7 +79,7 @@ public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dag String tokenAppId, final LlapNodeId amNodeId, String tokenIdentifier, Token appToken, - boolean isExternalQuery) { + boolean isExternalQuery, ContainerRunnerImpl.UgiPool ugiPool) { this.queryIdentifier = queryIdentifier; this.appIdString = appIdString; this.dagIdString = dagIdString; @@ -96,6 +96,7 @@ public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dag this.appTokenIdentifier = tokenIdentifier; this.appToken = appToken; this.isExternalQuery = isExternalQuery; + this.ugiPool = ugiPool; final InetSocketAddress address = NetUtils.createSocketAddrForHost(amNodeId.getHostname(), amNodeId.getPort()); SecurityUtil.setTokenService(appToken, address); @@ -332,21 +333,11 @@ public String getTokenAppId() { return appId; } - - private final BlockingQueue ugiPool = new LinkedBlockingQueue<>(); - - public UserGroupInformation getUmbilicalUgi() { - - UserGroupInformation ugi; - ugi = ugiPool.poll(); - if (ugi == null) { - ugi = UserGroupInformation.createRemoteUser(appTokenIdentifier); - ugi.addToken(appToken); - } - return ugi; + UserGroupInformation getUmbilicalUgi() throws ExecutionException { + return ugiPool.getUmbilicalUgi(appTokenIdentifier, appToken); } - public void returnUmbilicalUgi(UserGroupInformation ugi) { - ugiPool.offer(ugi); + void returnUmbilicalUgi(UserGroupInformation ugi) { + ugiPool.returnUmbilicalUgi(appTokenIdentifier, ugi); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java 
llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index eae8e08540..8dff6a70fe 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -160,7 +160,8 @@ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appId String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, String user, SignableVertexSpec vertex, Token appToken, - String fragmentIdString, LlapTokenInfo tokenInfo, final LlapNodeId amNodeId) throws IOException { + String fragmentIdString, LlapTokenInfo tokenInfo, final LlapNodeId amNodeId, + ContainerRunnerImpl.UgiPool ugiPool) throws IOException { ReadWriteLock dagLock = getDagLock(queryIdentifier); // Note: This is a readLock to prevent a race with queryComplete. Operations @@ -191,7 +192,7 @@ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appId dagIdentifier, user, getSourceCompletionMap(queryIdentifier), localDirsBase, localFs, tokenInfo.userName, tokenInfo.appId, amNodeId, vertex.getTokenIdentifier(), appToken, - vertex.getIsExternalSubmission()); + vertex.getIsExternalSubmission(), ugiPool); QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo); if (old != null) { queryInfo = old; diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 50dec4759e..9d45cb29d6 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -102,7 +102,7 @@ public static QueryInfo createQueryInfo() { new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_id_string", "fake_dag_name", "fakeHiveQueryId", 1, "fakeUser", new 
ConcurrentHashMap(), - new String[0], null, "fakeUser", null, nodeId, null, null, false); + new String[0], null, "fakeUser", null, nodeId, null, null, false, null); return queryInfo; }