Details
Type: Sub-task
Status: Closed
Priority: Major
Resolution: Fixed
Reviewed
Description
The SimpleProcessor could take care of all the boilerplate, such as starting inputs and committing outputs. It would handle no events, since simple processors don't need to handle events. The user would then only need to implement their custom task logic in a new execute() method.
Attachments
- TEZ-1062.1.patch (7 kB, Mohammad Islam)
- TEZ-1062.2.patch (22 kB, Mohammad Islam)
- TEZ-1062.3.patch (22 kB, Mohammad Islam)
- TEZ-1062.4.patch (22 kB, Bikas Saha)
Activity
Sure.
bikassaha How will it be related to TEZ-694? Will that be done separately based on this SimpleProcessor?
For now, we can start with just the code that got left out of TEZ-700 and provide the empty impls along with a pre-op that calls input.start() for all inputs. Let's see how that looks. Then we can add a post-op that examines all outputs and calls commit for all instances of MROutput.
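To make the plan above concrete, here is a minimal Java sketch of the template-method shape being proposed. A final run() calls a pre-op that starts every input, then the user's execute(), then an empty post-op. LogicalInput and LogicalOutput below are simplified stand-ins that declare only what the sketch needs. They are not the real Tez interfaces, and the class is illustrative rather than the committed code.

```java
import java.util.Map;

// Simplified stand-ins for the Tez input/output interfaces (assumption: they
// declare only what this sketch needs; the real Tez types have more methods).
interface LogicalInput {
  void start() throws Exception;
}

interface LogicalOutput {
}

// Template method: the base class owns the lifecycle and the user only
// supplies execute().
abstract class SimpleProcessor {

  // Framework entry point: pre-op, user logic, post-op, in that order.
  public final void run(Map<String, LogicalInput> inputs,
                        Map<String, LogicalOutput> outputs) throws Exception {
    preOp(inputs, outputs);
    execute(inputs, outputs);
    postOp(inputs, outputs);
  }

  // Default pre-op: start every input. A null map means "no inputs".
  protected void preOp(Map<String, LogicalInput> inputs,
                       Map<String, LogicalOutput> outputs) throws Exception {
    if (inputs != null) {
      for (LogicalInput input : inputs.values()) {
        input.start();
      }
    }
  }

  // Default post-op is empty; a subclass can override it, e.g. to commit outputs.
  protected void postOp(Map<String, LogicalInput> inputs,
                        Map<String, LogicalOutput> outputs) throws Exception {
  }

  // The only method a user has to implement.
  public abstract void execute(Map<String, LogicalInput> inputs,
                               Map<String, LogicalOutput> outputs) throws Exception;
}
```

Keeping run() final means every subclass gets the same ordering. Only the hooks and execute() vary.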
Looks good overall.
This needs to be in the tez-runtime-library project, under a new package org.apache.tez.runtime.library.processor.
In general, either inputs or outputs could be null.
+    Preconditions.checkNotNull(inputs, "inputs can't be null");
+    Preconditions.checkNotNull(outputs, "outputs can't be null");
This means we will need null checks in other places.
How about exposing the inputs and outputs via getters, and renaming the following method to run()?
+  public abstract void execute(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
+      throws Exception;
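Here is a sketch of the suggested alternative, again using simplified stand-in interfaces rather than the real Tez ones. The framework-facing run(inputs, outputs) stores the maps. The subclass reads them through getInputs() and getOutputs(), and the user-facing method becomes an argument-free run(). Normalizing null maps to empty ones is an assumption made here to sidestep the null checks mentioned above. It is not taken from the patch.

```java
import java.util.Collections;
import java.util.Map;

// Simplified stand-ins for the Tez interfaces (assumption: only the methods
// used here; the real Tez types have more).
interface LogicalInput {
  void start() throws Exception;
}

interface LogicalOutput {
}

abstract class SimpleProcessor {
  private Map<String, LogicalInput> inputs = Collections.emptyMap();
  private Map<String, LogicalOutput> outputs = Collections.emptyMap();

  // Framework entry point. It stores the maps so the argument-free run()
  // can reach them through the getters. Assumption: null is normalized to
  // an empty map, so subclasses never need their own null checks.
  public final void run(Map<String, LogicalInput> inputs,
                        Map<String, LogicalOutput> outputs) throws Exception {
    this.inputs = (inputs == null) ? Collections.<String, LogicalInput>emptyMap() : inputs;
    this.outputs = (outputs == null) ? Collections.<String, LogicalOutput>emptyMap() : outputs;
    preOp();
    run();
    postOp();
  }

  protected Map<String, LogicalInput> getInputs() {
    return inputs;
  }

  protected Map<String, LogicalOutput> getOutputs() {
    return outputs;
  }

  // Start every input before the user logic runs.
  protected void preOp() throws Exception {
    for (LogicalInput input : getInputs().values()) {
      input.start();
    }
  }

  // Hook invoked after run(); empty by default.
  protected void postOp() throws Exception {
  }

  // User logic; reads inputs and outputs via getInputs() and getOutputs().
  public abstract void run() throws Exception;
}
```

Because postOp() is an ordinary overridable hook, a subclass that has to commit its own outputs can do so there after run() returns.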
I think it makes sense to move this code into the postOp of SimpleProcessor.
Secondly, we should call getContext().canCommit() only once for the task, rather than once per output. Sorry, the code in the original UnionExample is wrong. We first need to check whether any commit is required. If so, get permission from the context, then commit all outputs that need a commit. If any output fails to commit then we should abort all the outputs that needed commit.
+  protected void postOp(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
+      throws Exception {
+    for (LogicalOutput output : outputs.values()) {
+      if ((output instanceof MROutput) && (((MROutput) output).isCommitRequired())) {
+        while (!getContext().canCommit()) {
+          Thread.sleep(100);
+        }
+        ((MROutput) output).commit();
+      }
+    }
There are now three pure Tez examples: WordCount, UnionExample and BroadcastAndOneToOneExample. We should change all of them to use SimpleProcessor where it makes sense.
Thanks again bikassaha for the review.
Need some clarifications:
Secondly, we should call getContext().canCommit() only once ...
I am struggling to find the correct ordering of the two checks. How about this?
+    while (!getContext().canCommit()) {
+      Thread.sleep(100);
+    }
+    for (LogicalOutput output : outputs.values()) {
+      if ((output instanceof MROutput) && (((MROutput) output).isCommitRequired())) {
+        ((MROutput) output).commit();
+      }
+    }
Related question:
If any output fails to commit then we should abort all the outputs that needed commit.
I think outputs that have already been committed can't be aborted.
How do we determine that an output failed? When output.commit() throws an exception?
By abort, do you mean breaking out of the loop?
EDITED:
I think it makes sense to move this code into the postOp of SimpleProcessor.
MROutput is not accessible from tez-runtime-library. If I add tez-mapreduce (where MROutput resides) to the pom.xml of the tez-runtime-library module, it creates a circular dependency. Any thoughts?
We should first go through all outputs to check if they are MROutput and if they need commit. Collect these. If the collection is non-empty, ask for commit permission. After that, call commit on all collected outputs. If any commit throws an exception, call abort on all collected outputs.
We can create SimpleProcessor in tez-runtime-library, and a SimpleProcessorWithMRCommit that derives from SimpleProcessor in the tez-mapreduce project.
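The sequence described above (collect, ask for permission, commit, abort on failure) might look roughly like the following. It also stops at the first failed commit. MROutput and CommitContext are stand-ins, modeled as interfaces so the example is self-contained. They mirror only the isCommitRequired(), commit(), abort() and canCommit() calls mentioned in this thread, not the real Tez signatures. MRCommitter is a hypothetical helper, and the 100 ms polling interval is copied from the patch excerpts.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-ins: LogicalOutput for any output, MROutput for the one
// output type that needs a commit, CommitContext for the task context.
interface LogicalOutput {
}

interface MROutput extends LogicalOutput {
  boolean isCommitRequired();
  void commit() throws IOException;
  void abort() throws IOException;
}

interface CommitContext {
  boolean canCommit() throws IOException;
}

final class MRCommitter {
  private MRCommitter() {
  }

  static void commitOutputs(CommitContext context, Collection<LogicalOutput> outputs)
      throws IOException, InterruptedException {
    // 1. Collect only the outputs that actually need a commit
    //    (MROutput is special-cased, as agreed in the TEZ-694 discussion).
    List<MROutput> toCommit = new ArrayList<>();
    for (LogicalOutput output : outputs) {
      if (output instanceof MROutput && ((MROutput) output).isCommitRequired()) {
        toCommit.add((MROutput) output);
      }
    }
    if (toCommit.isEmpty()) {
      return; // Nothing to commit, so there is no need to ask for permission.
    }
    // 2. Request commit permission for the task as a whole (not per output),
    //    polling until it is granted.
    while (!context.canCommit()) {
      Thread.sleep(100);
    }
    // 3. Commit each collected output, stopping at the first failure.
    IOException failure = null;
    for (MROutput output : toCommit) {
      try {
        output.commit();
      } catch (IOException e) {
        failure = e;
        break;
      }
    }
    // 4. On failure, abort every collected output, then rethrow the commit error.
    if (failure != null) {
      for (MROutput output : toCommit) {
        try {
          output.abort();
        } catch (IOException e) {
          failure.addSuppressed(e);
        }
      }
      throw failure;
    }
  }
}
```

Stopping at the first failure means no later output is committed once one has failed. Abort is then attempted on every collected output, and any abort failures are attached as suppressed exceptions so the original commit error is the one that propagates.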
We should first go through all outputs to check if they are MROutput and if they need commit.
This is wrong IMO. We should not check for specific classes. Either assume all outputs need a commit, or query each output to check whether it requires one. I believe pretty much all current outputs that write to disk (intermediate data outputs too) need a commit, since this is a task-level commit that handles cases such as speculative attempts.
Intermediate outputs don't need a commit. They always write to their own specific paths - and if multiple attempts succeed (speculative), it should be possible to pull data from any of the successful attempts.
We should first go through all outputs to check if they are MROutput
This is what we discussed in TEZ-694, but we decided that we do not have enough context to formalize an API on the output that tells us whether it needs a commit. We only have one output that needs a commit, so we can special-case it.
https://issues.apache.org/jira/browse/TEZ-694?focusedCommentId=13972156&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13972156
Uploaded a new patch addressing the review comments.
I haven't run the example code yet; I will do that after the review.
Shouldn't we break out here?
+          output.commit();
+        } catch (IOException ioe) {
+          willAbort = true;
+          savedEx = ioe;
+        }
Overall this looks good to me. Hitesh, please let me know whether the separation of SimpleProcessor and SimpleMRProcessor addresses your concerns, given the TEZ-694 comment linked above. SimpleMRProcessor is the one that takes care of the MROutput commit.
Shouldn't we break out here?
Yes, I missed that. I will make that change once we get feedback from hitesh.
bikassaha Sounds good. As long as the MROutput commit is invoked only for the MR processor, this should be fine. I assume the expectation is that someone using a SimpleProcessor has to call commit on the outputs themselves?
Yes. SimpleProcessor has a postOp() method that is invoked after run(). A derived class can call commit in its implementation of postOp().
kamrul I think we can proceed with the final patch.
Thanks for your contribution. Committed.
commit b084c7f8d12ff38ff3d824734e009a1fc71ee20b
Author: Bikas Saha <bikas@apache.org>
Date: Sun Apr 27 16:55:16 2014 -0700
TEZ-1062. Create SimpleProcessor for processors that only need to implement the run method (Mohammad Kamrul Islam via bikas)
kamrul This is motivated by the discussion we had on TEZ-700 about simplifying most of the existing processors we currently have. We skipped that in TEZ-700, but we can now build on TEZ-695 and create another abstract class, SimpleProcessor, that provides impls for all base methods except the run method. We can use your suggestion of having simpleprocessor.run(Map<Inputs, Outputs>) call input.start() and then call real.run(), similar to what we did for the initialize() method. This will really simplify writing simple processors. This won't work for Input/Output, since they usually have to handle events (unlike Processor). What do you think?