diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index e5bd05e..b4c62d5 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -194,7 +194,7 @@ public void serviceStop() { } } - public void registerTask(String amLocation, int port, String user, + public void registerTask(String amLocation, int port, String umbilicalUser, Token jobToken, QueryIdentifier queryIdentifier, TezTaskAttemptID attemptId) { if (LOG.isTraceEnabled()) { @@ -210,7 +210,7 @@ public void registerTask(String amLocation, int port, String user, LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port); amNodeInfo = knownAppMasters.get(queryIdentifier); if (amNodeInfo == null) { - amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, + amNodeInfo = new AMNodeInfo(amNodeId, umbilicalUser, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory, conf); knownAppMasters.put(queryIdentifier, amNodeInfo); // Add to the queue only the first time this is registered, and on @@ -244,14 +244,14 @@ public void unregisterTask(String amLocation, int port, QueryIdentifier queryIde } } - public void taskKilled(String amLocation, int port, String user, Token jobToken, + public void taskKilled(String amLocation, int port, String umbilicalUser, Token jobToken, final QueryIdentifier queryIdentifier, final TezTaskAttemptID taskAttemptId) { LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port); AMNodeInfo amNodeInfo; synchronized (knownAppMasters) { amNodeInfo = knownAppMasters.get(queryIdentifier); if (amNodeInfo == null) { - amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory, + amNodeInfo = new AMNodeInfo(amNodeId, umbilicalUser, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory, conf); } } @@ -424,7 +424,7 @@ protected Void callInternal() { private static class AMNodeInfo implements Delayed { // Serves as lock for itself. private final Set tasks = new HashSet<>(); - private final String user; + private final String umbilicalUser; private final Token jobToken; private final Configuration conf; private final LlapNodeId amNodeId; @@ -438,14 +438,14 @@ protected Void callInternal() { private final AtomicBoolean isDone = new AtomicBoolean(false); - public AMNodeInfo(LlapNodeId amNodeId, String user, + public AMNodeInfo(LlapNodeId amNodeId, String umbilicalUser, Token jobToken, QueryIdentifier currentQueryIdentifier, RetryPolicy retryPolicy, long timeout, SocketFactory socketFactory, Configuration conf) { - this.user = user; + this.umbilicalUser = umbilicalUser; this.jobToken = jobToken; this.queryIdentifier = currentQueryIdentifier; this.retryPolicy = retryPolicy; @@ -460,7 +460,7 @@ synchronized LlapTaskUmbilicalProtocol getUmbilical() throws IOException, Interr final InetSocketAddress address = NetUtils.createSocketAddrForHost(amNodeId.getHostname(), amNodeId.getPort()); SecurityUtil.setTokenService(this.jobToken, address); - UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(umbilicalUser); ugi.addToken(jobToken); umbilical = ugi.doAs(new PrivilegedExceptionAction() { @Override diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index ca476ec..2a69d6a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -508,10 +508,10 @@ public void queryFailed(QueryIdentifier queryIdentifier) { private class KilledTaskHandlerImpl implements KilledTaskHandler { @Override - public void taskKilled(String amLocation, int port, String user, + public void taskKilled(String amLocation, int port, String umbilicalUser, Token jobToken, QueryIdentifier queryIdentifier, TezTaskAttemptID taskAttemptId) { - amReporter.taskKilled(amLocation, port, user, jobToken, queryIdentifier, taskAttemptId); + amReporter.taskKilled(amLocation, port, umbilicalUser, jobToken, queryIdentifier, taskAttemptId); } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index f24a647..18f0db9 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -150,7 +150,7 @@ public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo frag // Register with the AMReporter when the callable is setup. Unregister once it starts running. if (amReporter != null && jobToken != null) { this.amReporter.registerTask(request.getAmHost(), request.getAmPort(), - vertex.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier(), attemptId); + vertex.getTokenIdentifier(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier(), attemptId); } this.metrics = metrics; this.requestId = taskSpec.getTaskAttemptID().toString(); @@ -377,7 +377,7 @@ public void killTask() { */ public void reportTaskKilled() { killedTaskHandler - .taskKilled(request.getAmHost(), request.getAmPort(), vertex.getUser(), jobToken, + .taskKilled(request.getAmHost(), request.getAmPort(), vertex.getTokenIdentifier(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier(), taskSpec.getTaskAttemptID()); }