PIG-1891

Enable StoreFunc to make intelligent decision based on job success or failure

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 0.10.0
    • Fix Version/s: 0.11
    • Component/s: None
    • Labels:
    • Release Note:
      This adds a new method, cleanupOnSuccess, to the StoreFunc interface, and thus will cause backward compatibility issues for users who directly implement this interface. Most store functions implement StoreFuncImpl, which will shield them from this issue as it implements the new method.

      Description

      We are in the process of using PIG for various data processing and component integration. Here is where we feel pig storage funcs lack:

      They are not aware of whether the overall job has succeeded. This creates a problem for storage funcs that need to "upload" results into another system:

      DB, FTP, another file system etc.

      I looked at the DBStorage in the piggybank (http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java?view=markup) and what I see is essentially a mechanism which for each task does the following:

      1. Creates a recordwriter (in this case open connection to db)
      2. Open transaction.
      3. Writes records into a batch
      4. Executes commit or rollback depending if the task was successful.

      While this approach works great at the task level, it does not work at all at the job level.

      If certain tasks succeed but the overall job fails, partial records get uploaded into the DB.
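The four-step, per-task pattern described above can be sketched with plain JDBC (the table and column names here are made up for illustration; see the linked DBStorage source for the real code):

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

class TaskLevelDbWriter {
    // Steps 1-4: each task opens a transaction, batches its records, and
    // commits or rolls back based on its own result. Nothing here can see
    // the overall job outcome, which is exactly the gap this issue raises:
    // a committed task's rows stay in the DB even if the job later fails.
    static void writeTask(Connection conn, List<String> rows,
                          boolean taskSucceeded) throws SQLException {
        conn.setAutoCommit(false);                          // step 2: open transaction
        try (PreparedStatement ps =
                 conn.prepareStatement("INSERT INTO results (val) VALUES (?)")) {
            for (String row : rows) {                       // step 3: batch the records
                ps.setString(1, row);
                ps.addBatch();
            }
            ps.executeBatch();
        }
        if (taskSucceeded) {
            conn.commit();                                  // step 4: commit ...
        } else {
            conn.rollback();                                // ... or roll back
        }
    }
}
```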

      Any ideas on the workaround?

      Our current workaround is fairly ugly: we created a Java wrapper that launches Pig jobs and then uploads to the DB once Pig's job is successful. While the approach works, it's not really integrated into Pig.

      Attachments

      1. PIG-1891-3.patch
        10 kB
        Eli Reisman
      2. PIG-1891-2.patch
        9 kB
        Eli Reisman
      3. PIG-1891-1.patch
        9 kB
        Eli Reisman


          Activity

          Eli Reisman added a comment -

          I'd be happy to try to wrap the calls in try/catch on a new JIRA if you like. If you guys would rather have someone else take a swipe at this, or there's more to the fix and it needs a new JIRA by someone who can describe the problem/fix better than I can, that's fine with me too.

          Alan Gates added a comment -

          This is my bad. I figured most people extended StoreFunc rather than implemented StoreFuncInterface. I can do a patch to fix it.

          Bill Graham added a comment -

          Eli, FYI: in other places in the code we surround the call to a newly introduced method with a catch of NoSuchMethodError. That would help in this case.

          Dmitriy V. Ryaboy added a comment -

          I'm just grouchy 'cause I can't move a class of jobs till we fix a StoreFunc in Elephant-Bird. Appreciate you doing the work, this is a good feature! It's actually documented as an incompatible feature (in the release notes on this ticket, and by having the patch listed under "incompatible changes" in CHANGES.txt). So procedurally speaking, it's fine.

          We can probably have a guard around this issue by checking if the class has a declared method "cleanupOnSuccess", which would restore backwards compatibility.
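The guard described here could look roughly like this. The class names below are simplified stand-ins for the real Pig types, purely for illustration: the idea is to call the new hook only when the user's concrete class actually declares it.

```java
import java.lang.reflect.Method;

// Stand-ins: OldBase plays the pre-0.11 StoreFunc. LegacyStore was written
// against the old API; ModernStore opts in to the new hook.
abstract class OldBase { }
class LegacyStore extends OldBase { }
class ModernStore extends OldBase {
    public void cleanupOnSuccess(String location) { /* job-level commit */ }
}

class CleanupGuard {
    // Walk the concrete class's hierarchy (stopping at the framework base)
    // and invoke cleanupOnSuccess only if some user class declares it.
    static boolean invokeIfDeclared(OldBase func, String location) {
        for (Class<?> c = func.getClass(); c != null && c != OldBase.class;
             c = c.getSuperclass()) {
            try {
                Method m = c.getDeclaredMethod("cleanupOnSuccess", String.class);
                m.invoke(func, location);
                return true;
            } catch (NoSuchMethodException ignored) {
                // not declared here; keep looking up the hierarchy
            } catch (ReflectiveOperationException e) {
                throw new RuntimeException(e);  // declared but not invokable
            }
        }
        return false;  // old-style store function: silently skip the new hook
    }
}
```

Bill's alternative from the previous comment is simpler still: call the method directly and wrap the call site in a `catch (NoSuchMethodError e)` so binaries compiled against the old interface don't blow up.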

          Eli Reisman added a comment -

          Good point. I was assigned this JIRA to get introduced to Pig originally,
          and did not know the controversy adding this functionality would generate!
          I am fine with whatever fix or documentation you guys feel is relevant.


          Dmitriy V. Ryaboy added a comment -

          Alan, Eli – this is not a backwards compatible change. Anything that implements StoreFuncInterface but does not extend StoreFunc now breaks. At the very least, this fact should be explicitly documented in the release notes (better yet, not having this method wouldn't kill things).

          Eli Reisman added a comment -

          Thanks for your patience! I hope to dig into juicier slices of ham very soon!

          Alan Gates added a comment -

          Patch checked in. Thanks Eli.

          Eli Reisman added a comment -

          Now when I run my local machine tests with 'ant test-commit' on PIG-1891-3.patch + trunk, I get this error (and only this error):

          Testcase: testNumSamples took 22.016 sec
          FAILED
          expected:<47> but was:<42>
          junit.framework.AssertionFailedError: expected:<47> but was:<42>
          at org.apache.pig.test.TestPoissonSampleLoader.testNumSamples(TestPoissonSampleLoader.java:125)

          I did not alter the number of allowed instantiations in the TestLoadStoreFuncLifeCycle test for loads, just stores, so perhaps this set off a ripple effect of other problems; it's odd that the failure is in a loader. But I am unsure if this is directly related to this patch or an existing problem you guys know about, so I thought I'd post here before hunting it down. Thanks again!

          Eli Reisman added a comment -

          This alters the test to allow up to 4 instantiations as Alan mentioned. If there's a better solution to this issue, let me know. Thanks again!

          Eli Reisman added a comment -

          I can try to avoid the re-instantiation if you like, or bump the test value, whatever is best. And you're comfortable the other test issue is something else? This passed the test suite for me, but that was a while back, and I'm not extremely knowledgeable on all the areas of the code I'm touching here. Hope to be soon!

          Alan Gates added a comment -

          Never mind on TestMacroExpansion. I see that is failing in trunk as well.

          Alan Gates added a comment -

          This adds a failure in TestLoadStoreFuncLifeCycle and TestMacroExpansion.

          In TestLoadStoreFuncLifeCycle the failure is because it re-instantiates the store function again. Julien had put in tests to make sure the number of instantiation stays down. After talking with him he said he thought this patch was fine, so you can bump up the instantiation number it checks for from 3 to 4.

          I'm not clear what's driving the failure in TestMacroExpansion.

          I'll run the e2e tests as well as post results.

          Alan Gates added a comment -

          Looks reasonable. I'll run the tests on it.

          Alex Rovner added a comment -

          I also want to mention that this additional call will be useful in HCatalog. As of now there is some workaround to get the same behavior.

          Alex Rovner added a comment -

          Thanks Eli. Looks pretty good to me.

          Alan – Do you have any comments?

          Eli Reisman added a comment -

          Hey Alan, what do you think of this?

          It restores cleanupOnFailureImpl (why is this exposed in the interface at all, btw?) and does not attempt to implement cleanupOnSuccess, just adds it where relevant. This way users can implement it themselves if they need it in their StoreFunc.

          Also: would you look at the way it is wired into PigServer#launchPlan()? I'm giving it the same args that cleanupOnFailure() gets, but I'm not certain this is the information a user would want it to receive. I expect that if they do implement cleanupOnSuccess, these args will provide the data to delete? In the DB example in this thread, will the data already have been successfully loaded to the DB by the user code, and this merely has to erase unneeded files the data was stored in during processing steps after the fact? Or would cleanupOnSuccess include the 'load to database' and 'erase leftover files' code together?

          Anyway, thanks, let me know if this is what we need or I'm on the right track, thanks again.

          Eli Reisman added a comment -

          I'll take a look at where the framework should call it. It's been a while, but as I recall the cleanupImpl is called from within the same old cleanupFailure that was already there, still in place. I moved the code to cleanupImpl so I could also call it from cleanupSuccess, as the function was the same; only the context of the call differs. I suppose when people override these methods there might be more differences. I'll take a look at the code today and try to have another patch up ASAP. Thanks again; if there's anything else I've overlooked, please let me know.

          Alan Gates added a comment -

          I don't see where cleanupOnSuccess is invoked by the system, so I assume the purpose of this patch is to propose the change to the interface, not to actually implement the functionality yet. On this assumption, the patch looks ok except for one issue: StoreFunc is a public stable class. You can't change the name of publicly available methods. Changing cleanupOnFailure to cleanupImpl breaks backwards compatibility.

          Jakob Homan added a comment -

          This looks good to me. +1 on the patch, for what it's worth. This is what we're looking for. Bill Graham, how does this look to you?

          Eli Reisman added a comment -

          A first attempt at the cleanupOnSuccess() solution proposed in the comment thread. And a first attempt at contributing to Pig.

          Bill Graham added a comment -

          This would be a useful feature and I've wanted to have it in the past. I don't think this really warrants yet another interface though. StoreFuncInterface already has cleanupOnFailure so it makes sense for it to have something like onSuccess as well. It could be an empty method in StoreFunc, which I'd expect more people currently extend.
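That shape can be sketched as follows. These are simplified stand-ins, not the real Pig signatures: the point is that the interface gains the hook while the abstract base most users extend absorbs it as a no-op.

```java
import java.io.IOException;

// The interface gains the new hook alongside the existing failure hook.
interface StoreFuncInterface {
    void cleanupOnFailure(String location) throws IOException;
    void cleanupOnSuccess(String location) throws IOException;  // new hook
}

// The abstract base most store functions extend provides a no-op default,
// so only classes implementing the interface directly have to change.
abstract class StoreFunc implements StoreFuncInterface {
    @Override
    public void cleanupOnFailure(String location) throws IOException {
        // default: remove partial output at 'location' (elided in this sketch)
    }

    @Override
    public void cleanupOnSuccess(String location) throws IOException {
        // intentionally empty: existing subclasses compile and run unchanged
    }
}
```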

          Stan Rosenberg added a comment -

          I wanted to chime in. I agree with Jakob, except I'd name this callback 'postProcess'. It's a very easy change to sneak in without any impact on existing code.

          a) Introduce a new interface, say 'StoreFuncOnSuccess', with the method 'void postProcess(String location, Job job) throws IOException;'

          b) Modify PigServer.launchPlan to invoke 'postProcess'.

          I think this feature is a must have for complex storage UDFs.
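Steps (a) and (b) above can be sketched like this, with `Job` stubbed out since the real one would be Hadoop's `org.apache.hadoop.mapreduce.Job` (all names here follow the proposal, but the wiring is illustrative):

```java
import java.io.IOException;

class Job { }  // stand-in for org.apache.hadoop.mapreduce.Job

// (a) The opt-in interface: existing store funcs are untouched.
interface StoreFuncOnSuccess {
    void postProcess(String location, Job job) throws IOException;
}

class LaunchPlanHook {
    // (b) Roughly what PigServer.launchPlan would do after a successful job:
    // fire the callback only for store funcs that opted in.
    static boolean fireOnSuccess(Object storeFunc, String location, Job job)
            throws IOException {
        if (storeFunc instanceof StoreFuncOnSuccess) {
            ((StoreFuncOnSuccess) storeFunc).postProcess(location, job);
            return true;
        }
        return false;  // plain store func: nothing to call
    }
}
```

The `instanceof` dispatch is what makes this change backward compatible: no existing class implements the new interface, so no existing behavior changes.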

          Alex Rovner added a comment -

          Correct. Just like we have cleanupOnFailure, we should have a cleanupOnSuccess.

          Daniel Dai added a comment -

          Hi, Jakob,
          Just make sure you realize there is a cleanupOnFailure in StoreFunc. So you want a symmetric hook for when the job succeeds? OutputFormat's cleanupJob does not help?

          Jakob Homan added a comment -

          We're facing a similar issue and would also vote to add this functionality. A cleanupOnSuccess method seems like the most reasonable place to keep code like this.

          Alan Gates added a comment -

          When we redesigned the load and store interfaces in 0.7 we made a design decision to not duplicate Hadoop functionality, but to be as thin a layer as possible. Of course where there are things everyone will want to do, it makes sense to make those easier and deal with a little duplication. My sense is that this is not one of those cases. But if we see many others voting for this feature, I could be convinced that this would make sense. I will leave this JIRA open for now to see how others vote. Though I will change the priority to minor.

          I will also forward this information to Corrine (who writes our docs). She may want to include it in her section on store functions.

          Alex Rovner added a comment -

          Alan,

          After a bit of investigation: even though what you have described can be achieved through the OutputCommitter, it still seems it would be much easier if the store func had a "commit" method that is called once the job is final. This would significantly simplify writing a store func.

          Currently, if you take the OutputCommitter approach, you would have to somehow make the committer aware of what you want to do upon commit. This would mean that if you want to create an SFTP store and a DB store, you would need to create your own StoreFunc, OutputFormat, RecordWriter, and OutputCommitter. Seems a bit of an overkill for such a simple task?

          Alex

          Alex Rovner added a comment -

          Alan,

          Thank you for the information. From what you are describing, it's exactly what we are looking for. We are going to try that approach and will let you know how it goes.
          Thanks!

          Alan Gates added a comment -

          It sounds like what you want is a way for the storage function to inject code into OutputCommitter.cleanupJob. (See http://hadoop.apache.org/common/docs/r0.20.2/api/index.html for details. This is a final task that Hadoop runs after all reduces have finished.)

          At this point since this is already offered by Hadoop's OutputFormat we have left these things there, rather than mimic the interface in Pig. So the way to do this would be to have the OutputFormat you are using return an OutputCommitter that would do the commit (or whatever) in cleanupJob. You do not have to write a whole new OutputFormat for this. You can extend whatever OutputFormat you are using and the associated OutputCommitter it returns. Your extended OutputFormat should return your OutputCommitter in getOutputCommitter. Your OutputCommitter should only change cleanupJob, which should call super.cleanupJob and then do whatever you want to do.
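The extension Alan describes has this shape. To keep the sketch self-contained, the Hadoop types are replaced by minimal stand-ins; the real `cleanupJob(JobContext)` lives on `org.apache.hadoop.mapreduce.OutputCommitter`:

```java
import java.io.IOException;

// Minimal stand-ins for the Hadoop types involved.
class JobContext { }
abstract class OutputCommitter {
    public void cleanupJob(JobContext ctx) throws IOException { }
}
class BaseCommitter extends OutputCommitter { }  // whatever your OutputFormat returns today

// Extend the committer you already use: let it finish its normal cleanup,
// then run the job-level action (DB commit, FTP upload, ...) exactly once,
// after all reduces have finished.
class CommittingCommitter extends BaseCommitter {
    private final Runnable jobLevelCommit;

    CommittingCommitter(Runnable jobLevelCommit) {
        this.jobLevelCommit = jobLevelCommit;
    }

    @Override
    public void cleanupJob(JobContext ctx) throws IOException {
        super.cleanupJob(ctx);   // the wrapped committer's usual work first
        jobLevelCommit.run();    // then the job-level commit
    }
}
```

Your extended OutputFormat would then hand back this committer from getOutputCommitter; nothing else in the store function has to change.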


            People

            • Assignee: Eli Reisman
            • Reporter: Alex Rovner
            • Votes: 0
            • Watchers: 10
