diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java index 088f07c..ce2f457 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java @@ -24,9 +24,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.locks.ReentrantLock; import com.google.common.base.Preconditions; @@ -57,6 +58,8 @@ private final FileSystem localFs; private String[] localDirs; private final LlapNodeId amNodeId; + private final String appTokenIdentifier; + private final Token appToken; // Map of states for different vertices. private final Set knownFragments = @@ -66,14 +69,15 @@ private final FinishableStateTracker finishableStateTracker = new FinishableStateTracker(); private final String tokenUserName, appId; - private final AtomicReference umbilicalUgi; public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagIdString, String dagName, String hiveQueryIdString, int dagIdentifier, String user, ConcurrentMap sourceStateMap, String[] localDirsBase, FileSystem localFs, String tokenUserName, - String tokenAppId, final LlapNodeId amNodeId) { + String tokenAppId, final LlapNodeId amNodeId, + String tokenIdentifier, + Token appToken) { this.queryIdentifier = queryIdentifier; this.appIdString = appIdString; this.dagIdString = dagIdString; @@ -86,8 +90,12 @@ public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dag this.localFs = localFs; this.tokenUserName = tokenUserName; this.appId = tokenAppId; - this.umbilicalUgi = new AtomicReference<>(); this.amNodeId = amNodeId; + this.appTokenIdentifier = tokenIdentifier; + this.appToken = appToken; + final InetSocketAddress address = + NetUtils.createSocketAddrForHost(amNodeId.getHostname(), amNodeId.getPort()); + SecurityUtil.setTokenService(appToken, address); } public QueryIdentifier getQueryIdentifier() { @@ -314,23 +322,21 @@ public String getTokenAppId() { return appId; } - public void setupUmbilicalUgi(String umbilicalUser, Token appToken, String amHost, int amPort) { - synchronized (umbilicalUgi) { - if (umbilicalUgi.get() == null) { - UserGroupInformation taskOwner = - UserGroupInformation.createRemoteUser(umbilicalUser); - final InetSocketAddress address = - NetUtils.createSocketAddrForHost(amHost, amPort); - SecurityUtil.setTokenService(appToken, address); - taskOwner.addToken(appToken); - umbilicalUgi.set(taskOwner); - } - } - } + + private final BlockingQueue ugiPool = new LinkedBlockingQueue<>(); public UserGroupInformation getUmbilicalUgi() { - synchronized (umbilicalUgi) { - return umbilicalUgi.get(); + + UserGroupInformation ugi; + ugi = ugiPool.poll(); + if (ugi == null) { + ugi = UserGroupInformation.createRemoteUser(appTokenIdentifier); + ugi.addToken(appToken); } + return ugi; + } + + public void returnUmbilicalUgi(UserGroupInformation ugi) { + ugiPool.offer(ugi); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index 7e646c5..daeb555 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -169,13 +169,11 @@ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appId new QueryInfo(queryIdentifier, appIdString, dagIdString, dagName, hiveQueryIdString, dagIdentifier, user, getSourceCompletionMap(queryIdentifier), localDirsBase, localFs, - tokenInfo.userName, tokenInfo.appId, amNodeId); + tokenInfo.userName, tokenInfo.appId, amNodeId, vertex.getTokenIdentifier(), appToken); QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo); if (old != null) { queryInfo = old; } else { - // Ensure the UGI is setup once. - queryInfo.setupUmbilicalUgi(vertex.getTokenIdentifier(), appToken, amNodeId.getHostname(), amNodeId.getPort()); isExistingQueryInfo = false; } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 1669815..7d7fd23 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -274,6 +274,7 @@ public LlapTaskUmbilicalProtocol run() throws Exception { return result; } finally { FileSystem.closeAllForUGI(fsTaskUgi); + fragmentInfo.getQueryInfo().returnUmbilicalUgi(taskOwner); LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" + runtimeWatch.stop().elapsedMillis()); if (LOG.isDebugEnabled()) { diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 6287ae8..27c426c 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -23,6 +23,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; @@ -34,6 +35,7 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.hadoop.shim.DefaultHadoopShim; @@ -89,11 +91,12 @@ public static QueryFragmentInfo createQueryFragmentInfo( public static QueryInfo createQueryInfo() { QueryIdentifier queryIdentifier = new QueryIdentifier("fake_app_id_string", 1); + LlapNodeId nodeId = LlapNodeId.getInstance("localhost", 0); QueryInfo queryInfo = new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_id_string", "fake_dag_name", "fakeHiveQueryId", 1, "fakeUser", new ConcurrentHashMap(), - new String[0], null, "fakeUser", null, null); + new String[0], null, "fakeUser", null, nodeId, null, null); return queryInfo; }