diff --git llap-server/src/java/org/apache/tez/dag/app/launcher/DaemonContainerLauncher.java llap-server/src/java/org/apache/tez/dag/app/launcher/DaemonContainerLauncher.java index 86c5ff3..b883058 100644 --- llap-server/src/java/org/apache/tez/dag/app/launcher/DaemonContainerLauncher.java +++ llap-server/src/java/org/apache/tez/dag/app/launcher/DaemonContainerLauncher.java @@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration; @@ -64,6 +65,7 @@ private final TaskAttemptListener tal; private final Map proxyMap; private final int servicePort; + private final ApplicationAttemptId appAttemptId; private final Clock clock; @@ -80,6 +82,7 @@ public DaemonContainerLauncher(AppContext appContext, Configuration conf, ExecutorService localExecutor = Executors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder().setNameFormat("DaemonCommunicator #%2d").build()); executor = MoreExecutors.listeningDecorator(localExecutor); + this.appAttemptId = appContext.getApplicationAttemptId(); this.context = appContext; this.tokenIdentifier = context.getApplicationID().toString(); this.tal = tal; @@ -107,7 +110,7 @@ public void handle(NMCommunicatorEvent event) { InetSocketAddress address = tal.getTaskCommunicator(launchEvent.getTaskCommId()).getAddress(); ListenableFuture future = executor.submit( new SubmitCallable(getProxy(launchEvent.getNodeId().getHost()), launchEvent, - tokenIdentifier, address.getHostName(), address.getPort())); + tokenIdentifier, appAttemptId, address.getHostName(), address.getPort())); Futures.addCallback(future, new SubmitCallback(launchEvent.getContainerId(), launchEvent.getContainer().getNodeId().getHost())); break; @@ -130,13 +133,16 @@ public void handle(NMCommunicatorEvent event) { private final String amHost; private final int amPort; private final LlapDaemonProtocolBlockingPB daemonProxy; + private final ApplicationAttemptId appAttemptId; private SubmitCallable(LlapDaemonProtocolBlockingPB daemonProxy, NMCommunicatorLaunchRequestEvent event, String tokenIdentifier, + ApplicationAttemptId appAttemptId, String amHost, int amPort) { this.event = event; this.daemonProxy = daemonProxy; this.tokenIdentifier = tokenIdentifier; + this.appAttemptId = appAttemptId; this.amHost = amHost; this.amPort = amPort; } @@ -147,9 +153,8 @@ public Void call() throws Exception { RunContainerRequestProto.Builder requestBuilder = RunContainerRequestProto.newBuilder(); // Need the taskAttemptListenerAddress requestBuilder.setAmHost(amHost).setAmPort(amPort); - requestBuilder.setAppAttemptNumber(event.getContainer().getId().getApplicationAttemptId().getAttemptId()); - requestBuilder.setApplicationIdString( - event.getContainer().getId().getApplicationAttemptId().getApplicationId().toString()); + requestBuilder.setAppAttemptNumber(appAttemptId.getAttemptId()); + requestBuilder.setApplicationIdString(appAttemptId.getApplicationId().toString()); requestBuilder.setTokenIdentifier(tokenIdentifier); requestBuilder.setContainerIdString(event.getContainer().getId().toString()); requestBuilder.setCredentialsBinary( diff --git pom.xml pom.xml index 033f01d..02da4e9 100644 --- pom.xml +++ pom.xml @@ -155,7 +155,7 @@ 1.0.1 1.7.5 4.0.4 - 0.7.0-SNAPSHOT + 0.7.0-TEZ-2003-SNAPSHOT 2.2.0 1.2.0 2.10