diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index 65f7232..fcc2164 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -194,7 +194,7 @@ public void serviceStop() { } } - public void registerTask(String amLocation, int port, String user, + public void registerTask(String amLocation, int port, String umbilicalUser, Token jobToken, QueryIdentifier queryIdentifier, TezTaskAttemptID attemptId) { if (LOG.isTraceEnabled()) { @@ -205,7 +205,7 @@ public void registerTask(String amLocation, int port, String user, LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port); amNodeInfo = knownAppMasters.get(amNodeId); if (amNodeInfo == null) { - amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, + amNodeInfo = new AMNodeInfo(amNodeId, umbilicalUser, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory, conf); knownAppMasters.put(amNodeId, amNodeInfo); // Add to the queue only the first time this is registered, and on @@ -236,14 +236,14 @@ public void unregisterTask(String amLocation, int port, TezTaskAttemptID ta) { } } - public void taskKilled(String amLocation, int port, String user, Token jobToken, + public void taskKilled(String amLocation, int port, String umbilicalUser, Token jobToken, final QueryIdentifier queryIdentifier, final TezTaskAttemptID taskAttemptId) { LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port); AMNodeInfo amNodeInfo; synchronized (knownAppMasters) { amNodeInfo = knownAppMasters.get(amNodeId); if (amNodeInfo == null) { - amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory, + amNodeInfo = new AMNodeInfo(amNodeId, umbilicalUser, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory, conf); } } @@ -408,7 +408,7 @@ protected Void callInternal() { private static class AMNodeInfo implements Delayed { // Serves as lock for itself. private final Set tasks = new HashSet<>(); - private final String user; + private final String umbilicalUser; private final Token jobToken; private final Configuration conf; private final LlapNodeId amNodeId; @@ -421,14 +421,14 @@ protected Void callInternal() { private long nextHeartbeatTime; - public AMNodeInfo(LlapNodeId amNodeId, String user, + public AMNodeInfo(LlapNodeId amNodeId, String umbilicalUser, Token jobToken, QueryIdentifier currentQueryIdentifier, RetryPolicy retryPolicy, long timeout, SocketFactory socketFactory, Configuration conf) { - this.user = user; + this.umbilicalUser = umbilicalUser; this.jobToken = jobToken; this.currentQueryIdentifier = currentQueryIdentifier; this.retryPolicy = retryPolicy; @@ -443,7 +443,7 @@ synchronized LlapTaskUmbilicalProtocol getUmbilical() throws IOException, Interr final InetSocketAddress address = NetUtils.createSocketAddrForHost(amNodeId.getHostname(), amNodeId.getPort()); SecurityUtil.setTokenService(this.jobToken, address); - UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(umbilicalUser); ugi.addToken(jobToken); umbilical = ugi.doAs(new PrivilegedExceptionAction() { @Override diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index af8f5b0..39cef4a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -509,10 +509,10 @@ public void queryFailed(QueryIdentifier queryIdentifier) { private class KilledTaskHandlerImpl implements KilledTaskHandler { @Override - public void taskKilled(String amLocation, int port, String user, + public void taskKilled(String amLocation, int port, String umbilicalUser, Token jobToken, QueryIdentifier queryIdentifier, TezTaskAttemptID taskAttemptId) { - amReporter.taskKilled(amLocation, port, user, jobToken, queryIdentifier, taskAttemptId); + amReporter.taskKilled(amLocation, port, umbilicalUser, jobToken, queryIdentifier, taskAttemptId); } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 25dc569..e8ac772 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -150,7 +150,7 @@ public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo frag // Register with the AMReporter when the callable is setup. Unregister once it starts running. if (amReporter != null && jobToken != null) { this.amReporter.registerTask(request.getAmHost(), request.getAmPort(), - vertex.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier(), attemptId); + vertex.getTokenIdentifier(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier(), attemptId); } this.metrics = metrics; this.requestId = taskSpec.getTaskAttemptID().toString(); @@ -374,7 +374,7 @@ public void killTask() { */ public void reportTaskKilled() { killedTaskHandler - .taskKilled(request.getAmHost(), request.getAmPort(), vertex.getUser(), jobToken, + .taskKilled(request.getAmHost(), request.getAmPort(), vertex.getTokenIdentifier(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier(), taskSpec.getTaskAttemptID()); }