Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.12.0
    • Component/s: None
    • Labels: None
    • Hadoop Flags: Reviewed

    Description

      Adding workflow properties to the job configuration would enable logging and analysis of workflows in addition to individual MapReduce jobs. Suggested properties include a workflow ID, workflow name, adjacency list connecting nodes in the workflow, and the name of the current node in the workflow.

      mapreduce.workflow.id - a unique ID for the workflow, ideally prepended with the application name
      e.g. pig_<pigScriptId>

      mapreduce.workflow.name - a name for the workflow, to distinguish this workflow from other workflows and to group different runs of the same workflow
      e.g. pig command line

      mapreduce.workflow.adjacency - an adjacency list for the workflow graph, encoded as mapreduce.workflow.adjacency.<source node> = <comma-separated list of target nodes>

      mapreduce.workflow.node.name - the name of the node corresponding to this MapReduce job in the workflow adjacency list
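
      For illustration, a minimal sketch of how a launcher could populate these properties in a Hadoop Configuration (the script ID, command line, and node names below are hypothetical):

        import org.apache.hadoop.conf.Configuration;

        public class WorkflowPropertiesExample {
          public static void main(String[] args) {
            Configuration conf = new Configuration();
            String scriptId = "0001";  // hypothetical Pig script ID
            conf.set("mapreduce.workflow.id", "pig_" + scriptId);
            conf.set("mapreduce.workflow.name", "pig -f clicks.pig");  // the pig command line
            // DAG with two parallel branches: A -> {B, C}, B -> D, C -> D
            conf.set("mapreduce.workflow.adjacency.A", "B,C");
            conf.set("mapreduce.workflow.adjacency.B", "D");
            conf.set("mapreduce.workflow.adjacency.C", "D");
            // this particular MapReduce job corresponds to node A
            conf.set("mapreduce.workflow.node.name", "A");
          }
        }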

      Attachments

        1. PIG-3048.patch (2 kB, Billie Rinaldi)
        2. PIG-3048.patch (2 kB, Billie Rinaldi)
        3. PIG-3048.patch (2 kB, Billie Rinaldi)

        Activity

          dvryaboy Dmitriy V. Ryaboy added a comment -

          Related:
          PIG-2658 (Add start time for pig script in generated Map-Reduce job conf)
          PIG-2587 (Add logical plan signature to the job conf)

          I think those two are better implementations of workflow id and workflow name (assuming multiple versions of the same workflow don't start at the same exact time).

          Adding the adjacent nodes info is interesting. I am not sure we produce something equivalent right now; Bill Graham would know.


          billgraham William W. Graham Jr added a comment -

          I like the idea of adding more of this type of info, but we should make sure we define all the different namings and concepts first (existing and proposed) to make sure our terminology is clear and consistent. For example, we already have these concepts:

          • job name
          • script id
          • script submit time
          • job submit time
          • logical plan signature

          So then what is a script and what is a workflow and how does versioning (i.e., logical plan signature) play into things, or does it?

          I like the idea of adjacency lists, which we don't currently produce. What I'd love to see, though, is the full DAG. Being able to get the full DAG from any job in it would be pretty cool. For already executed jobs, their job IDs could even be populated.

          Thoughts?

          billie Billie Rinaldi added a comment -

          I'm thinking of a workflow as a particular DAG. Each run of a workflow has a globally unique ID, and it has a name that distinguishes that DAG from other DAGs. It sounds like the logical plan signature would be more appropriate for the workflow name, assuming we want to group together runs of the same DAG with different inputs/outputs/arguments.

          What is included in the full DAG in addition to the adjacency list?


          billgraham William W. Graham Jr added a comment -

          I could have been mis-reading your patch but I thought it was providing the adjacency list only from the job in question. The DAG would include that, plus all adjacency lists of other jobs in the script that might not be directly connected to the job in question.

          We use the logical plan signature to distinguish between different versions of the same workflow. Between versions the DAG could change. We've got our own custom field in the job conf to represent the workflow name, but it would be great to standardize this. So we could have something like this:

          • Workflow name: the logical name of the deployed scheduled script (e.g., Hourly click analysis)
          • Logical plan signature (existing): a hash that represents a version of the script, without considering its input/output
          • Script start time (existing): used with Workflow name and Logical plan signature to correlate multiple jobs into a single run of a workflow
          • Job start time (existing): used to show when different jobs start
          • Script DAG: used by tools to visualize the current workflow execution given a job. This is something we'd like to have for Ambrose (https://github.com/twitter/ambrose).

          We represent the DAG as an adjacency list keyed by the physical operator key (scope-*) [1] and then once a job starts we add the jobId to the node [2].

          1 - https://github.com/twitter/ambrose/blob/master/pig/src/main/java/com/twitter/ambrose/pig/AmbrosePigProgressNotificationListener.java#L91
          2 - https://github.com/twitter/ambrose/blob/master/pig/src/main/java/com/twitter/ambrose/pig/AmbrosePigProgressNotificationListener.java#L138
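
          Roughly, the shape is something like the following sketch (hypothetical class and field names, not our actual ones):

            import java.util.*;

            public class DagSketch {
              // Hypothetical sketch: one node per physical operator key, with its
              // adjacency list; the jobId is filled in once the MR job launches.
              static class DagNode {
                final String operatorKey;                 // e.g. "scope-12"
                final List<String> successorKeys = new ArrayList<String>();
                String jobId;                             // null until the job starts
                DagNode(String key) { this.operatorKey = key; }
              }

              public static void main(String[] args) {
                Map<String, DagNode> dag = new HashMap<String, DagNode>();
                DagNode node = new DagNode("scope-12");
                node.successorKeys.add("scope-13");
                dag.put(node.operatorKey, node);
                node.jobId = "job_201212040000_0001";     // added when the job starts
              }
            }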

          If we had those things, would we still need a unique id for the run? It would certainly be more robust than the start time, signature, and workflow name.

          Do we need node name?

          billie Billie Rinaldi added a comment -

          I could have been mis-reading your patch but I thought it was providing the adjacency list only from the job in question. The DAG would include that, plus all adjacency lists of other jobs in the script that might not be directly connected to the job in question.

          Right, I remember now that operators are removed from the plan as jobs are executed. I didn't seem to have access to the entire DAG at the point where the job configuration is populated, so I figured that you could just take the full DAG from the first job, or even better, merge the partial DAGs for all the jobs (in the event that some application adds onto its DAG after jobs have begun executing). Instead, I could split the new configuration method into two parts, one that sets the adjacencies before any jobs are run, and another that sets the rest of the information. I can certainly use OperatorKey.toString for the node names.
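
          As a sketch, merging the partial adjacency maps could look something like this (hypothetical method, just to illustrate the idea):

            import java.util.*;

            public class DagMerge {
              // Hypothetical illustration: union the partial adjacency maps from
              // each job into one full adjacency map for the whole DAG.
              static Map<String, Set<String>> merge(List<Map<String, Set<String>>> partials) {
                Map<String, Set<String>> full = new HashMap<String, Set<String>>();
                for (Map<String, Set<String>> partial : partials) {
                  for (Map.Entry<String, Set<String>> entry : partial.entrySet()) {
                    Set<String> targets = full.get(entry.getKey());
                    if (targets == null) {
                      targets = new HashSet<String>();
                      full.put(entry.getKey(), targets);
                    }
                    targets.addAll(entry.getValue());
                  }
                }
                return full;
              }
            }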

          If we had those things, would we still need a unique id for the run?

          I'd like to have a workflow ID like pig_<scriptID> because then we don't have to know what makes a unique identifier for a particular application. I've opened a similar ticket for Hive, HIVE-3708. Do you think we should include the Pig version in the ID? I think it makes sense to make the workflow name either the script name or the logical plan signature, or perhaps a concatenation of the two. Is the script name what you meant by "the logical name of the deployed scheduled script"?

          Do we need node name?

          If you don't have a node name, how do you know which job in the DAG is running?

          billie Billie Rinaldi added a comment -

          Updated patch based on discussion.

          billie Billie Rinaldi added a comment -

          Updated patch for trunk.

          daijy Daniel Dai added a comment -

          The patch looks fine for me.

          Yes, we can have different ways to represent and track a job, as Bill indicated, some of which might already exist in Pig. However, the key here is that we also need a way to capture this information consistently across multiple projects (such as Hive). Since Hive already adopted HIVE-3708, it makes sense to check in this patch, so that upper-layer applications such as Ambari can make use of this information in a consistent manner.

          I would like to check it in if there are no objections.


          dvryaboy Dmitriy V. Ryaboy added a comment -

          No objections. After all, usage of the config info is purely optional.

          We've run into trouble before with information of this sort becoming very big and triggering "JobConf too large" errors. Might want to look at compression at some point.
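
          For instance, one possible approach (an assumption, not something Pig does today) would be to gzip and Base64-encode large values before setting them in the conf:

            import java.io.ByteArrayOutputStream;
            import java.io.IOException;
            import java.util.Base64;
            import java.util.zip.GZIPOutputStream;

            public class ConfValueCompression {
              // Hypothetical sketch: compress a large serialized DAG string
              // before storing it in the job conf, to keep the conf size down.
              static String compress(String value) throws IOException {
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                GZIPOutputStream gzip = new GZIPOutputStream(bytes);
                gzip.write(value.getBytes("UTF-8"));
                gzip.close();
                return Base64.getEncoder().encodeToString(bytes.toByteArray());
              }
            }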

          daijy Daniel Dai added a comment -

          Patch committed to trunk. Thanks guys!


          billgraham William W. Graham Jr added a comment -

          +1 to commit.

          Just one style nit re spaces:

          (getFileName() != null)?getFileName():"default"
          

          should instead be:

          (getFileName() != null) ? getFileName() : "default"
          

          billgraham William W. Graham Jr added a comment -

          Whoops, I was a minute too late.

          daijy Daniel Dai added a comment -

          No problem, I just committed the change you suggested. Thanks Bill!


          People

            Assignee: Billie Rinaldi
            Reporter: Billie Rinaldi