PIG-3453

Implement a Storm backend to Pig

    Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.13.0
    • Fix Version/s: 0.13.0
    • Component/s: None
    • Labels:

      Description

      There is a lot of interest around implementing a Storm backend to Pig for streaming processing. The proposal and initial discussions can be found at https://cwiki.apache.org/confluence/display/PIG/Pig+on+Storm+Proposal


          Activity

          Pradeep Gollakota added a comment -

          A more pluggable execution engine would make it easier to add a Storm backend.

          Brian ONeill added a comment -

          I'm definitely +1 on this and would be glad to help out with a proof of concept (PoC). Do we have a concrete use case? Right now, we have a large historical dataset in HDFS/S3 against which we run Pig scripts. We also ingest real-time events that have the same data and structure (using a Kafka queue + Storm). The real-time system needs to perform the same analysis captured in the Pig scripts. To accommodate both use cases, we are looking at maintaining two sets of infrastructure/code bases. I'd love to be able to deploy the Pig scripts to Storm. I could take a stab at interpreting one of our simple Pig scripts (a FILTER + GROUP) into a Trident topology.

          (If we get a solid PoC on this, I could incorporate it as a chapter in the Storm book we are writing)
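The shared-logic idea can be sketched in plain Python (purely illustrative; all names here are hypothetical, not part of any proposed API): a FILTER + GROUP written as an ordinary function applies equally well to a historical batch or to the micro-batches of a stream.

```python
from collections import Counter

def filter_and_group(events, predicate, key):
    """Apply a FILTER then a GROUP/COUNT as a plain function.

    Because it only consumes an iterable, the same logic can run over a
    full historical dataset or over each real-time micro-batch.
    """
    return Counter(key(e) for e in events if predicate(e))

# Batch run over "historical" records:
history = [
    {"user": "a", "ok": True},
    {"user": "b", "ok": False},
    {"user": "a", "ok": True},
]
batch_counts = filter_and_group(history, lambda e: e["ok"], lambda e: e["user"])

# Streaming run: fold the very same function over micro-batches.
stream_counts = Counter()
for micro_batch in ([{"user": "a", "ok": True}], [{"user": "b", "ok": True}]):
    stream_counts += filter_and_group(micro_batch, lambda e: e["ok"], lambda e: e["user"])
```

The point is only that one definition of the analysis serves both infrastructures, which is what deploying the same Pig script to both backends would buy.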

          Jacob Perkins added a comment -

          I like the idea of a PoC. I hadn't really thought about the case that you'd want to run the same aggregations in real time as you would against your warehouse. It sure makes a lot of sense. Mostly I was thinking of language reuse; complex stream aggregations using the same familiar data flow syntax of Pig.

          More examples of the translation from Pig scripts to Trident topologies would be helpful. It'd be motivation enough to see the amount of boilerplate, etc., required to write a Trident topology versus a Pig script side by side. I've been working from a word count since it's sort of the canonical example.

          I'm currently focusing on mapping Pig's LogicalPlan to a TridentTopology. Other really helpful things (that I'm less likely to make fast progress on) include defining a new operator for spouts, called 'tap', and some planning/discussion for a new operator for persisting state.

          Pradeep Gollakota - Anything else? It'd be nice to get a sense of who's working/willing to work on this and where we're at now.

          Pradeep Gollakota added a comment -

          I personally don't have a concrete use case for this yet. In terms of using a system that can work both in warehousing and in real time, I have been looking at Summingbird (recently open-sourced). I think the word count example is a good place to start as it's the canonical example. However, I'd like to have a more complicated example as well, so I'm writing a TF-IDF implementation in Pig and in Trident. Perhaps this can be step 2 of the PoC, after word count. I'd also like to cut the more complex operations, like nested foreach statements, out of the initial PoC. I'm not sure yet how we'd solve them.

          Jacob Perkins I started a new job last week and I'm not sure how this task would fit into the road map of my new company yet. I'd love to work on this, if I have time. You're more than welcome to work on this as well. Thanks for all your great comments, input and enthusiasm.

          Brian ONeill added a comment -

          I've been looking at SummingBird as well, and have had success connecting Druid w/ Storm. In our specific case, we have data scientists running Pig scripts off-line/in batch that I would love to straight-up deploy into our real-time data ingestion system (on top of Storm).

          I'm going to continue w/ the PoC to see if I can convert one of those Pig scripts into a Storm topology. I have the Pig script running on a yarn cluster. I'm going to try out storm-yarn, then deploy the corresponding topology and do an unofficial "race" between hdfs+pig vs. kafka+storm. I'll let you know how I fare.

          Brian ONeill added a comment -

          First question: for DISTINCT within Storm, do you believe we should have a sliding time window within which we perform the distinct? There is mention of the fact that it will be stateful (since we need to keep a set in memory with which to de-dupe). Do we intend to leverage the concept of Trident State for this? (That may make sense: implement State, then on each commit/flush perform the de-duping.)

          Thoughts?
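For illustration only (plain Python, not Trident code; the class and method names are invented), the sliding-window variant could keep first-seen timestamps and expire them as the window slides:

```python
import collections

class SlidingWindowDistinct:
    """De-dupe keys within a sliding time window (illustrative sketch).

    State grows with the number of distinct keys inside the window,
    which is why the operator has to be stateful.
    """
    def __init__(self, window_seconds):
        self.window = window_seconds
        self.seen = collections.OrderedDict()  # key -> timestamp of first emission

    def accept(self, key, now):
        # Expire entries whose window has fully elapsed.
        while self.seen:
            oldest_key, ts = next(iter(self.seen.items()))
            if now - ts > self.window:
                self.seen.pop(oldest_key)
            else:
                break
        if key in self.seen:
            return False          # duplicate within the window: drop
        self.seen[key] = now
        return True               # first sighting in this window: emit
```

In a Trident State formulation, the `seen` map would live in the State implementation and the expiry pass could run on each commit/flush, as suggested above.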

          Brian ONeill added a comment -

          Also, we could perform DISTINCT using a backend storage mechanism (like Cassandra), where we first check storage to see if the tuple exists; if it does not, we emit. If we first route all identical tuples to a single bolt, then check from there, that may work (eliminating the potential for two bolts to check for existence at the same time). Using backend storage would allow someone to perform a true DISTINCT operation.
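A minimal sketch of that storage-backed approach (the dict below merely stands in for Cassandra or any other persistence mechanism; the names are illustrative):

```python
class StoreBackedDistinct:
    """Exact DISTINCT backed by external storage (sketched with a dict).

    In a real topology the store would be Cassandra or any comparable
    persistence layer; grouping the stream by key first guarantees a
    single task performs the check-then-emit for a given key, avoiding
    the race where two bolts check existence at the same time.
    """
    def __init__(self, store=None):
        self.store = store if store is not None else {}

    def process(self, tuple_key):
        if tuple_key in self.store:   # already persisted -> drop
            return None
        self.store[tuple_key] = True  # persist first, then emit
        return tuple_key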

          Jacob Perkins added a comment -

          Brian ONeill, I haven't thought too hard about distinct yet myself. Since I'm really only thinking about Trident and not Storm in general, doing a distinct strictly within a batch is one straightforward option. Unfortunately, from a user standpoint, I think this would be (a) minimally useful and (b) confusing. Instead we could implement something like an approximate distinct using an LRU cache? Maybe even go so far as to implement a SQF (which I haven't read in its entirety yet: http://www.vldb.org/pvldb/vol6/p589-dutta.pdf)?

          Also, what about order by? In what sense is an unbounded stream ordered?

          I absolutely do not want to tie the Storm/Trident execution engine to an external data store such as Cassandra. Pig is supposed to be backend agnostic. Maybe the default tap and sink can be Kafka (tap) and Cassandra (sink). Finally, it should be possible to run a Pig script in Storm local mode.

          And Pradeep Gollakota, I'm actually well on the way to having nested foreach working. The way I'm working it now, each LogicalExpressionPlan becomes its own Trident BaseFunction. It actually works quite nicely for now. I haven't gotten to aggregates yet. What I probably won't implement for the PoC is the tap and sink.
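The LRU-cache idea mentioned above might look roughly like this (illustrative Python; the capacity and eviction behavior are assumptions, not a spec):

```python
from collections import OrderedDict

class LRUApproxDistinct:
    """Approximate DISTINCT with a bounded LRU cache (sketch).

    Memory is bounded by the cache capacity, but a key evicted from the
    cache will be emitted again if it reappears -- hence "approximate".
    """
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()

    def accept(self, key):
        if key in self.cache:
            self.cache.move_to_end(key)     # refresh recency
            return False                    # duplicate: drop
        self.cache[key] = True
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)  # evict least-recently-used key
        return True                         # (probably) new: emit
```

The trade-off versus the in-batch distinct is exactly the one raised: bounded state across batches, at the price of occasional duplicate emits for long-gap repeats.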

          Brian ONeill added a comment -

          Jacob Perkins Good points/suggestions. I'll have a look at both LRU and SQF.

          RE: Cassandra
          Sorry, I didn't mean to imply we would create a hard dependency. I meant we could leverage the Trident State abstraction. (My team happens to own the storm-cassandra Trident State implementation (https://github.com/hmsonline/storm-cassandra)) We would query the State to see if the tuple was processed. You could just as easily plug in any persistence mechanism. (e.g. https://github.com/nathanmarz/trident-memcached)

          Gianmarco De Francisci Morales added a comment -

          Would it make sense to settle for approximated answers and use, e.g., bloom filters to implement distinct?
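As a toy sketch of the bloom-filter approach (the sizes and hash scheme below are arbitrary choices, not a recommendation): the distinct state lives in a fixed bit array, at the cost of occasional false positives (a genuinely new tuple wrongly dropped), with no false negatives (a true duplicate is never emitted).

```python
import hashlib

class BloomDistinct:
    """Approximate DISTINCT via a bloom filter (toy sketch)."""
    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits // 8)

    def _positions(self, key):
        # Derive k hash positions by salting one cryptographic hash.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def accept(self, key):
        positions = list(self._positions(key))
        seen = all(self.bits[p // 8] & (1 << (p % 8)) for p in positions)
        for p in positions:
            self.bits[p // 8] |= 1 << (p % 8)
        return not seen  # True -> emit (key is probably new)
```

Memory is constant regardless of stream length, which is the appeal for an unbounded stream; the false-positive rate is tuned via the bit-array size and hash count.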

          Dmitriy V. Ryaboy added a comment -

          Gianmarco De Francisci Morales: may I suggest https://github.com/twitter/algebird for this and many other approximate counting use cases? Already in use by scalding, summingbird, and spark.

          Jacob Perkins added a comment -

          Whew. Here's a patch that demonstrates running, e2e, a word count. It's quite hefty, so here are some high-level points:

          • Implemented two new operators, 'tap' and 'sink', with corresponding logical operators LOTap and LOSink and interfaces SinkFunc and TapFunc. I did the best I could to keep them general enough to work beyond the scope of just Storm. It may make sense to split this part out into its own JIRA & patch.
          • Implemented LocalFileTap and LocalFileSink (which really shouldn't be used for more than simple testing) to demonstrate the TapFunc and SinkFunc.
          • LogToTopologyTranslationVisitor - Much like LogToPhyTranslationVisitor for the physical plan, it walks the logical plan and creates a TridentTopology.
          • LOForEach - I more or less copied exactly what's being done in the LogToPhyTranslationVisitor. Since POForEach is serializable, rather than parsing the logical expression plans myself I simply create the POForEach and wrap it with a Storm Trident BaseFunction. It seemed a reasonably pragmatic approach for now.
          • LOCogroup - I took a similar approach to LOForEach except, since POPackage is tied so closely to Hadoop Writables, I implemented something similar to what POPackage is doing with StreamPackageFunction.
          • TridentExecutionEngine - This is probably the hackiest part. I'm not sure what the best way to create a stats object for this is. The topology runs continuously, it doesn't 'succeed'. I don't want to fake POStores.
          • Building and classpath. I did the best I could to not have a dependency nightmare scenario. After applying the patch to trunk it should build fine. To run, you'll want zookeeper-3.3.3.jar (no other version works) and storm-core-0.9.0-rc2.jar in your classpath.
          • test script:
          wordcount.pig
          set storm.executionengine.stream.batch.size 10000
          
          data = tap '$sometext' using org.apache.pig.backend.storm.tap.LocalFileTap('line') as (line:chararray);
          
          tokens = foreach data generate flatten(TOKENIZE(line)) as (token:chararray);
          
          counts = foreach (group tokens by token) generate
                           group as token,
                           COUNT(tokens) as num;
          
          sink counts into '$output' using org.apache.pig.backend.storm.sink.LocalFileSink('token');
          

          I'm sure there's more details than this. Again it's a large patch and, rather than continuing to polish it, I think it's time for feedback.
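Restated as plain Python (illustrative only; the actual execution happens inside the generated TridentTopology), the script's dataflow is tokenize-then-running-count over micro-batches:

```python
def word_count_stream(batches):
    """Mirror of the wordcount.pig dataflow: split each line into tokens
    (the TOKENIZE + flatten step), then maintain a running count per
    token across micro-batches (the group-by + COUNT step)."""
    counts = {}
    for batch in batches:              # each batch ~ one Trident batch
        for line in batch:
            for token in line.split():
                counts[token] = counts.get(token, 0) + 1
    return counts
```

Unlike a MapReduce run, the counts never "finish"; they are a continuously updated state, which is also why the stats/POStore question above is awkward for a streaming engine.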

          Cheolsoo Park added a comment -

          Jacob Perkins, this is exciting! I am wondering whether we should create a branch for the Storm backend like we have the tez branch. Since the backend interfaces, including ExecutionEngine, Launcher, and PigStats, are evolving now, it will probably be easier for you to maintain your work in a branch. Feel free to send an email on the dev mailing list. I am happy to help you create a branch and commit your work.

          Jacob Perkins added a comment -

          Cheolsoo Park I've got it on a separate branch in my github fork of apache pig (http://github.com/thedatachef/pig/tree/storm-integration). I just wasn't sure what the best way to say "hey, here's a storm execution engine" was other than a patch. Can you direct me to the dev mailing list? Also, and maybe this is a question for the dev mailing list, but this is the first Apache project I've contributed to. I'm not sure how closely it's integrated with git/github other than as a convenient mirror. If you create a branch called storm under apache/pig, what's the best way for me to push changes to it? A pull request, or is there another preferred method?

          Cheolsoo Park added a comment -

          Usually, we create a feature branch for a big feature and merge it to trunk after it's fully developed/tested. Although it's totally possible to develop it in your personal repo and post a giant patch in one shot, the bigger the patch is, the longer it takes to be reviewed. So I recommend creating subtasks and incrementally committing small patches. To do that, you will need an svn branch because you can't resolve JIRAs w/o committing patches.

          The Pig git repo is a read-only mirror of the svn repo. So unfortunately, patches need to be posted in JIRAs to get committed. Since you don't have commit access to the svn repo, it will be helpful to have at least one committer in the loop. Does this make sense?

          Jacob Perkins added a comment -

          Cheolsoo Park Yes. That makes a lot of sense. So, if I understand correctly, you'll make a feature branch. Then I can just work off that feature branch. I'll create a subtask called something like 'word count' or 'proof of concept' or some such, submit this first patch (against the feature branch, not trunk) for it, and we'll go from there?

          Cheolsoo Park added a comment -

          Yes, that's correct. I can create a branch for you. Let me do it perhaps tomorrow.

          If anyone has objections, please chime in.

          Dmitriy V. Ryaboy added a comment -

          I don't see why Jacob can't keep working in a github branch... easier to look at what's changing, and he can keep merging the (read-only) git mirror from apache to keep up with changes.

          Jacob, I see you are using Trident. Have you looked at your throughput numbers vs. going directly to Storm?

          Cheolsoo Park added a comment -

          Dmitriy V. Ryaboy, I have no problem with that. We should even consider migrating Pig to git.

          But if Jacob wants to merge it into trunk at some point, and more contributors want to collaborate, having an official branch in Apache is better than keeping it in his personal repo. Do you have any problem with creating a branch for the Storm backend?

          Jacob Perkins added a comment -

          Dmitriy V. Ryaboy You're right, I'd honestly prefer to keep working on the git branch since I'm more comfortable/familiar with the workflow. I've been merging changes from apache trunk as I've been going already. It's no big deal to make patches.

          I went with Trident originally because it's a very simple abstraction that's fairly straightforward to map to Pig constructs. I'm not opposed to going directly to Storm if that makes sense from a performance perspective, but I imagine it'd be quite a bit more complicated and involve more code. Worth looking further into, I suppose. And no, I have not looked at throughput numbers yet. Any suggestions for the best way to do that, e.g. comparing a Trident topology to a lean Storm topology?

          Mridul Jain added a comment -

          We have been planning to do that comparison, but it would help if someone has already done any benchmarking.
          Just last week I was trying to map our problem statement, already expressed in Pig, to Trident. We are trying to do two things at a high level: first, express a Storm topology via Pig; second, have a mixed mode where we can express online (Storm) and offline (MapReduce) jobs via the same Pig scripts and pass data between the two, as many of our systems require offline processing of data (long-running ML jobs) in addition to fastpath processing via Storm for the incoming data. We do have some literature/architecture worked out for this, which also tries to maintain semantic as well as syntactic compatibility with existing Pig.
          For all the above, we plan to convert from Pig to Trident directly for the following reasons:

          • Trident supports batch processing (which was available via transactional topologies in older versions of Storm but has now been subsumed by Trident), which should provide higher throughput for the whole pipeline than tuple-by-tuple processing in vanilla Storm. Also, certain ops like merge, join, etc. naturally match Trident's batch semantics.
          • Trident semantics seem to fit very well with traditional Pig UDFs; as someone pointed out above, they can easily be converted from/to existing Pig UDFs.
          • The only thing is, Trident doesn't support multiple output streams to express rich connections (i.e. topologies which need complex workflows/DAGs), though I had filed a bug for this (as Nathan had asked me to): https://groups.google.com/forum/#!searchin/storm-user/mridul/storm-user/G8POD1Hb89I/hcGYH1nf230J
            Anyway, Pig also supports only linear chains, so I suppose the above is not a problem as far as the programming model goes.

          But if anyone finds potential pitfalls in converting to Trident over Vanilla Storm, please do point out.

          Mridul

          Corey J. Nolet added a comment -

          I'd love to see/get this working with the current Storm release (0.8.2) if possible.

          Dmitriy V. Ryaboy added a comment -

          Mridul:
          In our experience at Twitter, Trident introduces pretty high overhead; in Summingbird, we relax the data delivery guarantees to get better throughput, and use Storm directly. Perhaps you want to try putting Pig on top of Summingbird? If you did that, we might even be able to help. In any case, I'm interested in seeing how all of this turns out.

          Cheolsoo:
          No real objections to svn branch. In the past I've found it far easier to cooperate on significant branches on github, rather than maintain an svn branch (you can easily have multiple branches, reviews are easier, etc). That's how Bill Graham and I did the HBaseStorage rewrite a few years back. But really that's up to developers doing the work.

          Cheolsoo Park added a comment -

          Dmitriy V. Ryaboy, sure. I just wanted to make sure we commit all the good work into Apache. I absolutely agree that it's up to the people who are doing the work.

          Pradeep Gollakota added a comment -

          Jacob Perkins Wow... This is a great start! Thanks so much for hacking this out.

          Cheolsoo Park I agree that this work should be committed back to a branch on Apache. I foresee a lot more contributions and collaboration on this, so it would be easier to coordinate via Apache as opposed to the git mirror.

          Dmitriy V. Ryaboy I have been strongly considering writing this DSL to Summingbird instead of Trident/Storm. I am considering whether there are any implications to doing this, though. By writing to Summingbird we would get both a real-time mode and a hybrid mode of execution, which in my mind is a huge win. At Lithium Technologies, we have been considering using Summingbird for hybrid-mode execution. The question in my mind is: do we want to use Summingbird if all we want is a real-time engine (i.e., Storm)? We can change the scope of this JIRA to write a Summingbird backend, or we can open another JIRA to implement a Summingbird POC and then see where that gets us.

          Jacob Perkins added a comment -

          Mridul Jain It's interesting that you bring the multiple output stream issue up. I'm actually working on that at my current company right now. I plan on making a pull request this week. I could try to share something sooner if you're interested.

          Dmitriy V. Ryaboy Cheolsoo Park I think I'll keep working via git for now. I would love some feedback on the current work (aside from what was already brought up about the performance concerns of Trident vs. pure Storm). Even if it's just verification that what I've done so far works for someone other than me.

          Dmitriy V. Ryaboy added a comment -

          Oh, I absolutely just meant for collaboration on the initial contribution to happen on GitHub, for expediency and fast iteration.
          Of course, once this work is in a committable/mergeable state, it should go into Apache.

          Mridul Jain added a comment -

          Dmitriy,
          Interesting point regarding Trident and Summingbird. I haven't looked into the complete details of Summingbird, but just out of curiosity I wanted to know what relaxation in data delivery guarantees you have done. Any pointers? The reason I am asking is that we faced an odd problem (or at least I thought it was odd) in Storm transactional topologies and was wondering if the same exists in Trident as well: https://groups.google.com/forum/#!searchin/storm-user/mridul/storm-user/yX3giAO3ZWs/nfx8ofzSlhcJ. Since we were running on transactional topologies, I created an optional patch where you could use batches but still have no strict commit order. I haven't submitted the patch because it was for code that is supposed to be deprecated. That patch alone worked well for us because we didn't pay the overhead from earlier batch failures. Trident does mention IBatchSpout, which suggests batches without transactions, though I still have to explore it a bit. I am interested in knowing about other optimizations in this direction, even on vanilla Storm.
          Another thing I wanted to know: we generally do batch (micro-batch) processing in Storm rather than tuple-by-tuple processing, as it provides higher throughput (especially when interacting with external systems, e.g., DB updates) and fits in logically when we work at the batch level. Batch semantics are critical for us. Does Summingbird support batches? Maybe I need to go over the details of Summingbird...
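To make the throughput point concrete, here is a minimal sketch of the batched-external-writes pattern described above (plain Python, with sqlite3 standing in for the external DB; the table name, column names, and batch size are made up for illustration). Each micro-batch is written in a single transaction instead of committing per tuple:

```python
import sqlite3

def flush(conn, rows):
    """Write one micro-batch of (k, v) rows in a single transaction."""
    with conn:  # commits once per batch, not once per row
        conn.executemany("INSERT INTO events(k, v) VALUES (?, ?)", rows)

def consume(conn, stream, batch_size=100):
    """Buffer tuples from a stream and flush them to the DB per micro-batch."""
    buf = []
    for row in stream:
        buf.append(row)
        if len(buf) >= batch_size:
            flush(conn, buf)
            buf.clear()
    if buf:  # flush any trailing partial batch
        flush(conn, buf)
```

This is the generic reason micro-batching tends to beat tuple-by-tuple processing when external systems are involved: the per-commit (or per-request) cost is amortized over the whole batch.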

          Mridul Jain added a comment -

          Jacob Perkins : Thanks Jacob...Please share your patch whenever you have one: https://github.com/nathanmarz/storm/issues/638.

          Thanks

          Gianmarco De Francisci Morales added a comment -

          Canceling patch as it is not ready to be committed.

          Corey J. Nolet added a comment -

          I've been excited about this feature for months, and I'm curious whether there is still a plan to get it merged. Is it still on the roadmap, or has it stalled for some reason and will not continue?

          Mridul Jain added a comment -

          We completed the implementation of "Pig on Storm" last month and are now testing it in production so that we can potentially open-source it. Here is an abstract:
          "We propose PIG as the primary language for expressing realtime stream processing logic and provide a working prototype on Storm. We also illustrate how legacy code written for MR in PIG can run with minimal to no changes on Storm. This includes running the existing PIG UDFs seamlessly on Storm. Though PIG and Storm do not take any position on state, we have provided built-in support for advanced state semantics like sliding windows, global mutable state, etc., which are required in real-world applications. We take a detailed look at a prototype application (a realtime anomaly detection/trending system), which elucidates the performance characteristics of this framework and the rich expressibility of complex programming logic via PIG on streaming.
          Finally, we propose a "Hybrid Mode" where a single PIG script can express logic for both realtime streaming and batch jobs, and also define data exchange mechanisms between the two, without breaking the semantic and syntactic sanctity of PIG. The underlying system figures out automatically which parts of this PIG script to run on MR and which on Storm."
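For illustration only, here is a toy sketch of the kind of sliding-window state semantics the abstract mentions (hypothetical names throughout; this is not the actual Pig-on-Storm implementation, just a minimal model of per-key windowed aggregation):

```python
from collections import deque

class SlidingWindow:
    """Keep the last `size` values per key; expose a windowed aggregate."""

    def __init__(self, size):
        self.size = size
        self.windows = {}  # key -> deque of the most recent `size` values

    def update(self, key, value):
        # deque(maxlen=size) evicts the oldest value automatically
        w = self.windows.setdefault(key, deque(maxlen=self.size))
        w.append(value)

    def aggregate(self, key, fn=sum):
        # Aggregate over whatever is currently in the window (empty -> fn(()))
        return fn(self.windows.get(key, ()))
```

For example, after feeding the values 1..5 for one key into a window of size 3, `aggregate` with the default `sum` sees only the last three values (3 + 4 + 5 = 12). A real streaming backend would additionally have to deal with eviction by time rather than count, and with persisting this state across worker failures.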


            People

            • Assignee: Jacob Perkins
            • Reporter: Pradeep Gollakota
            • Votes: 2
            • Watchers: 28