diff --git llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java index 9f3ec38702..35ed40c240 100644 --- llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java +++ llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java @@ -14,6 +14,7 @@ package org.apache.hadoop.hive.llap; +import java.io.IOException; import java.security.PrivilegedAction; import java.util.HashSet; import java.util.Iterator; @@ -433,7 +434,8 @@ public ProtocolType call() throws Exception { } } - private ProtocolType createProxy(final LlapNodeId nodeId, Token nodeToken) { + private ProtocolType createProxy( + final LlapNodeId nodeId, Token nodeToken) throws IOException { if (nodeToken == null && token == null) { if (LOG.isDebugEnabled()) { LOG.debug("Creating a client without a token for " + nodeId); @@ -441,6 +443,12 @@ private ProtocolType createProxy(final LlapNodeId nodeId, Token nodeT return createProtocolImpl(getConfig(), nodeId.getHostname(), nodeId.getPort(), null, retryPolicy, socketFactory); } + // Either the token should be passed in here, or in ctor. + String tokenUser = this.tokenUser == null ? getTokenUser(nodeToken) : this.tokenUser; + if (tokenUser == null) { + tokenUser = UserGroupInformation.getCurrentUser().getShortUserName(); + LOG.warn("Cannot determine token user for UGI; using {}", tokenUser); + } final UserGroupInformation ugi = UserGroupInformation.createRemoteUser(tokenUser); // Clone the token as we'd need to set the service to the one we are talking to. if (nodeToken == null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index 96158fc9fd..eae67c5a9b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -185,6 +185,8 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullReso throws ExecutionException, InterruptedException { assert INSTANCE == null; // We could derive the expected number of AMs to pass in. + // Note: we pass a null token here; the tokens to talk to plugin endpoints will only be + // known once the AMs register, and they are different for every AM (unlike LLAP token). LlapPluginEndpointClientImpl amComm = new LlapPluginEndpointClientImpl(conf, null, -1); QueryAllocationManager qam = new GuaranteedTasksAllocator(conf, amComm); return (INSTANCE = new WorkloadManager(amComm, yarnQueue, conf, qam, plan));