Details

    • Sub-task
    • Status: Closed
    • Major
    • Resolution: Fixed
    • None
    • 0.2.0
    • None

    Description

      Allow the DAG to be submitted over RPC after the AM has started. The AM runs a single DAG and exits. The DAG is run in the context of the user who submitted the AM.
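
      Here is a minimal sketch of the single-DAG flow described above. It assumes a simplified in-process model: SingleDagAM, submitDAG and runOnce are hypothetical names, not Tez APIs, and a BlockingQueue stands in for the RPC handoff between the RPC server thread and the AM's main loop. The AM starts with no DAG, accepts exactly one submission, runs it as the user who launched the AM, and then returns so the process can exit.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of an AM that runs a single DAG submitted over RPC; not Tez code.
final class SingleDagAM {
  private final String amUser;  // the user who submitted the AM
  // Capacity 1: the handoff slot between the RPC thread and the AM main loop.
  private final BlockingQueue<String> incoming = new ArrayBlockingQueue<>(1);
  private boolean dagAccepted = false;

  SingleDagAM(String amUser) {
    this.amUser = amUser;
  }

  // Called from the RPC server thread after the AM has started.
  // Returns false for any submission after the first one.
  synchronized boolean submitDAG(String dagName) {
    if (dagAccepted) {
      return false;
    }
    dagAccepted = true;
    return incoming.offer(dagName);
  }

  // AM main loop: wait for the DAG, "run" it as the AM user, then return so the AM can exit.
  // Not synchronized, so submitDAG is never blocked while this method waits.
  String runOnce(long timeoutMs) {
    try {
      String dagName = incoming.poll(timeoutMs, TimeUnit.MILLISECONDS);
      if (dagName == null) {
        return "NO_DAG_SUBMITTED";
      }
      return "RAN " + dagName + " AS " + amUser;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "INTERRUPTED";
    }
  }
}
```

      The timeout branch models an AM that was started but never received a DAG. That case is why a client-side way to shut the AM down is still needed.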

      Attachments

        1. TEZ-341.1.patch
          48 kB
          Bikas Saha
        2. TEZ-341.wip.2.patch
          97 kB
          Hitesh Shah
        3. TEZ-341.3.patch
          118 kB
          Hitesh Shah
        4. TEZ-341.4.patch
          122 kB
          Hitesh Shah

        Activity

          Hitesh Shah added a comment -

          Thanks for the reviews Bikas. Committed.

          Hitesh Shah added a comment -

          Fixed the following:

          • DAG_AM_RPC env var naming
          • changed shutdownAM to check state of AM
          • removed dependency on staging dir
          • removed am proxy cache

          Filed a JIRA for de-coupling AM state from DAG state.

          Retained the closeSession test and also the code to shut down the AM, as it's required if a job is not submitted. The app goes into the RUNNING state only after the first attempt has registered, so the issue only crops up if an attempt has died and a new attempt has not been spawned yet. In that case, there is a fall-through to using yarn kill app if the AM proxy is not available.
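
          Here is a sketch of the client-side close path described above, under stated assumptions. AMProxy, AMProxyFactory, AppKiller and SessionCloser are hypothetical names, not the Tez client API. No proxy is cached, and a fresh one is created on each call. If the proxy cannot be created (for example, an attempt has died and the next one is not up yet) or the RPC fails, the code falls through to killing the application via YARN. In real code that step would be something like YarnClient#killApplication.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical client-side handle to the AM's RPC endpoint.
interface AMProxy {
  void shutdownSession() throws IOException;
}

// Hypothetical factory; returns null when the AM host/port is not known yet.
interface AMProxyFactory {
  AMProxy create(String appId) throws IOException;
}

// Hypothetical stand-in for "yarn kill app".
interface AppKiller {
  void killApplication(String appId) throws IOException;
}

final class SessionCloser {
  private final AMProxyFactory proxies;
  private final AppKiller killer;

  SessionCloser(AMProxyFactory proxies, AppKiller killer) {
    this.proxies = proxies;
    this.killer = killer;
  }

  // Returns "RPC" if the AM shut itself down, "KILL" if we fell back to YARN.
  String closeSession(String appId) {
    AMProxy proxy;
    try {
      proxy = proxies.create(appId);   // created per call, never cached
    } catch (IOException e) {
      proxy = null;                    // AM not reachable right now
    }
    if (proxy != null) {
      try {
        proxy.shutdownSession();
        return "RPC";
      } catch (IOException e) {
        // AM went away mid-call: fall through to the YARN kill.
      }
    }
    try {
      killer.killApplication(appId);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return "KILL";
  }
}
```

          Creating the proxy per call avoids holding a stale proxy after YARN retries the AM on a new host/port. Because closing a session is rare, the cost of creating a proxy on each call should not matter.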

          Bikas Saha added a comment -

          Looks good overall. I suspect this code path will be under active development, so we can fix bugs and improve things as we stabilize.

          Now that this includes the session concept, it might be better to refer to the session in this name:

          public static final String TEZ_AM_DAG_OVER_RPC_ENV = "AM_DAG_OVER_RPC";
          

          Once we run more than one DAG in the AM, we should de-couple AM state from DAG state.

          -      switch(dag.getState()) {
          +      switch(currentDAG.getState()) {
                 case SUCCEEDED:
          

          Should we check the current DAG state? There is no point killing it if it has already completed. This is just looking ahead to the multiple-DAGs case.

          +    public synchronized void shutdownAM() {
          +      LOG.info("Received message to shutdown AM");
          +      if (currentDAG != null) {
          +        //send a DAG_KILL message
          +        LOG.info("Sending a kill event to the current DAG"
          +            + ", dagId=" + currentDAG.getID());
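
          Here is a minimal sketch of that suggestion, assuming simplified state enums. DagState, AmState and SessionAM are hypothetical, and Tez's real DAG state machine has more states. The AM tracks its own state separately from the current DAG's state, and shutdownAM() only issues a kill when the current DAG is still active.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, reduced DAG states.
enum DagState {
  NEW, RUNNING, SUCCEEDED, FAILED, KILLED;

  boolean isTerminal() {
    return this == SUCCEEDED || this == FAILED || this == KILLED;
  }
}

// AM state kept separate from DAG state, so a future multi-DAG AM can outlive each DAG.
enum AmState { RUNNING, SHUTTING_DOWN }

final class SessionAM {
  private AmState amState = AmState.RUNNING;
  private DagState currentDagState;                 // null until a DAG is submitted
  private final List<String> sentEvents = new ArrayList<>();

  synchronized void setCurrentDagState(DagState state) {
    currentDagState = state;
  }

  synchronized void shutdownAM() {
    if (amState == AmState.SHUTTING_DOWN) {
      return;                                       // repeated shutdown requests are no-ops
    }
    amState = AmState.SHUTTING_DOWN;
    if (currentDagState != null && !currentDagState.isTerminal()) {
      sentEvents.add("DAG_KILL");                   // only kill a DAG that is still active
    }
    sentEvents.add("AM_SHUTDOWN");
  }

  synchronized AmState getAmState() {
    return amState;
  }

  synchronized List<String> getSentEvents() {
    return new ArrayList<>(sentEvents);
  }
}
```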
          

          TEZ-71 is supposed to remove references to TEZ_STAGING_DIR (in config or literal). Let's avoid using it any further. It should be easy to create a unique staging directory.

          +    Path stagingDir =
          +        new Path(tezConf.get(TezConfiguration.TEZ_AM_STAGING_DIR,
          +            TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT)
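
          One way to get a unique staging directory without TEZ_AM_STAGING_DIR is to derive it from a base directory and the application ID. The sketch below is hypothetical. StagingDirs.forApp and the base/appId layout are assumptions, and java.nio.file.Path stands in for Hadoop's Path only to keep the example self-contained.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

final class StagingDirs {
  private StagingDirs() {}

  // Hypothetical helper: one staging directory per application, so concurrent
  // AMs never share a directory and no global staging-dir setting is needed.
  static Path forApp(String baseDir, String appId) {
    if (appId == null || appId.isEmpty()) {
      throw new IllegalArgumentException("appId is required");
    }
    return Paths.get(baseDir, appId);
  }
}
```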
          

          The AM may not be running at this time; YARN may have just allocated/launched the AM container. It's probably OK to skip this test for now and add it when we have "real" sessions.

          +        YarnApplicationState appState = appReport.getYarnApplicationState();
          +        if (!sentKillSession) {
          +          if (appState == YarnApplicationState.RUNNING) {
          +            tezClient.closeSession(tezSession);
          +            sentKillSession = true;
          

          We may choose to remove getApplicationId() from TezSession and DAGClient for now and add it back later if needed. Currently there are no users, and we may be better off hiding that link. At the very least, mark them @Private.

          We cannot assume a valid host/port exists until the AM actually starts running (even though YARN marks the state as RUNNING). The DAGClient code currently checks for this case. There is a YARN JIRA open to provide this info clearly.

          +  static DAGClientAMProtocolBlockingPB getAMProxy(Configuration conf,
          +      String amHost, int amRpcPort) throws IOException {
          +    InetSocketAddress addr = new InetSocketAddress(amHost,
          +        amRpcPort);
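
          Here is a sketch of that guard. Before building an AM proxy, check that the reported host and port look valid, and treat anything else as "AM not ready yet" rather than as an error. The placeholder values are assumptions: an unregistered AM is taken to report a null, empty or "N/A" host and a port outside 1..65535. AMAddresses.amAddressOrNull is a hypothetical helper, not the DAGClient code.

```java
import java.net.InetSocketAddress;

final class AMAddresses {
  private AMAddresses() {}

  // Returns the AM RPC address, or null if the report does not yet carry a usable
  // host/port (assumed placeholders: null/empty/"N/A" host, port outside 1..65535).
  static InetSocketAddress amAddressOrNull(String host, int rpcPort) {
    if (host == null || host.isEmpty() || "N/A".equals(host)) {
      return null;
    }
    if (rpcPort <= 0 || rpcPort > 65535) {
      return null;
    }
    // Unresolved: no DNS lookup here; resolution is left to whoever connects.
    return InetSocketAddress.createUnresolved(host, rpcPort);
  }
}
```

          A caller that gets null would typically fetch the application report again after a short delay instead of failing.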
          

          In general, I didn't quite follow how getAMProxies() works. It may throw an exception if the AM is not running, but I did not see that exception being handled, and it can always occur due to race conditions. What happens to the stored proxy when the app is retried by YARN? For now, it may be simpler not to store the proxy objects and to create them as needed. We can revisit if that turns out to be an issue. closeSession() does not seem like a perf-sensitive operation as of now.

          Hitesh Shah added a comment - edited

          Gunther Hagleitner, Bikas Saha, Siddharth Seth: review please?

          Hitesh Shah added a comment -

          Building on top of Bikas' patch. Still need to change sleep job to use delayed submission.

          Bikas Saha added a comment -

          Patch with rough edges but with a test that shows it working correctly.

          People

            Hitesh Shah
            Bikas Saha
