Apache Tez: TEZ-1062 (sub-task of TEZ-690, Tez API Ease of Use)

Create SimpleProcessor for processors that only need to implement the run method

Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.5.0
    • Component/s: None
    • Labels: None
    • Hadoop Flags: Reviewed

    Description

      The SimpleProcessor could take care of housekeeping such as starting inputs and committing outputs. It would handle no events, since simple processors don't need to handle events. The user would then only need to implement their custom task logic in a new execute() method.

      Attachments

        1. TEZ-1062.1.patch
          7 kB
          Mohammad Islam
        2. TEZ-1062.2.patch
          22 kB
          Mohammad Islam
        3. TEZ-1062.3.patch
          22 kB
          Mohammad Islam
        4. TEZ-1062.4.patch
          22 kB
          Bikas Saha

        Activity

          Bikas Saha added a comment:

          kamrul This is motivated by the discussion we had on TEZ-700 about simplifying most of our existing processors. We skipped that in TEZ-700, but we can now build on TEZ-695 and create another abstract class, SimpleProcessor, that provides implementations for all base methods except the run method. We can use your suggestion of having simpleprocessor.run(Map<Inputs, Outputs>) call input.start() and then call real.run(), similar to what we did for the initialize() method. This will really simplify writing simple processors.
          This won't work for Input/Output, since they usually have to handle events (unlike Processor). What do you think?
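A minimal sketch of the template-method shape proposed above: the base class starts every input and then delegates to the subclass's task logic. `Input`, `Output`, `SimpleProcessorSketch`, and `execute` (the name from the description) are hypothetical stand-ins, not the real Tez interfaces.

```java
import java.util.Map;

// Hypothetical stand-ins for Tez's logical input/output types; not the real Tez API.
interface Input {
  void start() throws Exception;
}

interface Output {
}

// Sketch of the proposed base class: run(...) does the boilerplate and
// subclasses supply only the task logic in execute(...).
abstract class SimpleProcessorSketch {

  public final void run(Map<String, Input> inputs, Map<String, Output> outputs)
      throws Exception {
    // Start every input before any user logic runs.
    for (Input input : inputs.values()) {
      input.start();
    }
    // Hand control to the subclass (the "real.run()" of the comment above).
    execute(inputs, outputs);
  }

  // The only method a simple processor has to implement.
  protected abstract void execute(Map<String, Input> inputs, Map<String, Output> outputs)
      throws Exception;
}
```

Making run(...) final keeps the start-then-delegate ordering out of the subclass's hands, which is the point of the simplification.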

          Mohammad Islam added a comment:

          Sure.

          bikassaha How will this relate to TEZ-694? Will that be done separately, based on this SimpleProcessor?

          Bikas Saha added a comment:

          For now, we can start with just the code that got left out of TEZ-700 and provide the empty implementations along with a pre-op that calls input.start() for all inputs. Let's see how that looks. Then we can add a post-op that examines all outputs and calls commit for all instances of MROutput.

          Mohammad Islam added a comment:

          WIP patch to get early feedback.

          Bikas Saha added a comment:

          Looks good overall.

          This needs to be in the tez-runtime-library project, under a new package org.apache.tez.runtime.library.processor.

          In general, either the inputs or the outputs could be null.

          +    Preconditions.checkNotNull(inputs, "inputs can't be null");
          +    Preconditions.checkNotNull(outputs, "outputs can't be null");

          This means we will need null checks in other places.

          How about exposing the inputs and outputs via getters and renaming this to run()?

          +  public abstract void execute(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
          +      throws Exception;

          I think it makes sense to move this code into the postOp of SimpleProcessor.
          Secondly, we should call getContext().canCommit() only once. Sorry, the code in the original UnionExample is wrong. So we need to check whether a commit is required. If yes, get permission from the context, then commit all outputs that need a commit. If any output fails to commit, we should abort all the outputs that needed a commit.

          +    protected void postOp(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
          +        throws Exception {
          +      for (LogicalOutput output : outputs.values()) {
          +        if ((output instanceof MROutput) && (((MROutput) output).isCommitRequired())) {
          +          while (!getContext().canCommit()) {
          +            Thread.sleep(100);
          +          }
          +          ((MROutput) output).commit();
          +        }
          +      }
          

          There are three pure Tez examples now: WordCount, UnionExample and BroadcastAndOneToOneExample. We should change all of them to use the SimpleProcessor where it makes sense.
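The review's suggestions fit together: tolerate null input or output maps, expose them through getters, and rename the abstract method to a no-argument run(), with pre-op and post-op hooks around it. A minimal sketch under those assumptions follows; `Input`, `Output`, `SimpleProcessorSketch`, and the choice to normalize null maps to empty ones are illustrative, not the committed Tez code.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-ins for Tez's logical input/output types; not the real Tez API.
interface Input {
  void start() throws Exception;
}

interface Output {
}

abstract class SimpleProcessorSketch {
  private Map<String, Input> inputs;
  private Map<String, Output> outputs;

  // Framework entry point. Either map may be null, so both are normalized to
  // empty maps once here instead of null-checking in every hook.
  public final void run(Map<String, Input> inputs, Map<String, Output> outputs)
      throws Exception {
    this.inputs = (inputs == null) ? Collections.<String, Input>emptyMap() : inputs;
    this.outputs = (outputs == null) ? Collections.<String, Output>emptyMap() : outputs;
    preOp();
    run();
    postOp();
  }

  // Default pre-op: start every input.
  protected void preOp() throws Exception {
    for (Input input : inputs.values()) {
      input.start();
    }
  }

  // User task logic; reads inputs and outputs through the getters below.
  public abstract void run() throws Exception;

  // Default post-op does nothing; a subclass may override it, e.g. to commit outputs.
  protected void postOp() throws Exception {
  }

  public Map<String, Input> getInputs() {
    return inputs;
  }

  public Map<String, Output> getOutputs() {
    return outputs;
  }
}
```

Normalizing once at the entry point is one way to avoid the scattered null checks the review anticipates; checking for null inside each hook would work equally well.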

          Mohammad Islam added a comment (edited):

          Thanks again bikassaha for the review.
          Need some clarifications:

          Secondly, we should call getContext().canCommit() only once ...

          I'm struggling to find the correct ordering of the two checks. How about this?

          +    while (!getContext().canCommit()) {
          +      Thread.sleep(100);
          +    }
          +    for (LogicalOutput output : outputs.values()) {
          +      if ((output instanceof MROutput) && (((MROutput) output).isCommitRequired())) {
          +        ((MROutput) output).commit();
          +      }
          +    }
          

          Related question:

          If any output fails to commit then we should abort all the outputs that needed commit.

          I think already committed outputs can't be aborted.

          How do we determine that an output has failed? When output.commit() throws an exception?

          By abort, do you mean breaking out of the loop?

          EDITED:

          I think it makes sense to move this code into the postOp of SimpleProcessor.

          MROutput is not accessible from tez-runtime-library. If I add tez-mapreduce (where MROutput resides) to the pom.xml of the tez-runtime-library module, it creates a circular dependency. Any thoughts?

          Bikas Saha added a comment:

          We should first go through all outputs and check whether each one is an MROutput that needs a commit, collecting those that are. If the collection is non-empty, ask for commit permission. After that, call commit on all collected outputs. If any commit throws an exception, call abort on all collected outputs.
          We can create a SimpleProcessor in tez-runtime-library and a SimpleProcessorWithMRCommit in the tez-mapreduce project that derives from SimpleProcessor.
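A sketch of the commit protocol and module split described above, assuming hypothetical stand-ins: `CommitContext.canCommit()` for the context's permission check, `MRStyleOutput` for MROutput, and the `*Sketch` class names. It is not the code that was committed. The base class has no MR dependency; the MR subclass overrides postOp() to collect outputs needing a commit, ask for permission only if there are any, commit them, and abort all of them if any commit throws.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins; none of these are the real Tez types.
interface Input { void start() throws Exception; }

interface Output { }

// Stand-in for the processor context's commit-permission check.
interface CommitContext { boolean canCommit(); }

// Stand-in for MROutput, the one output type that needs a task-level commit.
interface MRStyleOutput extends Output {
  boolean isCommitRequired();
  void commit() throws IOException;
  void abort();
}

// Would live in tez-runtime-library: it knows nothing about MR outputs.
abstract class SimpleProcessorSketch {
  private Map<String, Input> inputs;
  private Map<String, Output> outputs;

  public final void run(Map<String, Input> in, Map<String, Output> out) throws Exception {
    inputs = (in == null) ? Collections.<String, Input>emptyMap() : in;
    outputs = (out == null) ? Collections.<String, Output>emptyMap() : out;
    preOp();
    run();
    postOp();
  }

  protected void preOp() throws Exception {
    for (Input input : inputs.values()) {
      input.start();
    }
  }

  public abstract void run() throws Exception;

  // No-op by default; subclasses decide what, if anything, to commit.
  protected void postOp() throws Exception { }

  public Map<String, Output> getOutputs() { return outputs; }
}

// Would live in tez-mapreduce: depends on the base class, never the reverse.
abstract class SimpleMRProcessorSketch extends SimpleProcessorSketch {
  private final CommitContext context;

  protected SimpleMRProcessorSketch(CommitContext context) { this.context = context; }

  @Override
  protected void postOp() throws Exception {
    // 1. Collect the MR-style outputs that actually need a commit.
    List<MRStyleOutput> toCommit = new ArrayList<>();
    for (Output output : getOutputs().values()) {
      if (output instanceof MRStyleOutput && ((MRStyleOutput) output).isCommitRequired()) {
        toCommit.add((MRStyleOutput) output);
      }
    }
    if (toCommit.isEmpty()) {
      return; // Nothing to commit, so never ask for permission.
    }
    // 2. A single wait-for-permission phase for the task, not one per output.
    while (!context.canCommit()) {
      Thread.sleep(100);
    }
    // 3. Commit each output, stopping at the first failure.
    IOException failure = null;
    for (MRStyleOutput output : toCommit) {
      try {
        output.commit();
      } catch (IOException e) {
        failure = e;
        break;
      }
    }
    // 4. If a commit failed, abort every collected output and rethrow.
    if (failure != null) {
      for (MRStyleOutput output : toCommit) {
        output.abort();
      }
      throw failure;
    }
  }
}
```

Aborting every collected output, including any already committed, follows the description above; whether an already-committed output can really be aborted is the open question raised earlier in the thread.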

          Hitesh Shah added a comment:

          We should first go through all outputs to check if they are MROutput and if they need commit.

          This is wrong IMO. Checking for specific classes should not be done. Either assume all outputs need a commit, or query each output to check whether it requires a commit. I believe pretty much all current outputs that write to disk (intermediate data outputs too) need a commit, as this is a task-level commit to handle cases such as speculative attempts.
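A minimal sketch of the alternative suggested here: select outputs by asking each one whether it needs a commit, rather than testing for a concrete class with instanceof. `QueryableOutput`, its default `isCommitRequired()`, and `CommitSelection` are hypothetical names (using Java 8 default methods), not a Tez API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical output contract (not the Tez API): every output can be asked
// whether it needs a task-level commit.
interface QueryableOutput {
  // Outputs that do not override this are treated as not needing a commit.
  default boolean isCommitRequired() {
    return false;
  }
}

final class CommitSelection {
  private CommitSelection() {
  }

  // Select outputs by asking each one, instead of testing for a concrete
  // class such as MROutput with instanceof.
  static List<QueryableOutput> outputsNeedingCommit(Map<String, ? extends QueryableOutput> outputs) {
    List<QueryableOutput> selected = new ArrayList<>();
    for (QueryableOutput output : outputs.values()) {
      if (output.isCommitRequired()) {
        selected.add(output);
      }
    }
    return selected;
  }
}
```

Defaulting to false makes committing opt-in per output type; the thread later decided not to formalize such an API yet and to special-case MROutput instead.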

          Siddharth Seth added a comment:

          Intermediate outputs don't need a commit. They always write to their own specific paths, and if multiple (speculative) attempts succeed, it should be possible to pull data from any of the successful attempts.

          Bikas Saha added a comment:

          We should first go through all outputs to check if they are MROutput

          This is what we were discussing in TEZ-694, but we decided that we do not have enough context to formalize an API on the output that will tell us whether an output needs a commit. We only have one output that needs a commit, and we can special-case that.
          https://issues.apache.org/jira/browse/TEZ-694?focusedCommentId=13972156&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13972156

          Mohammad Islam added a comment:

          Uploaded a new patch addressing the review comments.
          I haven't run the example code yet; I will do that after the review.

          Bikas Saha added a comment:

          Shouldn't we break out here?

          +          output.commit();
          +        } catch (IOException ioe) {
          +          willAbort = true;
          +          savedEx = ioe;
          +        }
          Bikas Saha added a comment:

          Overall this looks good to me. Hitesh, please let me know if the separation of SimpleProcessor and SimpleMRProcessor addresses your concerns, based on the comments linked in TEZ-694. SimpleMRProcessor is the one that takes care of the MROutput commit.

          Mohammad Islam added a comment:

          bikassaha

          Shouldn't we break out here?

          Yes. I missed that. I will do that once we get the feedback from hitesh.

          Hitesh Shah added a comment:

          bikassaha Sounds good; as long as the MROutput commit is invoked only for the MR processor, this should be fine. I am assuming the expectation is that someone using a SimpleProcessor has to call commit on the outputs themselves?

          Bikas Saha added a comment (edited):

          Yes. SimpleProcessor has a postOp() that is invoked after run(). The derived class can call commit in its implementation of the postOp() method.
          kamrul I think we can proceed with the final patch.

          Mohammad Islam added a comment:

          new patch

          Bikas Saha added a comment:

          Attaching commit patch with minor changes.

          Bikas Saha added a comment:

          Thanks for your contribution. Committed.
          commit b084c7f8d12ff38ff3d824734e009a1fc71ee20b
          Author: Bikas Saha <bikas@apache.org>
          Date: Sun Apr 27 16:55:16 2014 -0700

          TEZ-1062. Create SimpleProcessor for processors that only need to implement the run method (Mohammad Kamrul Islam via bikas)

          Bikas Saha added a comment:

          Bulk close for jiras fixed in 0.5.0.


          People

            Assignee: Mohammad Islam
            Reporter: Bikas Saha
            Votes: 0
            Watchers: 4
