diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java index a6d9d54..6c891c9 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java @@ -99,6 +99,8 @@ public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dag final InetSocketAddress address = NetUtils.createSocketAddrForHost(amNodeId.getHostname(), amNodeId.getPort()); SecurityUtil.setTokenService(appToken, address); + // TODO Caching this and re-using across submissions breaks AM recovery, since the + // new AM may run on a different host/port. } public QueryIdentifier getQueryIdentifier() { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 7d7fd23..ceca1ad 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -41,6 +41,7 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -219,7 +220,18 @@ protected TaskRunner2Result callInternal() throws Exception { TezCommonUtils.convertJobTokenToBytes(jobToken)); Multimap startedInputsMap = createStartedInputMap(vertex); - final UserGroupInformation taskOwner = fragmentInfo.getQueryInfo().getUmbilicalUgi(); + final UserGroupInformation taskOwner; + if (!vertex.getIsExternalSubmission()) { + taskOwner = fragmentInfo.getQueryInfo().getUmbilicalUgi(); + } else { + // Temporary, till the external interface makes use of a single connection per + // instance. + taskOwner = UserGroupInformation.createRemoteUser(vertex.getTokenIdentifier()); + taskOwner.addToken(jobToken); + final InetSocketAddress address = + NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort()); + SecurityUtil.setTokenService(jobToken, address); + } if (LOG.isDebugEnabled()) { LOG.debug("taskOwner hashCode:" + taskOwner.hashCode()); }