> If the outputformat is FileOutputFormat, [ ... ]
Shouldn't this be done for any output format? We should add generic commit-related methods to OutputFormat and call these, rather than treat FileOutputFormat specially. So maybe we should define a subclass of OutputFormat (so that we don't break existing apps) called OutputFormatWithCommit and let FileOutputFormat implement that. The place where the task framework code should invoke the commit can then check whether the outputformat implements the new OutputFormatWithCommit, and, if so, invoke the commit method.
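A minimal sketch of the opt-in check described above, assuming trimmed stand-in types rather than the real Hadoop classes. `OutputFormatWithCommit`, `commit(String)`, `PlainFormat`, `CommittingFormat` and `TaskCommitHook` are hypothetical names used only for illustration. The point is that the framework only calls commit on formats that implemented the new sub-interface, so existing formats keep compiling and behaving as before.

```java
import java.io.IOException;

// Hypothetical, trimmed stand-ins for illustration; these are not the real Hadoop types.
interface OutputFormat {
    void checkOutputSpecs() throws IOException;
}

// The proposed opt-in extension: only formats that need a commit implement it.
interface OutputFormatWithCommit extends OutputFormat {
    void commit(String taskAttemptId) throws IOException;
}

// A legacy format that knows nothing about commits.
class PlainFormat implements OutputFormat {
    public void checkOutputSpecs() { }
}

// A format (e.g. a file-based one) that opts in.
class CommittingFormat implements OutputFormatWithCommit {
    String lastCommitted;
    public void checkOutputSpecs() { }
    public void commit(String taskAttemptId) { lastCommitted = taskAttemptId; }
}

class TaskCommitHook {
    /** Commits only if the format opted in; returns true when a commit was invoked. */
    static boolean maybeCommit(OutputFormat format, String taskAttemptId) throws IOException {
        if (format instanceof OutputFormatWithCommit) {
            ((OutputFormatWithCommit) format).commit(taskAttemptId);
            return true;
        }
        return false; // existing applications' formats are left alone
    }
}
```

The trade-off, raised later in the thread, is that an instanceof check keeps old formats working but spreads commit behavior across a second type hierarchy.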
> maybe we should define a subclass of OutputFormat (so that we don't break existing apps)
I think we should probably just bite the bullet and switch OutputFormat to be an abstract class instead of an interface. Attached is a patch. This implements the following:
1) Makes OutputFormat an abstract class with empty implementations for most methods:

    public abstract class OutputFormat<K, V> {
      abstract public RecordWriter<K, V> getRecordWriter(JobConf job, String name, Progressable progress) throws IOException;
      abstract public void checkOutputSpecs(JobConf job) throws IOException;
      void commitJobOutput(JobConf job) throws IOException { }
      void discardJobOutput(JobConf job) throws IOException { }
      public boolean isTaskCommitRequired(JobConf job, String attemptId) { return false; }
      void setTaskWorkOutput(JobConf job, String attemptId) throws IOException { }
      void createTaskWorkOutput(JobConf job, String attemptId) throws IOException { }
      void createJobWorkOutput(JobConf job) throws IOException { }
      void commitTaskOutput(JobConf job, String attemptId) throws IOException { }
      void discardTaskOutput(JobConf job, String attemptId) throws IOException { }
    }

2) Removes the FileOutputFormat dependencies from Task and the other framework classes. Instead, it defines some additional methods in OutputFormat (they have a FileOutputFormat flavor, but that should be okay since the default implementations are empty; this is open for suggestions).
3) Moves things like saveTaskOutput from Task.java to FileOutputFormat, since that code handled only FileOutputFormat anyway.
4) Adds a blocking RPC call, canCommit. This call blocks at the TaskTracker's end until the TaskTracker hears from the JobTracker what this task should do: commit or discard its output. The debatable part is that we block RPC handlers when a task reaches the commit-pending state. The expectation is that we'd hear back from the JobTracker fairly soon, and the TaskTracker can't do much (like launching new tasks) before it hears from the JobTracker anyway. The patch also increases the number of RPC handlers. There are ways to avoid blocking the RPC handler, but this seemed like a simple approach and should not be a big deal, since these are very (node-)local RPCs.
5) Many of the changes in the patch concern getRecordWriter: they remove its "ignored" parameter.
6) Removes the task-commit queue code from the JT.
This patch requires testing and may have some bugs at this point, but I am hoping that it makes it into 0.18. Could someone please take a quick look at the approach? Thanks!
We should use the "context" style here. We should add something like:

    public class OutputContext {
      // no public ctor -- only created by framework
      public Progressable getProgressable() {...}
      public JobConf getJob();
    }

Then the OutputFormat methods can replace their JobConf parameters with OutputContext parameters. If we ever need to add more items to the context, we can do so without breaking application code.
Which leads back to the fact that I'm pretty uncomfortable with such a big change to the APIs this close to the freeze date.
The current OutputFormat method names are very file-system specific. I'd prefer something like:

    public class JobContext {
      public JobConf getJobConf();
    }
    public class TaskAttemptContext {
      public TaskAttemptID getTaskAttemptID();
      public JobConf getJobConf();
      public Progressable getProgressable();
    }
    public abstract class OutputFormat<K, V> {
      public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException;
      public abstract void checkOutputSpecs(JobContext context) throws IOException;
      public abstract void setupJob(JobContext context) throws IOException;
      public abstract void cleanupJob(JobContext context, boolean promote) throws IOException;
      public abstract void setupTask(TaskAttemptContext context) throws IOException;
      public abstract void cleanupTask(TaskAttemptContext context, boolean promote) throws IOException;
    }

At that point, you might as well move OutputFormat over to org.apache.hadoop.mapreduce and deprecate the old interface.
I don't think having the TaskTracker RPC handler (handling a call from the task) block on a call to the JobTracker is workable.
> I'm pretty uncomfortable with such a big change to the APIs this close to the freeze date.
+1
Owen, I think JobContext should include a Progressable, since any of the operations might take a long time. And I might have TaskAttemptContext extend JobContext.
Moving this to 0.19, since it is a major API change and not everyone is comfortable with such a change this close to the feature freeze.
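A rough sketch of the context style with the two refinements suggested here: JobContext carries a Progressable, and TaskAttemptContext extends JobContext. It is an illustration under stated assumptions: `java.util.Properties` stands in for `JobConf`, a `String` stands in for `TaskAttemptID`, the local `Progressable` interface only mirrors the shape of `org.apache.hadoop.util.Progressable`, and `SlowCleanup` is a hypothetical caller. Package-private constructors model "only created by the framework".

```java
import java.util.Properties;

// Stand-in with the same shape as org.apache.hadoop.util.Progressable.
interface Progressable {
    void progress();
}

// Hypothetical sketch: the job-level context carries a Progressable...
class JobContext {
    private final Properties conf;          // stands in for JobConf
    private final Progressable progressable;

    // Package-private constructor: only the framework creates contexts.
    JobContext(Properties conf, Progressable progressable) {
        this.conf = conf;
        this.progressable = progressable;
    }

    public Properties getJobConf() { return conf; }
    public Progressable getProgressable() { return progressable; }
}

// ...and the task-attempt context extends it, adding the attempt id.
class TaskAttemptContext extends JobContext {
    private final String taskAttemptId;     // stands in for TaskAttemptID

    TaskAttemptContext(Properties conf, Progressable progressable, String taskAttemptId) {
        super(conf, progressable);
        this.taskAttemptId = taskAttemptId;
    }

    public String getTaskAttemptID() { return taskAttemptId; }
}

// A long job-level operation can report progress through whichever context it gets.
class SlowCleanup {
    static void cleanupJob(JobContext context, int filesToDelete) {
        for (int i = 0; i < filesToDelete; i++) {
            // (deleting one temporary file would happen here)
            context.getProgressable().progress();
        }
    }
}
```

Adding a new accessor to either context class later would not change any method signature that applications implement, which is the compatibility argument for contexts over JobConf parameters.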
Today we promote the output of every job that has mapred.output.dir defined. So one more thing that needs to be fixed here is applications that create side files with an OutputFormat that is not an instance of FileOutputFormat. For example, Hadoop archives use NullOutputFormat as the output format, but the archive is created from the task's side files. Thus, moving the task commit to OutputFormat would ignore the side files. Also, with setupJob and setupTask moved to OutputFormat, the facility to create side files is removed.
To support creating side files even with an OutputFormat that is not a FileOutputFormat, we can have a SideFileOutputFormat that extends FileOutputFormat. If the job's OutputFormat is not a FileOutputFormat and mapred.output.dir is defined, the framework will instantiate SideFileOutputFormat.

    static void setWorkOutputPath(JobConf conf, Path outputDir)
    public static Path getWorkOutputPath(JobConf conf)

SideFileOutputFormat.getRecordWriter() will use TextOutputFormat's RecordWriter. Finally, task commit will consist of committing the job's OutputFormat and the SideFileOutputFormat (if the job's OutputFormat is not a FileOutputFormat).
I think that we should just treat side files as a special case. That is, side-file support doesn't really need to inherit from FileOutputFormat; it could use it internally. So we could have a class called SideFileWriter that provides the method getSideFileDirectory. The implementation of getSideFileDirectory could internally use the FileOutputFormat API to get the work output path. When the job's output format is a FileOutputFormat, side files can be created in the work output path, as is done today. When the output format is something else, getSideFileDirectory returns a directory that it synthesizes (something like _jobid/_temporary). The commit is done for both the SideFileWriter and the job's OutputFormat.
Does that work? Thoughts? We'd also need an api in SideFileWriter to set the final output directory for the side files.
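A small sketch of how the proposed `getSideFileDirectory` might choose a directory. It assumes `java.nio.file.Path` in place of Hadoop's `Path`, and it passes the "is the job's format a FileOutputFormat" decision in as a boolean. The class and method come from the proposal above; the parameter list and the exact `_<jobid>/_temporary` layout under a base directory are illustrative guesses.

```java
import java.nio.file.Path;

// Hypothetical sketch of the proposed SideFileWriter; the parameters and the
// "_<jobid>/_temporary" layout are illustrative only.
class SideFileWriter {
    /**
     * @param usesFileOutputFormat whether the job's OutputFormat is a FileOutputFormat
     * @param workOutputPath       the FileOutputFormat work output path (unused otherwise)
     * @param baseDir              directory under which a synthetic directory is created
     * @param jobId                job id used to namespace the synthetic directory
     */
    static Path getSideFileDirectory(boolean usesFileOutputFormat, Path workOutputPath,
                                     Path baseDir, String jobId) {
        if (usesFileOutputFormat) {
            return workOutputPath; // same place side files go today
        }
        // Non-file output formats: synthesize a per-job temporary directory.
        return baseDir.resolve("_" + jobId).resolve("_temporary");
    }
}
```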
Is https://issues.apache.org/jira/browse/HADOOP-3149
The problem is that we need to be able to deal with raw data in the side files rather than data in record form.
For the side files, it seems simplest to be able to create a file output stream, write some bytes to it, and then close the file. Records are purely an artifact of the abstraction level.
Interesting... Combination of the Binary OutputFormat as proposed in HADOOP-3227 and
Rather than use HADOOP-3227, I think it would be better to extend
FSDataOutputStream createSideFile(String name) throws IOException; and the file will be managed by the output format, reside in the output dir, be named relative to the task, etc. Does that sound reasonable?
Yes, I missed the point that a record writer might not be able to cater to all the needs a user might have. For example, the user may want to write a stream of data that he generates. So we really need the FSDataOutputStream, either through an API like the one Owen suggested, or by letting the user create it in a directory that we provide.
I'd suggest that
Initially I suggested that we define a class SideFileWriter to get/set the above, thereby making it clear that the user and the framework are dealing with side files, but I am OK either way.
My vote goes to maintaining the current semantics of providing an API to get the directory where the user can create side files. It creates less API churn and is also more amenable to Hadoop Pipes and the like.
There are a few different topics being discussed in this issue:
IMO this issue should only address the first topic. The gain is freeing the JT from doing the task output commit, leaving it just the coordination of the commit. As has been suggested, the third topic could be addressed by HADOOP-3149 by adding a static method getOutputStream(JobConf conf, String baseName). This method would use the file-name namespacing introduced by HADOOP-3149 (previously HADOOP-3258) to create a unique file under the job's working output directory. Note that MultipleOutputs does not implement OutputFormat; because of this, IMO, we are not overloading it with unrelated behavior. MultipleOutputs just becomes a means to create additional outputs (OutputFormats or OutputStreams) in the context of a task's output, handled consistently with the task output on successful completion and on failure. The second topic is a whole thing on its own, and I think it should be left to its own JIRA:
Commit of side files can be supported by a static commit method in FileOutputFormat. Then task commit can include the OutputFormat's commit and the side files' commit. Also, cleaning up the _temporary directory at the end of the job needs to be static.
Side-file creation in Pipes is still supported by the configuration property mapred.work.output.dir.
Here is a patch for review after reworking Devaraj's earlier patch.
The new APIs look like the following:

    public class JobContext {
      public JobConf getJobConf();
    }
    public class TaskAttemptContext {
      public TaskAttemptID getTaskAttemptID();
      public JobConf getJobConf();
      public Progressable getProgressable();
    }
    public abstract class OutputFormat<K, V> {
      public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException;
      public abstract void checkOutputSpecs(JobContext context) throws IOException;
      public abstract void setupJob(JobContext context) throws IOException;
      public abstract void cleanupJob(JobContext context) throws IOException;
      public abstract void setupTask(TaskAttemptContext context) throws IOException;
      public abstract void cleanupTask(TaskAttemptContext context, boolean promote) throws IOException;
    }

I did not move getProgressable() to JobContext, because {setup/cleanup}Job() are not done during the job, so they need not report progress.
Output commit is done as follows:
The other issue is supporting side files. Supporting side files involves three issues:
The following describes how side files are supported with the patch.
Case 1: The job's OutputFormat is a FileOutputFormat: side files are handled by the output format.
Case 2: The job's OutputFormat is not a FileOutputFormat
Many of the changes are to do with the API changes. The core changes are in OutputFormat.java, FileOutputFormat.java, Task.java, TaskTracker.java, JobInProgress.java, TaskInProgress.java and LocalJobRunner.java.
Note: Since the commit of a task is done by the OutputFormat, we cannot support the old OutputFormat interface. I don't see a way to move the commit to the output format and still support the old interface. Thoughts?
I don't like the fact that we are pushing all the commit logic into the OutputFormat implementation. It makes it easy for people implementing their own OutputFormat to get things wrong.
I would prefer to leave the OutputFormat interface as it is today, have a separate OutputCommitter interface with an implementation for file-based output formats, and have an OutputCommitterFactory that gives you the right OutputCommitter for the job's OutputFormat, similar to how Serializations are picked.
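To make the "similar to how Serializations are picked" idea concrete, here is a hedged sketch of a factory that asks each registered provider whether it accepts the job's OutputFormat class and uses the first match. This mirrors the `accept(Class<?>)` style of Hadoop's serialization lookup, but every type below (`CommitterProvider`, `OutputCommitterFactory`, the `*Format` stand-ins) is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types for illustration only.
interface OutputCommitter {
    String name();
}

// Each provider says whether it handles a given OutputFormat class.
interface CommitterProvider {
    boolean accept(Class<?> outputFormatClass);
    OutputCommitter getCommitter();
}

class FileBasedFormat { }                          // stands in for FileOutputFormat
class TextFormat extends FileBasedFormat { }       // stands in for TextOutputFormat
class NullFormat { }                               // stands in for NullOutputFormat

class FileCommitterProvider implements CommitterProvider {
    public boolean accept(Class<?> c) { return FileBasedFormat.class.isAssignableFrom(c); }
    public OutputCommitter getCommitter() { return () -> "file"; }
}

class OutputCommitterFactory {
    private final List<CommitterProvider> providers = new ArrayList<>();

    void register(CommitterProvider provider) { providers.add(provider); }

    /** The first provider that accepts the format wins; returns null if none does. */
    OutputCommitter getCommitter(Class<?> outputFormatClass) {
        for (CommitterProvider p : providers) {
            if (p.accept(outputFormatClass)) {
                return p.getCommitter();
            }
        }
        return null;
    }
}
```

With this shape, OutputFormat itself stays unchanged, and a non-file format simply gets no committer unless some provider claims it.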
Errors in commit logic will affect the task only. The framework will be safe, right?
Making OutputFormat an abstract class is always advantageous. Adding OutputCommitter sounds good. That could be addressed in a separate JIRA that moves the commit logic to OutputCommitter.
Yes, on the errors in commit logic.
I'm still worried about adding methods to the OutputFormat that later will be deprecated as the logic moves to something like an OutputCommitter. I would prefer to leave the OutputFormat as it is now. I am +1 on the approach in the current patch. This patch is an improvement over what exists in terms of performance.
I am in two minds regarding having an OutputCommitter interface esp. for the common uses of OutputFormat. So for example, for the FileOutputFormat case, we have all the logic for committing in the base FileOutputFormat and all the output formats that deal with FileOutputs inherit that. Although the sub-classes could override the commit implementation, most of them don't need to. Alejandro, could you please enumerate some advantages of having a separate committer interface vis-a-vis having the commit implementation in a base class for all related outputformats? My thoughts are along the lines of my first comment on this issue.
Committing an output does not depend on the OutputFormat type but on the storage. Putting it in OutputFormat, even in the base class FileOutputFormat, still implies it belongs to the OutputFormat. If a job has multiple outputs (via MultipleFileOutputFormat, MultipleOutputs or side files) against the same storage, it would seem logical to enforce the same commit logic, and this should be atomic. Ideally, if a job has multiple outputs to different storages, this should also be atomic. Even if the storage access is provided by the job (i.e. a custom FileSystem), it should be some entity close to the storage that provides the output commit logic. Granted, some or all of the above comments are far-fetched or may seem unrealistic, so I could live with them (and am). But from a separation-of-concerns point of view, I would prefer a separate OutputCommitter interface providing the commit logic, even if it does exactly what the patch does today and none of the above. Maybe a new method in OutputFormat could give the OutputCommitter class to use.
Alejandro, you can provide different implementations of the various output-storage-related APIs (including commit) and yet use the same RecordWriter. The reverse is also true: you could use the same storage and have different RecordWriters. In essence, the OutputFormat captures both the storage and the RecordWriter to use on that storage. All output formats against the same storage could use the same commit implementation (the base one), but is there any reason why we should enforce that?
The atomicity, I think, is output-format dependent. For example, in the case of FileOutputFormat, assuming maps/reduces are idempotent, if the commit happened partially (1 out of 10 files got promoted), the next attempt of the same task could either ignore the previously committed file (delete before commit) or accept what has already been committed. Maybe for DFS, if we provide an atomic rename(tmpdir/<set-of-files>, outputdir), it would handle atomicity. For an output format like SQLOutputFormat, it is a different game. My point is that atomicity across multiple outputs on different storages is hard to guarantee, and it should be per-OutputFormat logic.
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12386162/patch-3150.txt
against trunk revision 677379.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 18 new or modified tests.
-1 javadoc. The javadoc tool appears to have generated 1 warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
-1 findbugs. The patch appears to introduce 3 new Findbugs warnings.
-1 release audit. The applied patch generated 212 release audit warnings (more than the trunk's current 209 warnings).
-1 core tests. The patch failed core unit tests.
+1 contrib tests. The patch passed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2881/testReport/
This message is automatically generated.
Here is a patch after fixing the warnings.
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12386275/patch-3150.txt
against trunk revision 677690.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 18 new or modified tests.
+1 javadoc. The javadoc tool did not generate any warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
-1 findbugs. The patch appears to introduce 1 new Findbugs warnings.
+1 release audit. The applied patch does not increase the total number of release audit warnings.
+1 core tests. The patch passed core unit tests.
+1 contrib tests. The patch passed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2892/testReport/
This message is automatically generated.
A couple of comments on the latest patch (Jul 17):
1. cleanupTask method: The name of the OutputFormat.cleanupTask(TaskAttemptContext taskContext, boolean promote) method does not match what the method is supposed to do; it is not intuitive. I would suggest having two methods instead, commitTask(TaskAttemptContext ctx) and discardTask(TaskAttemptContext ctx); then the intention is clear when looking at the methods and their usage (a boolean that switches to exactly the opposite behavior is confusing).
2. New methods in OutputFormat: Instead of adding the following four methods to OutputFormat:

    + public abstract void setupJob(JobContext context) throws IOException;
    + public abstract void cleanupJob(JobContext context) throws IOException;
    + public abstract void setupTask(TaskAttemptContext taskContext) throws IOException;
    + public abstract void cleanupTask(TaskAttemptContext taskContext, boolean promote) throws IOException;

I would put them in a separate abstract class, OutputCommitter, and add a single abstract method, OutputCommitter getOutputCommitter(), to OutputFormat. FileOutputFormat would implement getOutputCommitter() by returning a FileOutputCommitter. Task.done(Umbilical), when taking care of the side files, would instantiate a FileOutputCommitter (taking the class from a config property) and do the commit for the side files. The pros of this approach are:
I think you may be right that we want an OutputCommitter, but it should not be determined by the OutputFormat. Rather, it should be configured independently. In particular, we can default it to "FileOutputCommitter" with roughly the current semantics. My concern with having the OutputFormat create the OutputCommitter is that it makes the API more complex, and the application may want to write side files with a non-file output format.
I'd propose something like:

    public abstract class OutputCommitter {
      public abstract void setupJob(JobContext context) throws IOException;
      public abstract void commitJob(JobContext context) throws IOException;
      public abstract void abortJob(JobContext context) throws IOException;
      public abstract void setupTask(TaskAttemptContext context) throws IOException;
      public abstract boolean needsTaskCommit(TaskAttemptContext context) throws IOException;
      public abstract void commitTask(TaskAttemptContext context) throws IOException;
      public abstract void abortTask(TaskAttemptContext context) throws IOException;
    }
    public class FileOutputCommitter extends OutputCommitter {
      public Path getWorkPath(Path basePath) throws IOException;
    }
    public class JobConf {
      public OutputCommitter getOutputCommitter();
    }

We need the needsTaskCommit test to optimize the very typical case where there is nothing to commit, and thus no point in a round trip to the JobTracker. FileOutputFormat would check whether the OutputCommitter is a FileOutputCommitter and, if so, use its getWorkPath. Thoughts?
+1
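The following sketch shows the end-of-task flow that needsTaskCommit is meant to optimize. It assumes a trimmed version of the proposed OutputCommitter, with a `String` attempt id instead of `TaskAttemptContext` and the job-level methods omitted. `RecordingCommitter`, `TaskDriver.finish` and the `mayCommit` predicate (a stand-in for the round trip to the JobTracker) are hypothetical. When there is nothing to commit, the round trip is skipped entirely.

```java
import java.io.IOException;
import java.util.function.Predicate;

// Trimmed, hypothetical version of the proposed OutputCommitter: a String attempt id
// stands in for TaskAttemptContext, and the job-level methods are omitted.
abstract class OutputCommitter {
    public abstract void setupTask(String attempt) throws IOException;
    public abstract boolean needsTaskCommit(String attempt) throws IOException;
    public abstract void commitTask(String attempt) throws IOException;
    public abstract void abortTask(String attempt) throws IOException;
}

// Toy committer: hasOutput decides whether there is anything to commit.
class RecordingCommitter extends OutputCommitter {
    private final boolean hasOutput;
    final StringBuilder log = new StringBuilder();
    RecordingCommitter(boolean hasOutput) { this.hasOutput = hasOutput; }
    public void setupTask(String a) { log.append("setup:").append(a).append(' '); }
    public boolean needsTaskCommit(String a) { return hasOutput; }
    public void commitTask(String a) { log.append("commit:").append(a).append(' '); }
    public void abortTask(String a) { log.append("abort:").append(a).append(' '); }
}

class TaskDriver {
    /**
     * End-of-task protocol. mayCommit models the round trip to the JobTracker and is
     * consulted only when the committer says there is something to commit.
     */
    static String finish(OutputCommitter c, String attempt, Predicate<String> mayCommit)
            throws IOException {
        if (!c.needsTaskCommit(attempt)) {
            return "done-no-commit";   // the common case: no round trip at all
        }
        if (mayCommit.test(attempt)) {
            c.commitTask(attempt);
            return "committed";
        }
        c.abortTask(attempt);
        return "aborted";
    }
}
```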
Yes, that makes sense; it is much simpler then. And we would not add more responsibilities to OutputFormat, only change it from an interface to an abstract class. This sounds good.
But some clarifications:
1. Shouldn't we move the APIs public static Path getWorkOutputPath(JobConf conf) and static void setWorkOutputPath(JobConf conf, Path outputDir) from FileOutputFormat to FileOutputCommitter, and deprecate getWorkOutputPath(JobConf conf) in FileOutputFormat?
2. Since we do not commit or abort output at the job level, but only clean up temporary data, we should have cleanupJob instead of the {commit/abort}Job APIs.
Owen, are you saying that fileOutputCommitter.getWorkPath(basePath) should be called from fileOutputFormat.getRecordWriter()?
No, I wouldn't. Partly for backwards compatibility and partly because FileOutputCommitter has a more general interface that takes a path. I suggest:
When a FileOutputFormat is configured, get the output directory and the OutputCommitter. If the OutputCommitter is a FileOutputCommitter, call setWorkOutputPath with the configuration and the result of the committer's getWorkPath(FileOutputFormat.getOutputPath(conf)). Otherwise, set the work path to getOutputPath.
Here is a patch adding OutputCommitter and FileOutputCommitter. The commit logic is now moved to OutputCommitter.
Now the APIs look like:

    public class JobContext {
      public JobConf getJobConf();
    }
    public class TaskAttemptContext {
      public TaskAttemptID getTaskAttemptID();
      public JobConf getJobConf();
      public Progressable getProgressable();
    }
    public abstract class OutputCommitter {
      public abstract void setupJob(JobContext context) throws IOException;
      public abstract void cleanupJob(JobContext context) throws IOException;
      public abstract void setupTask(TaskAttemptContext context) throws IOException;
      public abstract boolean needsTaskCommit(TaskAttemptContext context) throws IOException;
      public abstract void commitTask(TaskAttemptContext context) throws IOException;
      public abstract void abortTask(TaskAttemptContext context) throws IOException;
    }
    public class FileOutputCommitter extends OutputCommitter {
      Path getWorkPath(TaskAttemptContext context, Path basePath) throws IOException;
    }
    public class JobConf {
      public OutputCommitter getOutputCommitter();
    }
    public abstract class OutputFormat<K, V> {
      public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException;
      public abstract void checkOutputSpecs(JobContext context) throws IOException;
    }

Submitting for Hudson.
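Here is a hedged sketch of the work-path rule proposed just before this patch. If the configured committer is a FileOutputCommitter, tasks write into the path the committer derives from the output directory. Otherwise they write directly into the output directory. It assumes `java.nio.file.Path`, passes the attempt id to the committer's constructor instead of passing a `TaskAttemptContext` to `getWorkPath`, and uses an illustrative `<output>/_temporary/_<attempt>` layout. `WorkPathRule` is a hypothetical helper name.

```java
import java.nio.file.Path;

// Hypothetical stand-ins, trimmed to what the work-path rule needs.
abstract class OutputCommitter { }

class FileOutputCommitter extends OutputCommitter {
    private final String taskAttemptId;
    FileOutputCommitter(String taskAttemptId) { this.taskAttemptId = taskAttemptId; }

    /** Illustrative layout only: <base>/_temporary/_<attempt id>. */
    Path getWorkPath(Path basePath) {
        return basePath.resolve("_temporary").resolve("_" + taskAttemptId);
    }
}

class WorkPathRule {
    /** The directory a FileOutputFormat task should write into. */
    static Path chooseWorkPath(OutputCommitter committer, Path outputPath) {
        if (committer instanceof FileOutputCommitter) {
            return ((FileOutputCommitter) committer).getWorkPath(outputPath);
        }
        return outputPath; // any other committer: write straight into the output dir
    }
}
```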
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12386615/patch-3150.txt
against trunk revision 678915.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 18 new or modified tests.
+1 javadoc. The javadoc tool did not generate any warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
+1 findbugs. The patch does not introduce any new Findbugs warnings.
+1 release audit. The applied patch does not increase the total number of release audit warnings.
-1 core tests. The patch failed core unit tests.
+1 contrib tests. The patch passed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2927/testReport/
This message is automatically generated.
The same patch, with one more API added to org.apache.hadoop.mapred.JobConf:

    public void setOutputCommitter(Class<? extends OutputCommitter> theClass)

And with a javadoc comment from the earlier patch fixed.
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12386786/patch-3150.txt
against trunk revision 679459.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 18 new or modified tests.
+1 javadoc. The javadoc tool did not generate any warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
+1 findbugs. The patch does not introduce any new Findbugs warnings.
+1 release audit. The applied patch does not increase the total number of release audit warnings.
-1 core tests. The patch failed core unit tests.
+1 contrib tests. The patch passed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2940/testReport/
This message is automatically generated.
Changes to the public interface OutputFormat and the signatures of its APIs need not be considered here, because the same changes are done as part of
Here is the patch after removing the changes in OutputFormat and FileOutputFormat.
Attached is a patch with a test case for the FileOutputCommitter APIs.
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12387257/patch-3150.txt
against trunk revision 681243.
+1 @author. The patch does not contain any @author tags.
-1 tests included. The patch doesn't appear to include any new or modified tests.
+1 javadoc. The javadoc tool did not generate any warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
+1 findbugs. The patch does not introduce any new Findbugs warnings.
+1 release audit. The applied patch does not increase the total number of release audit warnings.
-1 core tests. The patch failed core unit tests.
+1 contrib tests. The patch passed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2994/testReport/
This message is automatically generated.
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12387264/patch-3150.txt
against trunk revision 681243.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 3 new or modified tests.
+1 javadoc. The javadoc tool did not generate any warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
+1 findbugs. The patch does not introduce any new Findbugs warnings.
+1 release audit. The applied patch does not increase the total number of release audit warnings.
-1 core tests. The patch failed core unit tests.
+1 contrib tests. The patch passed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2996/testReport/
This message is automatically generated.
Test failure org.apache.hadoop.hdfs.TestFileCreation.testClientTriggeredLeaseRecovery is not related to the patch.
Patch updated with trunk
Cancelling patch because undoing OutputFormat api changes missed a change in FileOutputFormat
I went through parts of the patch that affects the core protocols. Some comments:
1) The Umbilical protocol doesn't need the commitError method. The TaskTracker can just kill the task for which the JT returns "discard output", leaving the cleanup of this task's output to the JT (cleanupJob).
2) I don't understand why there is a sendLastUpdate before sendDone.
3) The check for whether to commit needs to check only the existence of the temporary output directory. It doesn't need to check its contents (since these directories are now created only on demand); once a directory exists, we can assume it requires a commit.
4) The TIP should reset the taskId that was marked as "commit-pending" to empty, because the TT could be lost after the task was marked as commit-pending in the JT.
5) Indentation has to be fixed in JobInProgress.java.
6) Overall, some comments are redundant.
In addition to the above, the JT should just send a killTaskAction for the tasks that should discard their outputs. That will make the TT's killing of such tasks seamless (exactly the way task kills are currently handled).
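Review point 3 can be illustrated with a short sketch that uses the local file system through `java.nio.file` in place of Hadoop's `FileSystem`. Because the task's work directory is created lazily on first write, `needsTaskCommit` only has to check whether that directory exists. It never lists the directory's contents. `LazyWorkDir` and its methods are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch using java.nio in place of Hadoop's FileSystem. The work
// directory is created only on demand, so its existence alone means there is
// output worth committing.
class LazyWorkDir {
    private final Path workDir;
    LazyWorkDir(Path workDir) { this.workDir = workDir; }

    /** Creates the directory on first use (e.g. when the first output file is opened). */
    Path open() throws IOException {
        return Files.createDirectories(workDir);
    }

    /** No listing of contents: existence alone decides whether a commit is needed. */
    boolean needsTaskCommit() {
        return Files.isDirectory(workDir);
    }
}
```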
Here is a patch after adding the getWorkPath change in FileOutputFormat and incorporating Devaraj's comments.
sendLastUpdate is required before sendDone because the communication thread can sometimes miss the last status update. So, before sending done, we make sure the counters are updated by sending a last update.
Patch updated with trunk.
The API's look right on this. I haven't looked through the implementation details yet. I assume it is the final reduce that does the commit on the job?
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12388017/patch-3150.txt
against trunk revision 685406.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 12 new or modified tests.
-1 patch. The patch command could not apply the patch.
Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3048/console
This message is automatically generated.
Here is the patch with {setup/cleanup}Job moved to the task. Now there is no user code in the JT.
Tested the patch. Ran the sort benchmark on a 500-node cluster. Introduced a sleep of 1 second for canCommit polling, but this could reduce the performance of small jobs. I will run small-job benchmarks and update here.
Thanks, Ramya, for running the small-job benchmark.
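Below is a hedged sketch of the task-side canCommit polling loop mentioned above. The `Umbilical` interface here is a hypothetical stand-in, not the real TaskUmbilicalProtocol signature. The sleep interval is a parameter so it can be tuned. The patch used 1 second. With a fixed interval, an attempt can wait up to roughly one interval after approval becomes available before it notices, which is why small jobs may be affected.

```java
// Hypothetical stand-in for the task-to-TaskTracker protocol; not the real signature.
interface Umbilical {
    boolean canCommit(String attemptId);
}

class CommitPoller {
    /**
     * Polls until the TaskTracker says this attempt may commit, sleeping
     * sleepMillis between polls. Returns the number of polls made.
     */
    static int waitForCommitApproval(Umbilical umbilical, String attemptId, long sleepMillis)
            throws InterruptedException {
        int polls = 0;
        while (true) {
            polls++;
            if (umbilical.canCommit(attemptId)) {
                return polls;
            }
            Thread.sleep(sleepMillis); // the latency each commit may pay
        }
    }
}
```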
The attached patch is the same as the earlier patch, except that setupJob is done by JobClient instead of by the first task launched. This is because, before the first task does setupJob, another task could start execution, which would make it fail. So now JobClient does setupJob, and the final reduce (the final map if there are no reduces) does cleanupJob.
Sorry, the earlier patch had a findbugs warning. Here is a patch fixing it.
Cancelling patch to incorporate Devaraj's offline review comments which include:
1. Call setupJob in JobClient just before submitJob, i.e. after creating splits etc.
2. Add Forrest documentation.
Here is a patch after incorporating the review comments.
+1. Though I should note that this patch currently supports only FileOutputCommitter based "side files". Maybe, if there is a requirement for commit to other types of storages for side files, we could make that configurable too (something like getSideFileCommitter).
There could be a race condition in finding out the final reducer.
For example, the second-to-last reducer reports that it is COMMIT_PENDING. It gets a CommitTaskAction from the JobTracker. Before the second-to-last reducer reports SUCCEEDED, the last reducer goes to COMMIT_PENDING. Since the JT notices there is another unsuccessful reducer, the last reducer gets a commit task action, but the final-attempt flag is not set. So none of the attempts is sent a commit task action with the cleanup-job flag set. Another scenario is that all the reducers are in COMMIT_PENDING at the same time; then none of them is considered the final attempt. Solutions: Thoughts?
I am +1 for option 2. Hope that is not too complicated. We would ideally like to avoid running any user code on the JT machine itself!
Good catch on the race condition. I think we should do option #2 and launch a new task that does the job level commit.
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12388425/patch-3150.txt
against trunk revision 687868.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 12 new or modified tests.
-1 javadoc. The javadoc tool appears to have generated 1 warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
+1 findbugs. The patch does not introduce any new Findbugs warnings.
+1 release audit. The applied patch does not increase the total number of release audit warnings.
-1 core tests. The patch failed core unit tests.
-1 contrib tests. The patch failed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3076/testReport/
This message is automatically generated.
Here is a patch that makes the job cleanup a separate task at the end of the job.
When all the maps and reduces of the job have finished, or the job is killed, the cleanup task is launched. Once the cleanup task completes, the job is marked SUCCEEDED/FAILED. The patch doesn't update any counters for the cleanup task, and the web UI doesn't show anything about it either. We can have a follow-up JIRA that adds a row for the cleanup task in jobdetails. Changes from the earlier patch are in JobInProgress, TaskInProgress, MapTask, JobQueueTaskScheduler and LimitTasksPerJobTaskScheduler. One more corner case: we need not do setupJob if the number of maps is zero, since the job is immediately declared SUCCEEDED. The patch does setupJob only when there are splits. The patch also fixes findbugs warnings in JobInProgress.
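The launch rules above can be sketched as a small state holder. It assumes cleanup is launched exactly once, either when every map and reduce has finished or as soon as the job is killed, and that setup is skipped for zero-split jobs. `JobCleanupState`, `canLaunchCleanupTask` and `needsSetup` are hypothetical names for illustration, not the patch's actual code.

```java
// Hypothetical model of when a job's cleanup task may be launched.
class JobCleanupState {
    private final int numMaps;
    private final int numReduces;
    private int finishedMaps;
    private int finishedReduces;
    private boolean killed;
    private boolean cleanupLaunched;

    JobCleanupState(int numMaps, int numReduces) {
        this.numMaps = numMaps;
        this.numReduces = numReduces;
    }

    void mapFinished() { finishedMaps++; }
    void reduceFinished() { finishedReduces++; }
    void kill() { killed = true; }

    // setupJob is only worth running when there is at least one split;
    // a zero-map job is declared SUCCEEDED immediately.
    boolean needsSetup() { return numMaps > 0; }

    // Launch cleanup once: after all maps and reduces have finished,
    // or as soon as the job is killed.
    boolean canLaunchCleanupTask() {
        if (cleanupLaunched) {
            return false;
        }
        boolean allDone = finishedMaps == numMaps && finishedReduces == numReduces;
        return killed || allDone;
    }

    void markCleanupLaunched() { cleanupLaunched = true; }

    public static void main(String[] args) {
        JobCleanupState job = new JobCleanupState(2, 1);
        job.mapFinished();
        job.mapFinished();
        System.out.println(job.canLaunchCleanupTask()); // false: the reduce is still running
        job.reduceFinished();
        System.out.println(job.canLaunchCleanupTask()); // true
    }
}
```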
Canceling patch to incorporate offline review comments from Sharad. The comments include:
1. Remove unused method JobInProgress.reportCleanupTIP
2. Remove the change in JobInProgress.findFinishedMap, because it is not used anywhere
3. Move the call to method canLaunchCleanupTask inside obtainCleanupTask
4. Clean up indentation in TaskInProgress.java
5. Change the signature of method jobComplete
6. Add the web UI change in this JIRA itself, since it requires a very small code change.
Patch incorporating the review comments.
Changes for the web UI are in jobdetails.jsp and jobtasks.jsp. cleanupProgress is added to JobStatus, and a public API, JobTracker.getCleanupTaskReports(JobID jobid), is added.
test-patch result on trunk:
[exec]
[exec] -1 overall.
[exec]
[exec] +1 @author. The patch does not contain any @author tags.
[exec]
[exec] +1 tests included. The patch appears to include 12 new or modified tests.
[exec]
[exec] -1 javadoc. The javadoc tool appears to have generated 1 warning messages.
[exec]
[exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
[exec]
[exec] +1 findbugs. The patch does not introduce any new Findbugs warnings.
[exec]
[exec]
-1 javadoc is due to
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12388840/patch-3150.txt
against trunk revision 688936.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 12 new or modified tests.
-1 patch. The patch command could not apply the patch.
Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3099/console
This message is automatically generated.
Patch updated with trunk
+1. The changes related to cleanup as a separate task look good to me.
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12388893/patch-3150.txt
against trunk revision 689230.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 12 new or modified tests.
-1 javadoc. The javadoc tool appears to have generated 1 warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
-1 findbugs. The patch appears to introduce 1 new Findbugs warnings.
+1 release audit. The applied patch does not increase the total number of release audit warnings.
-1 core tests. The patch failed core unit tests.
-1 contrib tests. The patch failed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3113/testReport/
This message is automatically generated.
-1 javadoc is due to
-1 findbugs is not related to the patch. test-patch on my machine does not give any warning, and the warning shown in testReport does not come from this patch's code changes. The failure of TestFileCreation.testClientTriggeredLeaseRecovery is not related to the patch. TestMiniMRDFSSort.testMapReduceSort is due to
All the tests passed on my machine.
Suggestions:
Another question is how this interacts with
Owen, I think we can consider that for later (e.g. as part of HADOOP-2560 or as a separate JIRA altogether). The reason is that even with
This would be a fundamental change to the framework and might affect things like JT restart (
Sigh... the last patch only considers map slots for running the cleanup task. We should probably consider both types of slots for running the cleanup task (whichever comes first). In the long term, it may make sense not to constrain a TIP to be just map or reduce, but to support any type of task. Then things like the cleanupTask would be a natural fit...
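One way to use whichever slot type comes first is to give the job one cleanup TIP per slot type and let the first success win. The sketch below assumes that design. `CleanupTips`, `SlotType`, `offerSlot` and `succeeded` are hypothetical names, not the JobInProgress code.

```java
// Hypothetical slot types offered by a TaskTracker heartbeat.
enum SlotType { MAP, REDUCE }

// Hypothetical model: one cleanup TIP per slot type; the first to succeed wins.
class CleanupTips {
    private boolean mapAttemptStarted;
    private boolean reduceAttemptStarted;
    private SlotType winner; // null until a cleanup TIP succeeds

    // A TaskTracker offers a free slot: start the matching cleanup TIP if it
    // has not started yet and cleanup is not already done.
    boolean offerSlot(SlotType slot) {
        if (winner != null) {
            return false;
        }
        if (slot == SlotType.MAP && !mapAttemptStarted) {
            mapAttemptStarted = true;
            return true;
        }
        if (slot == SlotType.REDUCE && !reduceAttemptStarted) {
            reduceAttemptStarted = true;
            return true;
        }
        return false;
    }

    // Record a successful cleanup attempt and return the other TIP's type,
    // which should be killed; returns null if cleanup had already succeeded.
    SlotType succeeded(SlotType slot) {
        if (winner != null) {
            return null;
        }
        winner = slot;
        return slot == SlotType.MAP ? SlotType.REDUCE : SlotType.MAP;
    }

    boolean isCleanupDone() { return winner != null; }

    public static void main(String[] args) {
        CleanupTips tips = new CleanupTips();
        System.out.println(tips.offerSlot(SlotType.REDUCE)); // true: reduce slot came first
        System.out.println(tips.offerSlot(SlotType.MAP));    // true: map cleanup TIP also starts
        System.out.println(tips.succeeded(SlotType.REDUCE)); // MAP: kill the map cleanup TIP
    }
}
```

The trade-off is that both TIPs may run at once, so the job briefly occupies two slots for work only one of them needs to do.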
Here is a patch that:
1. Incorporates Owen's comments.
2. Makes the CleanupTask either a reduce or a map task. The job has two cleanup TIPs, one for map and one for reduce. When a TaskTracker asks for a task with a map slot, the cleanup map TIP starts an attempt, and vice versa. When one of the cleanup TIPs is successful, the job kills the other TIP. Thoughts?
3. Fixes scheduling of the cleanup task in the proper slot, which the earlier patch did not do.
4. Also fixes a bug in JobInProgress.updateTaskStatus().
5. Removes a redundant log message.
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12389170/patch-3150.txt
against trunk revision 690418.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 12 new or modified tests.
+1 javadoc. The javadoc tool did not generate any warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
-1 findbugs. The patch appears to introduce 1 new Findbugs warnings.
+1 release audit. The applied patch does not increase the total number of release audit warnings.
-1 core tests. The patch failed core unit tests.
+1 contrib tests. The patch passed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3148/testReport/
This message is automatically generated.
Some comments on the last patch:
1. Remove the check for tip.isComplete (it is not necessary) for the cleanup task in JIP.failedTask
2. Make sure launchedCleanup is set to false if the cleanup task fails, or is killed from the UI, from the command line, or because of a lost TT
3. Make sure cleanupTip.progress behaves the same as any other TIP's progress.
4. Change the version numbers of JobSubmissionProtocol and InterTrackerProtocol because of the JobStatus and TaskStatus changes
5. Rename setCleanupJobTask in Task.java to setCleanupTask, because it is easily confused with TIP.setCleanupTask
6. You should have an error code for the case where you fail to report commitPending to the TT or to ask the TT for commit approval, e.g. error codes 67 and 68, just as System.exit(65) is called if ping fails.
7. In JobQueueTaskScheduler and LimitTasksPerJobScheduler, move the iteration over jobs for cleanup tasks to the top of assignTasks, i.e. before calculating the loads.
8. In JobStatus, have the old constructor call the new constructor, passing 0.0f for cleanupProgress.
9. Killing commit-pending tasks that should be killed should be handled in getTasksToKill
10. JIP.updateTaskStatus should update the job's progress only when it gets the statuses of regular tasks
11. In TaskTrackerStatus, countMapTasks/countReduceTasks should count COMMIT_PENDING tasks too
Here is a patch incorporating Devaraj's comments.
The patch has been tested for the following (with speculative execution on): Ran the sort benchmark on a big cluster.
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12389338/patch-3150.txt
against trunk revision 691099.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 12 new or modified tests.
+1 javadoc. The javadoc tool did not generate any warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
-1 findbugs. The patch appears to introduce 3 new Findbugs warnings.
+1 release audit. The applied patch does not increase the total number of release audit warnings.
+1 core tests. The patch passed core unit tests.
+1 contrib tests. The patch passed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3160/testReport/
This message is automatically generated.
Findbugs says that using System.exit is bad practice because it shuts down the entire virtual machine. The same warning already exists for ping and statusUpdate, so it should not be a problem here. The other warning, for inconsistent synchronization of JobStatus.user, is due to existing code; I don't see it with test-patch on my machine.
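Review comment 8 on JobStatus (passing 0.0f for cleanupProgress) is a constructor-chaining change. The sketch below reads it as having the old constructor delegate to the new one, which is the only direction in which passing 0.0f makes sense. `JobStatusSketch` is a hypothetical, trimmed-down stand-in, and its field set and parameter order are illustrative only, not the real JobStatus signature.

```java
// Hypothetical, trimmed-down stand-in for JobStatus; the real class has
// more fields, and this parameter order is illustrative only.
class JobStatusSketch {
    private final String jobId;
    private final float mapProgress;
    private final float reduceProgress;
    private final float cleanupProgress;
    private final int runState;

    // Old constructor, kept so existing callers still compile:
    // it delegates to the new one with cleanupProgress = 0.0f.
    JobStatusSketch(String jobId, float mapProgress, float reduceProgress, int runState) {
        this(jobId, mapProgress, reduceProgress, 0.0f, runState);
    }

    // New constructor that also carries the cleanup task's progress.
    JobStatusSketch(String jobId, float mapProgress, float reduceProgress,
                    float cleanupProgress, int runState) {
        this.jobId = jobId;
        this.mapProgress = mapProgress;
        this.reduceProgress = reduceProgress;
        this.cleanupProgress = cleanupProgress;
        this.runState = runState;
    }

    String getJobId() { return jobId; }
    float mapProgress() { return mapProgress; }
    float reduceProgress() { return reduceProgress; }
    float cleanupProgress() { return cleanupProgress; }
    int getRunState() { return runState; }

    public static void main(String[] args) {
        JobStatusSketch old = new JobStatusSketch("job_0001", 1.0f, 0.5f, 1);
        System.out.println(old.cleanupProgress()); // 0.0
    }
}
```

Keeping a single constructor that assigns every field means a later field addition only touches one place.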
Some comments:
1) The UI should be changed so that it does not have a new row for the cleanup task. Instead, you could have a link showing the current status of the cleanup task (like "Cleanup Pending", "Cleanup Running", etc.), linked to the TIP. I am not very comfortable with such a big UI change as a separate row for the cleanup task.
2) There are some wrong comments in the code.
Here is a patch doing the following:
1. Changed the web UI as suggested by Devaraj.
2. Corrected comments in JobQueueTaskScheduler and LimitTasksPerJobTaskScheduler.
3. countMap/ReduceTasks was called twice in the earlier patch; fixed that.
4. Added getCleanupTaskReports(JobID) to JobSubmissionProtocol, and updated TestMiniMRLocalFS to test getCleanupTaskReports.
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12389488/patch-3150.txt
against trunk revision 692287.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 15 new or modified tests.
-1 patch. The patch command could not apply the patch.
Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3179/console
This message is automatically generated.
Patch has gone stale
Patch updated with trunk
test-patch result on the trunk:
[exec] -1 overall.
[exec]
[exec] +1 @author. The patch does not contain any @author tags.
[exec]
[exec] +1 tests included. The patch appears to include 15 new or modified tests.
[exec]
[exec] +1 javadoc. The javadoc tool did not generate any warning messages.
[exec]
[exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
[exec]
[exec] -1 findbugs. The patch appears to introduce 2 new Findbugs warnings.
[exec]
[exec]
-1 findbugs is due to the System.exit calls. TestQueueManager.verifyJobKill fails because the job kill now has to wait for the cleanup task to complete. I will upload a patch with the fix.
Patch updated with trunk and TestQueueManager fixed.
test-patch result on trunk:
[exec] -1 overall.
[exec]
[exec] +1 @author. The patch does not contain any @author tags.
[exec]
[exec] +1 tests included. The patch appears to include 18 new or modified tests.
[exec]
[exec] +1 javadoc. The javadoc tool did not generate any warning messages.
[exec]
[exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
[exec]
[exec] -1 findbugs. The patch appears to introduce 2 new Findbugs warnings.
All unit tests except TestKosmosFileSystem passed. I filed
I just committed this. Thanks Amareshwari!
Sigh, there is a problem with the 3150 patch I committed: FairScheduler.assignTasks has not been modified to take the cleanupTask into account (the task that cleans up the job's temp data, which used to be done by the JT). I have some thoughts and questions about that, which ideally should have been discussed prior to commit...
Stepping back on cleanupTask: as implemented in this patch, it can be a map or a reduce task, and after the job finishes running its regular map/reduce tasks, the cleanup task runs on the first free slot. That is, when a TT comes asking for a task, each job is checked to see whether it is ready to have its cleanup task run (look at JobQueueTaskScheduler.assignTasks and JobInProgress.obtainCleanupTask). If so, depending on the type of task we want to assign to the TT, we give out either a map or a reduce cleanup task. Once a cleanup TIP completes successfully, we simply kill the other cleanup TIP and mark the job complete. (Pros of the current approach) (Cons of the current approach) One afterthought that came to my mind after committing the patch is whether we should give the cleanup tasks such special treatment, or make them part of the job's regular map/reduce TIPs (that is, inject two additional TIPs, one of type Map and one of type Reduce, into the original map/reduce TIPs) so that JobInProgress.obtainNewMapTask/ReduceTask can handle them as special cases (handing out a cleanup task once the regular tasks of both types are complete). This way, schedulers don't have to bother about the cleanup task. Depending on the discussion, we can either revert the patch and implement the alternative, or raise a JIRA to address just FairScheduler.assignTasks.
Hi Devaraj,
Thanks for pinging me about this. Here are my thoughts on the problem: there's only one right way to schedule cleanup tasks, namely as soon as the job finishes, so schedulers should not need to be aware of this process. The role of a scheduler is to decide which jobs get to run when there are several choices, while initialization, cleanup, etc. are responsibilities of the JobTracker. So the solution I'd suggest is to assign cleanup tasks from the JobTracker code itself before it calls the scheduler. That is: if (there are tasks to clean up) { send heartbeat response to launch cleanup task } else { call taskScheduler.assignTasks }. This is clean, and it reduces both the work needed to write a scheduler and the potential for bugs. It's much less hacky than making obtainNewMap/Reduce return these tasks. I'd also consider using a separate type of slot for cleanup tasks, i.e. not counting them towards map and reduce slots. Presumably there will never be a huge backlog of cleanup tasks, and these tasks won't impact the performance of running maps and reduces. This would let cleanup tasks run ASAP even when a node is running long maps or reduces. However, if this is complicated to implement, then don't worry about it.
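The heartbeat logic proposed in the comment above (cleanup first, scheduler otherwise) can be written as a short sketch. `SimpleScheduler`, `HeartbeatHandler` and `tasksForTracker` are hypothetical stand-ins, not Hadoop's TaskScheduler or JobTracker classes, and task IDs are plain strings for brevity.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for the pluggable scheduler interface.
interface SimpleScheduler {
    List<String> assignTasks(String trackerName);
}

// Hypothetical JobTracker heartbeat path: cleanup first, scheduler second.
class HeartbeatHandler {
    private final SimpleScheduler scheduler;
    private final Deque<String> pendingCleanupTasks = new ArrayDeque<>();

    HeartbeatHandler(SimpleScheduler scheduler) {
        this.scheduler = scheduler;
    }

    void addCleanupTask(String taskId) {
        pendingCleanupTasks.add(taskId);
    }

    // If a job is waiting for cleanup, hand out that task and skip the
    // scheduler for this heartbeat; otherwise defer to the scheduler.
    List<String> tasksForTracker(String trackerName) {
        String cleanup = pendingCleanupTasks.poll();
        if (cleanup != null) {
            return Collections.singletonList(cleanup);
        }
        return scheduler.assignTasks(trackerName);
    }

    public static void main(String[] args) {
        HeartbeatHandler jt = new HeartbeatHandler(tt -> Collections.singletonList("map_000"));
        jt.addCleanupTask("cleanup_job_0001");
        System.out.println(jt.tasksForTracker("tt1")); // [cleanup_job_0001]
        System.out.println(jt.tasksForTracker("tt1")); // [map_000]
    }
}
```

Because the scheduler is skipped entirely on a heartbeat that carries a cleanup task, any other free slots on that tracker stay idle until the next heartbeat. Schedulers, in exchange, never need to know cleanup tasks exist.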
This seems like a good compromise between the various factors involved here. One thing to note is that if the tasktracker has more than one free slot, the other slots will go vacant (this is true when we have +1 to just having the framework assign the task as soon as possible.
Having a commit task would also make a lot of sense. Thanks, guys, for the quick consensus on this issue. I will raise a JIRA to address it (the approach will be to check for and assign the cleanup task before assignTasks is called).
Integrated in Hadoop-trunk #595 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/595/)
Filed
Here is a brief test plan for the feature.
1) The task framework code knows when it is done with the task. If the outputformat is FileOutputFormat, at that point it sets its own state to COMMIT_PENDING and sends that status to the tasktracker.
2) The tasktracker forwards this status to the JobTracker, and the JobTracker notes it and:
2.1) sends a COMMITTASKACTION for that task if this is the first attempt trying to commit;
2.2) kills all other task attempts. If two status messages with COMMIT_PENDING arrive at the same time from two running attempts, the first one wins.
3) The tasktracker gets the COMMITTASKACTION and notes it. The task doesn't commit its output until the tasktracker agrees to it (via a new RPC such as canCommit()).
4) If the commit fails, the task attempt fails. The JobTracker then re-executes it.
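The JobTracker-side arbitration in steps 2 to 4 can be sketched as a per-TIP "first COMMIT_PENDING wins" grant. This sketch assumes a single synchronized arbiter, so that concurrent reports are serialized and the first one wins. It also collapses the tasktracker hop by answering canCommit() directly. `CommitArbiter` and its methods are hypothetical names, not the patch's code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical JobTracker-side arbitration for the commit protocol above.
// It collapses the TaskTracker hop: canCommit() is answered directly here.
class CommitArbiter {
    // TIP id -> the attempt id granted permission to commit
    private final Map<String, String> grantedAttempt = new HashMap<>();

    // Step 2: an attempt reports COMMIT_PENDING. The first report for a TIP
    // wins (it would get a COMMITTASKACTION); false means the attempt lost
    // and should be killed.
    synchronized boolean onCommitPending(String tipId, String attemptId) {
        String prior = grantedAttempt.putIfAbsent(tipId, attemptId);
        return prior == null || prior.equals(attemptId);
    }

    // Step 3: the task may commit its output only if it holds the grant.
    synchronized boolean canCommit(String tipId, String attemptId) {
        return attemptId.equals(grantedAttempt.get(tipId));
    }

    // Step 4: the commit failed, so the attempt fails; release the grant so
    // that a re-executed attempt can win.
    synchronized void onCommitFailed(String tipId, String attemptId) {
        grantedAttempt.remove(tipId, attemptId);
    }

    public static void main(String[] args) {
        CommitArbiter jt = new CommitArbiter();
        System.out.println(jt.onCommitPending("tip_m_000", "attempt_0")); // true
        System.out.println(jt.onCommitPending("tip_m_000", "attempt_1")); // false: kill it
        System.out.println(jt.canCommit("tip_m_000", "attempt_0"));       // true
    }
}
```

Using `Map.remove(key, value)` releases the grant only if the failing attempt still holds it, so a stale failure report cannot revoke a newer attempt's grant.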