Looks good overall. I suspect this code path will be under active development, so we can fix bugs and improve things as it stabilizes.
Now that this includes the session concept, it might be better for this name to refer to the session.
public static final String TEZ_AM_DAG_OVER_RPC_ENV = "AM_DAG_OVER_RPC";
Once we run more than one DAG in the AM, we should decouple AM state from DAG state.
- switch(dag.getState()) {
+ switch(currentDAG.getState()) {
case SUCCEEDED:
Should we check the current DAG state here? There is no point killing the DAG if it has already completed. This is just looking ahead to the multiple-DAG case.
+ public synchronized void shutdownAM() {
+ LOG.info("Received message to shutdown AM");
+ if (currentDAG != null) {
+ LOG.info("Sending a kill event to the current DAG"
+ + ", dagId=" + currentDAG.getID());
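To illustrate the state check suggested above, here is a minimal sketch. The DagState enum and the shouldSendKill helper are hypothetical stand-ins, not the actual Tez types, and the set of terminal states is an assumption made for the example.

```java
import java.util.EnumSet;
import java.util.Set;

public class KillGuardSketch {
    // Hypothetical stand-in for the DAG state enum; the real set of states may differ.
    enum DagState { NEW, RUNNING, SUCCEEDED, FAILED, KILLED, ERROR }

    // States assumed to be terminal, i.e. a kill event would have no effect.
    private static final Set<DagState> TERMINAL =
        EnumSet.of(DagState.SUCCEEDED, DagState.FAILED, DagState.KILLED, DagState.ERROR);

    // Returns true only when there is a DAG and it has not already finished.
    static boolean shouldSendKill(DagState state) {
        return state != null && !TERMINAL.contains(state);
    }

    public static void main(String[] args) {
        System.out.println("RUNNING   -> " + shouldSendKill(DagState.RUNNING));
        System.out.println("SUCCEEDED -> " + shouldSendKill(DagState.SUCCEEDED));
        System.out.println("no DAG    -> " + shouldSendKill(null));
    }
}
```

In shutdownAM() the same guard would wrap the kill event, so a DAG that has already completed is left alone.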
TEZ-71 is supposed to remove references to TEZ_STAGING_DIR (in config or as a literal). Let's avoid using it any further. It should be easy to create a unique staging directory.
+ Path stagingDir =
+ new Path(tezConf.get(TezConfiguration.TEZ_AM_STAGING_DIR,
+ TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT)
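One possible way to get a unique staging directory is to append a per-application component to a base directory. The sketch below uses java.nio.file.Path rather than the Hadoop Path class to stay self-contained; the stagingDirFor helper, the base directory, and the choice of the application ID string as the unique component are all assumptions for illustration.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class StagingDirSketch {
    // Hypothetical helper: derives a per-application staging directory
    // by resolving the application ID under a shared base directory.
    static Path stagingDirFor(String baseDir, String applicationId) {
        if (applicationId == null || applicationId.isEmpty()) {
            throw new IllegalArgumentException("applicationId must be non-empty");
        }
        return Paths.get(baseDir).resolve(applicationId);
    }

    public static void main(String[] args) {
        // Both arguments are made-up example values.
        System.out.println(stagingDirFor("/tmp/tez/staging", "application_0001"));
    }
}
```

Two applications with distinct IDs then never share a staging directory, which removes the need for a single configured location.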
The AM may not be running at this time. YARN may have just allocated or launched the AM container. It's probably OK to skip this test for now and add it when we have "real" sessions.
+ YarnApplicationState appState = appReport.getYarnApplicationState();
+ if (!sentKillSession) {
+ if (appState == YarnApplicationState.RUNNING) {
+ tezClient.closeSession(tezSession);
+ sentKillSession = true;
We may choose to remove getApplicationId() from TezSession and DAGClient for now and add it later if needed. Currently there are 0 users and we may be better off hiding that link. At least mark them @Private.
Cannot assume a valid host and port exist until the AM actually starts running (even though YARN marks the state as RUNNING). The DAGClient code currently checks for this case. There is a YARN JIRA open to provide this info clearly.
+ static DAGClientAMProtocolBlockingPB getAMProxy(Configuration conf,
+ String amHost, int amRpcPort) throws IOException {
+ InetSocketAddress addr = new InetSocketAddress(amHost,
+ amRpcPort);
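As a rough sketch of the host/port check described above, the helper below returns an address only when the reported values look usable, so the caller can create a proxy on demand or retry later. The amAddress name and the specific "not yet valid" conditions (null or empty host, port outside 1..65535) are assumptions, not what YARN is guaranteed to report. It uses InetSocketAddress.createUnresolved to avoid a DNS lookup in the example, unlike the constructor in the patch.

```java
import java.net.InetSocketAddress;
import java.util.Optional;

public class AmAddressSketch {
    // Hypothetical helper: empty result means "AM address not available yet".
    static Optional<InetSocketAddress> amAddress(String amHost, int amRpcPort) {
        boolean hostMissing = amHost == null || amHost.isEmpty();
        boolean portInvalid = amRpcPort <= 0 || amRpcPort > 65535;
        if (hostMissing || portInvalid) {
            return Optional.empty();
        }
        // createUnresolved skips name resolution; the real code may want to resolve.
        return Optional.of(InetSocketAddress.createUnresolved(amHost, amRpcPort));
    }

    public static void main(String[] args) {
        // Example values only.
        System.out.println(amAddress("am-host.example.com", 10020).isPresent());
        System.out.println(amAddress(null, -1).isPresent());
    }
}
```

A caller would treat an empty result as "try again later" instead of constructing a proxy that fails on first use.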
In general, I didn't quite follow how getAMProxies() works. It may throw an exception if the AM is not running, but I did not see that exception being handled, and it may always occur due to race conditions. What happens to the stored proxy when the app is retried by YARN? For now, it may be simpler to not store the proxy objects and to create them as needed. We can revisit if that turns out to be an issue. closeSession() does not seem like a perf-sensitive operation as of now.
Thanks for the reviews Bikas. Committed.