Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels:
      None

      Description

      This is a proposal for a system specialized in running Hadoop/Pig jobs arranged in a control dependency DAG (Directed Acyclic Graph), i.e. a Hadoop workflow application.

      Attached are a complete specification and a high-level overview presentation.


      Highlights

      A Workflow application is a DAG that coordinates the following types of actions: Hadoop, Pig, Ssh, Http, Email and sub-workflows.

      Flow control operations within the workflow applications can be done using decision, fork and join nodes. Cycles in workflows are not supported.
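
      For illustration, a fragment of a workflow definition using these nodes might look like the sketch below. The element and attribute names, and the ${sizeCheckPassed} variable, are hypothetical; the attached specification is authoritative.

        <fork name="forkNode">
          <path start="cleanseJob"/>
          <path start="transformJob"/>
        </fork>
        <!-- both branches must complete before the join transitions onward -->
        <join name="joinNode" to="sizeDecision"/>
        <decision name="sizeDecision">
          <switch>
            <case to="bigDataJob">${sizeCheckPassed}</case>
            <default to="smallDataJob"/>
          </switch>
        </decision>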

      Actions and decisions can be parameterized with job properties, action outputs (i.e. Hadoop counters, Ssh key/value pair outputs) and file information (file exists, file size, etc.). Formal parameters are expressed in the workflow definition as ${VAR} variables.

      A Workflow application is a ZIP file that contains the workflow definition (an XML file) and all the files necessary to run its actions: JAR files for Map/Reduce jobs, shell scripts for streaming Map/Reduce jobs, native libraries, Pig scripts, and other resource files.

      Before running a workflow job, the corresponding workflow application must be deployed in HWS.

      Deploying workflow applications and running workflow jobs can be done via command line tools, a WS API and a Java API.

      Monitoring the system and workflow jobs can be done via a web console, command line tools, a WS API and a Java API.

      When submitting a workflow job, a set of properties resolving all the formal parameters in the workflow definitions must be provided. This set of properties is a Hadoop configuration.
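
      For example, if the workflow definition references ${inputDir} and ${jobTracker}, the job could be submitted with a properties set in standard Hadoop configuration format; the property names and values below are hypothetical.

        <configuration>
          <property>
            <name>inputDir</name>
            <value>hdfs://namenode/user/alice/input/2009-03-09</value>
          </property>
          <property>
            <name>jobTracker</name>
            <value>jobtracker-host:9001</value>
          </property>
        </configuration>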

      Possible states for a workflow job are: CREATED, RUNNING, SUSPENDED, SUCCEEDED, KILLED and FAILED.

      In the case of an action failure in a workflow job, depending on the type of failure, HWS will attempt automatic retries, request a manual retry or fail the workflow job.

      HWS can make HTTP callback notifications on action start/end/failure events and workflow end/failure events.

      In the case of workflow job failure, the workflow job can be resubmitted skipping previously completed actions. Before doing a resubmission the workflow application could be updated with a patch to fix a problem in the workflow application code.


      1. Hadoop_Summit_Oozie.pdf
        1.91 MB
        Alejandro Abdelnur
      2. hws-preso-v1_0_2009FEB22.pdf
        1.88 MB
        Alejandro Abdelnur
      3. hws-spec2009MAR09.pdf
        241 kB
        Alejandro Abdelnur
      4. hws-v1_0_2009FEB22.pdf
        229 kB
        Alejandro Abdelnur
      5. oozie-0.18.3.o0.1-SNAPSHOT-distro.tar.gz
        7.65 MB
        Alejandro Abdelnur
      6. oozie-patch.txt
        4.82 MB
        Alejandro Abdelnur
      7. oozie-spec-20090521.pdf
        204 kB
        Alejandro Abdelnur
      8. oozie-src-20090605.tar.gz
        7.51 MB
        Alejandro Abdelnur
      9. packages.tar.gz
        9.66 MB
        Alejandro Abdelnur

        Activity

        Carl Steinbach added a comment -

        Yahoo!'s Official Oozie Site: http://yahoo.github.com/oozie/

        Alejandro Abdelnur added a comment -

        To follow Oozie developments please go to

        http://github.com/tucu00/oozie1

        Alejandro Abdelnur added a comment -

        Apologies, I've been off for a couple of months.

        I'll update this issue shortly.

        Chris Douglas added a comment -

        This has remained untouched for 6 months. If the intent is still to contribute this particular patch, and iterate on it in Apache, then please excuse the dismissal and mark the issue PA again.

        Alejandro Abdelnur added a comment -

        Oozie source in patch format under contrib.

        Error codes have been normalized to number ranges.

        Added build version information to the client and server WS, both available via the CLI.

        Alejandro Abdelnur added a comment -

        This tar contains the hadoop/streaming/pig/json JARs that are not available in public Maven repositories.

        The tar should be expanded within the contrib/oozie directory (after applying the patch).

        Alejandro Abdelnur added a comment -

        Oops, thanks for letting us know, we'll fix that in the next drop.

        Kevin Peterson added a comment -

        I'm building this now; the setup-maven.sh and setup-jars.sh scripts have a shebang for /bin/sh, but they actually rely on bash functions. At least on my Ubuntu 9.04, I had to change these to /bin/bash to run them.

        Alejandro Abdelnur added a comment -

        Hadoop Summit Oozie preso

        Alejandro Abdelnur added a comment -

        adding Oozie to the issue summary

        Alejandro Abdelnur added a comment -

        Dmitriy,

        The WARN entries in the log are just warnings; they are not breaking anything. Because of some package changes from H18 to H19 we register some exceptions by name instead of by class, so we don't need to maintain two branches. What you see is a warning for the packages not available in 18.

        The failed tests seem to be related to HDFS authorization being disabled in your Hadoop.

        The tests in error seem to be related to passphrase-less SSH not being configured.

        Please check the binary distro; the documentation explains how to set up SSH.

        Please let me know

        Dmitriy V. Ryaboy added a comment -

        Alejandro,
        I am trying to run the tests for this on Hadoop 18.3 and several things are failing:

        1) I get repeated warnings along the lines of
        17:32:50,054 WARN PigActionExecutor:547 - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[-] ACTION[-] Exception [org.apache.hadoop.hdfs.server.namenode.SafeModeException] no in classpath, ActionExecutor [pig] will handled it as ERROR
        17:32:50,051 WARN FsActionExecutor:547 - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[-] ACTION[-] Exception [org.apache.hadoop.hdfs.protocol.QuotaExceededException] no in classpath, ActionExecutor [fs] will handled it as ERROR

        The package name was org.apache.hadoop.dfs in 18. I am using the -Dh18 flag.

        2) I also get the following Failed and Errored out tests:

        Failed tests:
        testSourceNotFoundException(org.apache.oozie.dag.action.hadoop.TestFsActionExecutor)
        testAccessDeniedException(org.apache.oozie.dag.action.hadoop.TestFsActionExecutor)
        testAuthorizationService(org.apache.oozie.service.TestAuthorizationService)
        testFsDir(org.apache.oozie.test.TestXFsTestCase)

        Tests in error:
        testJobStart(org.apache.oozie.dag.action.ssh.TestSshActionExecutor)
        testJobRecover(org.apache.oozie.dag.action.ssh.TestSshActionExecutor)
        testConnectionErrors(org.apache.oozie.dag.action.ssh.TestSshActionExecutor)

        These are along the lines of
        testAccessDeniedException(org.apache.oozie.dag.action.hadoop.TestFsActionExecutor) Time elapsed: 0.136sec <<< FAILURE!
        junit.framework.AssertionFailedError
        at junit.framework.Assert.fail(Assert.java:47)
        at junit.framework.Assert.fail(Assert.java:53)
        at org.apache.oozie.dag.action.hadoop.TestFsActionExecutor.testAccessDeniedException(TestFsActionExecutor.java:276)

        Thoughts?

        Alejandro Abdelnur added a comment -

        We've just posted an initial drop of Oozie. There is still work to be done: it is functionally complete except for the HTTP/EMAIL action nodes, which are not yet implemented, plus normalization and cleanup of error and log messages.

        We'll continue to post regular drops.

        Alejandro Abdelnur added a comment -

        Binary distro for Hadoop 0.18.3, with documentation and examples.

        Alejandro Abdelnur added a comment -

        The attached tarball is Oozie source code.

        It includes Hadoop/Pig JARs to compile with 0.18.3 and 0.20.0 (0.19.1 JARs were left out as the tar was larger than 10MB).

        Bootstrap instructions are in the readme.txt file in the root directory.

        Alejandro Abdelnur added a comment -

        An updated spec. Changes summary:

        • Fine-tuned the workflow schema for simplicity and consistency.
        • Changed the deployment model from bundles to an HDFS directory.
        • Changed security model to enforce user propagation.
        • Workflow job parameters can be referenced as variables, e.g. ${myInputDir}, from the workflow.xml.
        • Added WS API details.
        Amr Awadallah added a comment -

        Alejandro,

        Are you giving a talk about Oozie during the Hadoop Summit?

        – amr

        Alejandro Abdelnur added a comment -

        Jeff,

        By the end of this week we'll post an updated spec.

        We expect to post the code next week; we are still in the process of ironing out documentation, examples and the distro.

        Thanks.

        Alejandro

        Jeff Hammerbacher added a comment -

        Hey Alejandro,

        Any updates on when the code for HWS will be available to the open source community?

        Thanks,
        Jeff

        Alejandro Abdelnur added a comment -

        Based on the feedback and some refinements we've made, we updated the spec.

        Changelog:

        • Changed the CREATED job state to PREP to have the same states as Hadoop
        • Renamed 'hadoop-workflow' element to 'workflow-app'
        • Decision syntax changed to be 'switch/case' with no transition indirection
        • Action nodes have a common root element 'action', with the action type as a sub-element (to use a single built-in XML schema)
        • Action nodes have two explicit transitions, 'ok to' and 'error to', enforced by the XML schema (see the sketch after this list)
        • Renamed 'fail' action element to 'kill'
        • Renamed 'hadoop' action element to 'map-reduce'
        • Renamed 'hdfs' action element to 'fs'
        • Updated all XML snippets and examples
        • Made user propagation simpler and consistent
        • Added HWS XML schema to Appendix A
        • Added workflow example to Appendix B
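
        A minimal sketch of an action node under these changes; the node names (myFirstJob, nextAction, failNode) are hypothetical, the job configuration is elided, and the attached spec is authoritative:

        <action name="myFirstJob">
          <map-reduce>
            <!-- Hadoop job configuration goes here -->
          </map-reduce>
          <ok to="nextAction"/>
          <error to="failNode"/>
        </action>
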
        Alejandro Abdelnur added a comment -

        Regarding the use of XSD

        We want to use XSD as it allows us to do XML schema validation at deployment time, which makes all the parsing code much slimmer. The only programmatic validation we then have to do at deployment time is that the DAG has no loose ends and no cycles.

        Regarding the use of multiple XSDs

        We could provide a single XSD, but that would complicate how new action types are validated at deployment time, as it would require creating a new XSD, and a new XSD should have a different URI.

        That is one of the reasons we went with the approach of different XSDs for actions.

        Another reason is that by using different XSDs you could eventually support a new Hadoop action while still supporting the old one for all deployed applications.

        Option 1: Current option, one XSD for control nodes and one XSD per action node type.

        Option 2: One XSD for control nodes and one XSD for all the (out of the box) action node types.

        Option 3: Integrate the control nodes and all the (out of the box) action nodes into a single XSD, leaving an extension point for custom action nodes.

        Thoughts?

        Regarding input/output datasets for a high level workload scheduler

        [IMO, this is a different topic from the XSD issue]

        I understand the motivation for this, but I see it as belonging to the workload scheduling system.

        IMO, the workflow nodes should stick to a direct mapping of Hadoop/Pig configuration knobs (config props for Hadoop, params and config props for Pig). This makes the workflow model more intuitive to Hadoop/Pig developers.

        It should be the higher-level system (in your case the workload scheduler) that maps the input/output datasets to the Hadoop/Pig configuration knobs.

        Steve Loughran added a comment -

        I'd be against embedding stuff and playing xmlns nesting tricks the way you've suggested.

        <action name="myPigjob">
          <pig xmlns="hws:pig:1">
          ...
          </pig>
        </action>
        
        1. It doesn't take long before you end up in XSD hell.
        2. It stops tools from understanding the inputs and outputs of custom actions, which is information you need if you are trying to decide where to run things and how to schedule different workflows over space and time.
        <action name="myPigjob" type="pig">
          <task name="pig" >
           <inputs>
             <dataset url="s3:/traffic/oystercard/london/2009" />
             <dataset url="s3:/traffic/anpr/london/2009/" />
             <file path="/users/steve/pig/car-vs-tube.pig" />
           </inputs>
           <option name="limit" value="40000" />
           <option name="pigfile" value="/users/steve/pig/car-vs-tube.pig" />
           <option name="startdate" value="2009-01-01" />
           <option name="enddate" value="2009-02-28" />
          <outputs>
            <dataset url="hdfs://traffic/anpr/london" />
          </outputs>
          </task>
        </action>
        

        It's uglier, but having file inputs and outputs explicit could make high-level workload scheduling much easier, especially once you start deciding which rack to create VMs on.

        Alejandro Abdelnur added a comment -

        Regarding language independence for launching WF jobs,

        Yes, that is one of the motivations of the WS API.

        Regarding HWS being server-side,

        Yes, to provide scalability, failover, monitoring and manageability. We want to move away from client-side solutions.

        With many thousands of WF jobs per day, a client-side solution running a process per WF job is not a feasible option.

        Regarding leveraging existing workflow engines,

        Definitely, that has been our approach from the beginning; our current implementation uses jBPM (which uses Hibernate for persistence).

        jBPM & Hibernate are LGPL. Apache has provisions to work with LGPL, though this complicates things a bit (for testing, integration, bundling, and potentially some extra layer of indirection).

        Once there is agreement in the Hadoop community regarding the functional spec, we can discuss the specifics of the implementation.

        We can easily replace jBPM with another implementation, and we are doing some work in this area.

        Regarding naming corrections,

        "hadoop" action to "map-reduce" action and "hdfs" to "fs", makes sense.

        Regarding HDFS generalization,

        As we use the FileSystem API, it's there already; it's just a naming thing, and per the previous comment we could rename 'hdfs' to 'fs'.

        Regarding querying a WF job progress,

        Yes, you can get the current status of the WF job and stats about completed and running actions: start time, end time, resolved configuration values (with all EL expressions evaluated), current status, exit transition, a link to the web console for the action if any (i.e. the Hadoop job web console link), etc.

        Steve Loughran added a comment -

        On the topic of Ant, GridAnt does what Tom is thinking of, you could play with that today
        http://www.globus.org/cog/projects/gridant/

        Also, I'd expect to see Ant tasks to do the long-haul submissions, to push out workflows with late-binding information (username, S3 login details) sent as part of the process but not hard-coded into the XML.

        Steve Loughran added a comment -

        Tom, there are good reasons for not using Ant as a workflow system, even though it is a great build tool with good support from IDEs and CI systems:

        1. not that declarative; you need to understand every task to determine the inputs and outputs. Compare with MSBuild, which is harder to write but easier for IDEs to work with.
        2. no stable schema; very hard for other analysis tools to look at a build and decide what happens
        3. no HA operation. Every task manages its state in member variables, no way to handle outages other than full restart
        4. file system is biased towards local system only. No good if you want to run the operations elsewhere in the cluster
        5. implicit bias against long-lived operations. The Eclipse team do complain if Ant leaks memory over time, but there are some assumptions that builds finish in minutes, not days, and some of Ant's data structures are based on those assumptions
        6. no failure handling. Failures = halt the build and tell the developer they have something to fix. Workflows have different goals
        7. no formal foundation on a par with Hoare's CSP work, which was, once upon a time, what BPEL was based on.

        The Ant tasks for Hadoop aren't that complex, don't have much in the way of testing and rely on DFSClient, which abuses System.out in ways Ant won't like (Ant puts its own stream up that buffers different threads up to line endings). They shouldn't be a reason to stay with Ant.

        Tom White added a comment -

        It would be useful to clarify the goals a bit. For example, is the aim to be language independent, so one can launch workflows from any programming language? Does this need to be a server-side workflow scheduler, or would a client-side scheduler be sufficient (for the first release at least)?

        One of the stated goals is simplicity, so I wonder if there are some simpler approaches that should be considered. For example:

        • Can we use Ant? There are already some Ant tasks for interacting with Hadoop filesystems, so would adding some Ant tasks for submitting Hadoop and Pig jobs, retrieving counters, etc, provide enough control for running dependent jobs? Ant itself comes with an SSH task, HTTP GET task, and a mail task.
        • Another approach might be to look at extending JobControl. It has the notion of a Job (implemented in org.apache.hadoop.mapred.jobcontrol.Job) which is currently tied to MapReduce jobs, which could be generalized to running Pig, and perhaps other operations.
        • Are there existing workflow engines that fulfill the requirements, or are at least close enough for us to use or extend? (I notice one of the goals is to "Leverage existing expertise, concepts and components whenever possible".) Has anyone evaluated what's out there? It would be useful to do this exercise before implementing anything, I think.

        Finally, a few comments on the spec itself:

        • The "hadoop" action might be better called "map-reduce" since it runs a MapReduce job. "Hadoop" is the name of the whole project.
        • The "hdfs" action is not really confined to HDFS, but should be able to use any Hadoop filesystem, such as KFS or S3, so it would be better to call it "fs". Also, there are other places in the spec where HDFS can be generalized to be any Hadoop filesystem.
        • Is there a way to query a workflow's progress to get a percentage complete? Would the details or list operation do this?
        Alejandro Abdelnur added a comment -

        Regarding the EL resolution,

        Resolution is well defined, both how and when.

        ${something} expressions are resolved from workflow job properties and EL functions at the time that a workflow job starts a workflow action (enters the node). Values from the HWS configuration (hws-default.xml & hws-site.xml) are not used to resolve workflow job properties (this is different from Hadoop).

        The EL language HWS uses is JSP EL (we use the commons-el implementation). It supports variables, functions and complex expressions. Which inconsistency are you (Steve) referring to? It is probably a mistake in the spec.
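
        As a point of reference, a decision predicate written in JSP EL might look along these lines; the variable names are hypothetical, only the syntax is standard JSP EL:

          <!-- hypothetical names; only the syntax is standard JSP EL -->
          <case to="bigInput">${inputSize gt 1073741824}</case>
          <case to="retry">${lastErrorCode == 'TRANSIENT' and attempts lt 3}</case>
          <default to="smallInput"/>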

        Regarding the choice of SQL DB for job store,

        If you need HA/failover for the DB you can get it; granted, you need hardware, but it is zero to minimal effort on the HWS side.

        We did not use HDFS because we need frequent workflow job context updates (i.e. every time an action starts or ends).

        HWS keeps zero state in memory when a workflow job is not doing a transition (this allows HWS to scale), and because of this we need indexed access (i.e. by job ID, action ID, user ID). A SQL DB gives very good read/write access times. Finally, the transaction support makes it very straightforward to keep things consistent in case of failure.

        HWS uses standard SQL with no implementation-specific extensions, so it can run on any SQL DB (we use HSQL for unit tests and MySQL when deployed).

        Regarding allowing users to plug in new actions without editing the codebase and schema,

        There is no need to modify the HWS codebase for a new action.

        Your suggestion about using something like <action type="email"> instead of <email> makes sense. This would also remove the need to tweak the XML schema. Something like:

        <action name="myPigjob">
          <pig xmlns="hws:pig:1">
          ...
          </pig>
        </action>
        

        By doing this, schema validation of the action type can also be performed. Plus, you could support multiple versions of an action type if needed.

        Regarding testing, using HSQL

        Yes, we already do; the test cases fly.

        Regarding Why XML and not JSON?

        HWS uses JSON for all WS API responses.

        HWS uses XML for the workflow job conf (leveraging Hadoop Configuration).

        HWS uses XML for the workflow definition (PDL).

        Regarding the Cascading comments not being strictly true,

        I'm not a Cascading expert so I may have missed something, but I think I got things mostly right. Corrections please...

        Chris K Wensel added a comment -

        Sorry, I was referring to the technical and process comments.

        We do accept contributions. Independent extensions are usually a better option.

        All levels of work representation are DAGs internally, and are subsequently processed/submitted in parallel, where possible, to Hadoop.

        Cascading was designed with all the necessary public abstractions to implement HWS, in a fashion.

        etc etc.. regardless, Christophe's licensing points are valid.

        Christophe Bisciglia added a comment -

        All I was trying to say is that with current licensing, cascading cannot be hosted with / distributed by Apache/Hadoop. I can see how there is room to misinterpret other aspects of my comment.

        I am sure there would be overlap between the feature sets for HWS and Cascading. That said, I think it's important to have a workflow scheduler that can be shipped with Hadoop under the apache2 license.

        I definitely agree this is not the forum to discuss licensing issues and implications around other projects.

        Doug Cutting added a comment -

        > this isn't the forum to clarify

        Why not? The question is whether this is redundant with Cascading, so comparisons are certainly relevant, no?

        Chris K Wensel added a comment -

        Comments re Cascading aren't strictly true, but this isn't the forum to clarify.

        Christophe Bisciglia added a comment -

        Cascading is good software, but it is GPL'd and does not currently accept outside submissions; therefore, it's incompatible with Hadoop from a licensing perspective.

        It would be great for Hadoop to have a more generic workflow scheduler that is Apache2 licensed.

        As such, we should avoid a Cascading vs. HWS discussion. It isn't relevant unless the Cascading folks are open to licensing their code under the Apache 2 license.

        Steve Loughran added a comment -

        One more Q. Why XML and not JSON?

        Steve Loughran added a comment -

        -worried a bit about the logic of when ${something} refs are resolved. We already have that problem in Hadoop in that Configuration resolves ${property} refs in the JVM where the conf is, so as the configuration gets moved around, the values change, and you get no way of fixing them.

        -similarly, there is some inconsistency in the language as far as URLs for callbacks are concerned. You need a consistent expression language.

        -I was a bit worried about the Web Service APIs; good to see that they are JSON/RESTy. Given that we also need a long-haul API to talk to Hadoop itself, it would be good if what was done here could act as a foundation for any other long-haul web APIs.

        -Why the choice of a (possibly HA) SQL db for the job store? It is possible to build failover with them, but you need 2-3 machines dedicated to the role. If we could push state out to HDFS/HBase then perhaps we'd eliminate a point of complexity. Alternatively: share state between peers using ZooKeeper?

        -I'd consider allowing users to plug in new actions without editing the codebase; it could be handled with a schema that went from <email> to <action type="email" />, with action-specific content. This would let me use jabber/twitter plugins for my job status, etc.

        -Testing? hsqldb could be used for the database.

        Alejandro Abdelnur added a comment -

        Cascading and HWS are different beasts.

        Cascading is a different way of doing what Pig does. Programming in Cascading is programming at a higher level of abstraction that resolves into a series of Map/Reduce jobs.

        HWS is a (server) workflow system specialized in running Hadoop/Pig jobs wired via a PDL descriptor.

        Following are a few quick highlights of how Cascading and HWS differ:

        Cascading uses a topological search model to resolve the execution path.

        HWS uses a 'DAG of processes workflow' model that allows explicitly expressing parallelism and alternate execution paths (decisions).

        Cascading runs as a client from the command line.

        HWS is a server system (like Hadoop Job Tracker) to which you submit workflow jobs and later check the status.

        In HWS no resources are held once the client has submitted the workflow job; the workflow job runs in the server.

        This allows you to run several thousand workflow jobs concurrently from a single HWS instance that supports system failover.

        In HWS monitoring and status tracking of jobs is done via CLIs and a web console that gathers data from HWS (like you do in Hadoop).

        Cascading's primary programming model is similar to Pig's but with a Java API.

        In Cascading you can still use your Hadoop jobs as a flow, as a way to integrate with existing map/reduce apps, but the real benefit of Cascading comes from using its API programming model.

        HWS's primary programming model is Hadoop/Pig jobs connected via a PDL-like XML workflow definition file.

        In Cascading you need to write Java code to wire your Hadoop jobs.

        In HWS you don't wire your Hadoop/Pig jobs in Java but in a workflow XML file, in a more declarative way.

        Ted Dunning added a comment -

        Doesn't this replicate what Cascading already does quite well?

        See http://www.cascading.org


          People

          • Assignee:
            Alejandro Abdelnur
            Reporter:
            Alejandro Abdelnur
          • Votes:
            6
            Watchers:
            52

            Dates

            • Created:
              Updated:
              Resolved:
