Brice Arnould added a comment - 19/May/08 04:08 PM
This new version makes a few more changes in the JobTracker, but allows the JobScheduler to use any container to store jobs (so we can have e.g. per-user queues) and to have a lower asymptotic complexity. It is as yet untested.
This sounds like a good approach to me. It might make sense to implement two schedulers as part of the patch, to validate the abstraction: one that reproduces the current FIFO behaviour, and one that, e.g., limits tasks per job (to better timeshare the cluster, HADOOP-2573).
I initially intended to propose the generic interface and some other schedulers as other issues, but I don't mind fixing #2573 -_^
So this new version adds a new FifoScheduler that takes into account a configurable maximum limit on the number of running tasks per job, and an interface that characterises JobSchedulers. If you want me to, I'll write a proof-of-concept "RackScheduler" that tries to group tasks belonging to the same job on a small number of racks, so we can ensure that this interface can be used to implement things other than limitations (and in any case I will work on a more efficient scheduler that takes locality into account). An updated patch that builds on trunk.
This scheduler is more of a proof of concept for now. It tries to assign each Job a reduced set of racks. Apart from that, it is a FifoJobScheduler.
This patch does the TODOs and adds the JobTracker as an argument to the constructors of JobSchedulers, so they can have all the information they might need.
I think that it is now ready. (The proof of concept is now much simpler, thanks to the topology service of bug
I agree that the scheduler is currently excessively difficult to modify and reason about. Something like this could be very useful in scheduler development. I have a couple of thoughts about the details of your patch.
Why is the scheduler responsible for managing the task tracker statuses? Shouldn't that stay in JobTracker? Though you'd still need updateTaskTrackerStatus() in the JobScheduler so that the scheduler can do something smart if there's been a change in status.
It might be wise to add an initialize() to the JobScheduler interface so that JobSchedulers can be written using only the default constructor. This would make it easier to push the choice of scheduler into a config file; you'd just list a class name and the system could use reflection to load the scheduler and start it.
For which structures do you intend getLockOnJobs() to return a lock? Saying that the scheduler "won't move anything" is a little open-ended. I don't see why the scheduler would ever need to write to externally-visible structures.
You're right. My aim was to move inside the JobScheduler the two structures we might want to reorder in order to select tasks in a more efficient way. I told myself that it would be a waste to keep two copies of the same collections (the unordered in the JobTracker, the ordered in the JobScheduler). But on the other hand, it can make the code more verbose on simple algorithms.
The latest version of the patch (v4) already uses reflection to load a scheduler according to the configuration file. I'm really unsure about the name I chose for the option, however ("mapred.jobtracker.scheduler").
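As a rough illustration of the initialize()-plus-reflection idea discussed here, the following sketch loads a scheduler class named in a configuration map. The JobScheduler interface shape, the Map-based configuration, the SchedulerLoader helper and the "scheduler.maxTasksPerJob" key are all assumptions made for this example; only the "mapred.jobtracker.scheduler" option name comes from the thread.

```java
import java.util.Map;

// Hypothetical, simplified stand-in for the JobScheduler interface under discussion.
interface JobScheduler {
    // Called once after construction, so implementations only need a no-arg constructor.
    void initialize(Map<String, String> conf);

    String describe();
}

// Hypothetical FIFO implementation; it only records one setting to show initialize() at work.
class FifoJobScheduler implements JobScheduler {
    private int maxTasksPerJob = Integer.MAX_VALUE;

    public FifoJobScheduler() { }

    @Override
    public void initialize(Map<String, String> conf) {
        String limit = conf.get("scheduler.maxTasksPerJob"); // assumed key, illustration only
        if (limit != null) {
            maxTasksPerJob = Integer.parseInt(limit);
        }
    }

    @Override
    public String describe() {
        return "fifo(maxTasksPerJob=" + maxTasksPerJob + ")";
    }
}

class SchedulerLoader {
    // Loads the class named in the configuration via reflection, then initializes it.
    static JobScheduler load(Map<String, String> conf) {
        String className = conf.getOrDefault(
                "mapred.jobtracker.scheduler", FifoJobScheduler.class.getName());
        try {
            Class<? extends JobScheduler> clazz =
                    Class.forName(className).asSubclass(JobScheduler.class);
            JobScheduler scheduler = clazz.getDeclaredConstructor().newInstance();
            scheduler.initialize(conf);
            return scheduler;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot load scheduler " + className, e);
        }
    }
}
```

With this shape, switching schedulers would only require changing the class name in the configuration; the calling code stays the same.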
This description really needs some clarification. getLockOnJobs() gives you the assurance that every operation on internally stored jobs, namely addJob, removeJob and assignTask, will wait for you to release the lock before altering the list. Its intended use is that a user of this class intending to alter a Job does it in that way :
Thanks for your comments. I'm going to post a new version of this patch soon that will take them into account. Please tell me if you see other errors in that one. PS: Please excuse my English. If I said something misplaced or impolite, it was not intentional.
A new version addressing the issues raised by Ari Rabkin, and fixing another bug that would have occurred if a subclass redefined getLockOnJobs().
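The snippet that originally illustrated this locking pattern is not preserved in the thread. The sketch below is only a guess at the general shape, assuming the lock is a ReentrantLock; SketchJob, LockingScheduler and LockUsage are hypothetical names, not classes from the patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical job record; the real JobInProgress is far richer.
class SketchJob {
    final String id;
    int priority;

    SketchJob(String id, int priority) {
        this.id = id;
        this.priority = priority;
    }
}

// Hypothetical scheduler exposing the lock that guards its internal job list.
class LockingScheduler {
    private final ReentrantLock jobsLock = new ReentrantLock();
    private final List<SketchJob> jobs = new ArrayList<>();

    // Declared final here so a subclass cannot hand out a different lock
    // than the one used internally (one possible way to avoid that class of bug).
    public final ReentrantLock getLockOnJobs() {
        return jobsLock;
    }

    public void addJob(SketchJob job) {
        jobsLock.lock();
        try { jobs.add(job); } finally { jobsLock.unlock(); }
    }

    public void removeJob(SketchJob job) {
        jobsLock.lock();
        try { jobs.remove(job); } finally { jobsLock.unlock(); }
    }

    public int jobCount() {
        jobsLock.lock();
        try { return jobs.size(); } finally { jobsLock.unlock(); }
    }
}

class LockUsage {
    // A caller that wants to alter a job holds the scheduler's lock while doing so,
    // which makes addJob/removeJob wait until the change is complete.
    static void changePriority(LockingScheduler scheduler, SketchJob job, int newPriority) {
        scheduler.getLockOnJobs().lock();
        try {
            job.priority = newPriority;
        } finally {
            scheduler.getLockOnJobs().unlock();
        }
    }
}
```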
I'd like to mention one other thing that it might be useful to think about: Letting the pluggable scheduler control speculative execution. Currently, the scheduler asks each job for a map or reduce task using obtainNewMapTask or obtainNewReduceTask. These may return a speculative task based on some fixed thresholds. It would make more sense to have the JobInProgress expose only a list of task statuses, and let the scheduler decide which ones to speculate. In effect, move obtainNewMapTask and obtainNewReduceTask into the scheduler (have them be utility methods in AbstractJobScheduler). This would consolidate all scheduling code into one class, making it easier to understand and modify. It's fairly clear that there's no "one size fits all" solution for speculation, and users often disable it, change the thresholds, etc, so it makes sense to make the entire speculation algorithm pluggable.
This is not a critical problem because a JobScheduler implementation can choose to not call obtainNewMapTask / obtainNewReduceTask and just look at the JobInProgress and select tasks by itself. It's just something that would make the code more understandable by consolidating scheduling decisions in one place. We might also consider making the speculative execution scheduler a separate pluggable object, so that multiple JobSchedulers can reuse it. Thus we might create a TaskScheduler interface (which would just contain obtainNewMapTask() and obtainNewReduceTask()), and a default implementation that uses the current threshold-based algorithm. AbstractJobScheduler could then look up which TaskScheduler to use in the config file. However, the most important thing is to make sure that the JobScheduler has a narrow interface (addJob, removeJob and assignTask), because then people can write this type of thing behind it. This fix, together with
This new proposition (v6) factorises more things in the AbstractJobScheduler.
Are there other issues that prevent this patch from being merged ? I'm willing to make the required changes.
I think we need more time to review this. It looks good, but I don't think we can get it properly reviewed this afternoon. I'm pushing it to 0.19.
I don't think that in Java 1.5 the @Override annotation is supported for use with methods which are defined in an interface which the class implements. It seems that this behavior is supported in Java 1.6, but in 1.5 @Override is supposed to be used only when you override a method that is defined in your superclass. Thus when I run ant with this patch applied, I get errors. Everything still needs to compile with 1.5 as of now, so I guess we should remove these @Override annotations.
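A small example of the two cases, using hypothetical class names (SimpleScheduler, BaseScheduler, FifoSketch) rather than anything from the patch. It compiles on Java 6 and later; the second annotation is the kind that the 1.5 compiler rejects.

```java
// Hypothetical interface standing in for the scheduler interface in the patch.
interface SimpleScheduler {
    String assignTask(String taskTrackerName);
}

abstract class BaseScheduler {
    abstract String name();
}

class FifoSketch extends BaseScheduler implements SimpleScheduler {
    // Accepted by both Java 5 and Java 6: name() overrides a superclass method.
    @Override
    String name() {
        return "fifo";
    }

    // Accepted by Java 6 and later; javac 1.5 reports an error here because
    // assignTask() is declared in an interface rather than in a superclass.
    @Override
    public String assignTask(String taskTrackerName) {
        return name() + "-task-for-" + taskTrackerName;
    }
}
```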
You're right, here is the updated patch.
It also updates the paths and fixes some grammatical errors in the documentation. This looks good. A few comments:
1. You can change AbstractJobScheduler and FifoJobScheduler's constructors to be no-arg constructors. The JobTracker isn't being used, and if you make AbstractJobScheduler implement Configurable, then the Configuration will be set automatically if you use ReflectionUtils#newInstance (this is the standard Hadoop pattern).
2. This needs some unit tests. It should be possible to write a unit test for FifoJobScheduler since it is relatively isolated.
3. In AbstractJobScheduler#assignTask you move to the next step if the following condition is true:
    (step == 0 || step == 2) && mapTasksNumber > maximumMapLoad
    || (step == 1 || step == 3) && reduceTasksNumber > maximumReduceLoad
Should these checks be >= (greater than or equal to) so the maximum load isn't exceeded?
4. Rather than having AbstractJobScheduler manage maxTasksPerJob, it might be better to have a subclass of FifoJobScheduler that limits tasks per job (TaskLimitedFifoJobScheduler). Or if this is possible through composition rather than inheritance, then even better. FifoJobScheduler would then just preserve the current behaviour.
Tom White :
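To make the question in point 3 concrete, here is a toy model of the two comparisons. It assumes the check runs before each assignment and that the counter is the number of tasks already running; LoadCheck and its methods are hypothetical and not taken from AbstractJobScheduler.

```java
class LoadCheck {
    // Inclusive check: stop as soon as the limit has been reached.
    static boolean atOrOverLimit(int runningTasks, int maximumLoad) {
        return runningTasks >= maximumLoad;
    }

    // Strict check, mirroring the comparison quoted from the patch.
    static boolean strictlyOverLimit(int runningTasks, int maximumLoad) {
        return runningTasks > maximumLoad;
    }

    // Keeps assigning tasks until the chosen check says to stop; returns the final count.
    static int fill(int maximumLoad, boolean useInclusiveCheck) {
        int running = 0;
        while (true) {
            boolean stop = useInclusiveCheck
                    ? atOrOverLimit(running, maximumLoad)
                    : strictlyOverLimit(running, maximumLoad);
            if (stop) {
                return running;
            }
            running++; // assign one more task
        }
    }
}
```

Under these assumptions, the strict comparison lets the count end one above the configured maximum, while the inclusive one stops exactly at it.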
I started using that pattern (up to v4), but I told myself that it could be nice to give the scheduler access to the topology. It's true, however, that the FifoJobScheduler does not use it, so I can remove it if you want.
A really good idea ! I found a bug while writing the tests :-P (that bug occurred when two jobs of the same priority were added at the same time).
Ooops
My argument in favor of keeping that in the AbstractJobScheduler is that, when I implemented other schedulers to test the API (just before v6), this part enforcing limitations was common to all of them. The reason is that those options keep their meaning with all schedulers. However, if the point is that the current behaviour should still be the default, I completely agree. For sure I have nothing against making the modifications you propose in 1) and 4), but I wanted to be sure you had considered the reasons that made me choose the current organisation. PS: Another example of a scheduler.
Takes into account changes from
Your patches need to be svn diffs relative to HADOOP_HOME.
I would prefer the use of ReflectionUtils.newInstance, because it handles the case of non-public classes and constructors and does the configuration in a standard way. It would mean moving your parameters from the constructor to an initialize method.
Why do you have "// TODO: Java 6 @Override" all over? @Override is in Java 5.
Generally with log4j, we don't provide loggers to other classes, but rather let them get their own loggers. So I'd remove getLog().
The interface for AbstractJobScheduler has too many ways to update the task tracker state. I'd suggest just leaving updateTaskTrackerStatus, which can do deletes if the new status is null. In particular, I'd remove the two removeTaskTracker methods.
Owen O'Malley
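The "single update method, null means delete" suggestion could look roughly like the following. TrackerStatus and TrackerRegistry are hypothetical stand-ins, and the boolean return value (whether the tracker was already known) is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, minimal stand-in for TaskTrackerStatus.
class TrackerStatus {
    final int runningTasks;

    TrackerStatus(int runningTasks) {
        this.runningTasks = runningTasks;
    }
}

class TrackerRegistry {
    private final Map<String, TrackerStatus> trackers = new HashMap<>();

    // Single entry point: a non-null status adds or replaces, a null status removes.
    // Returns true if the tracker was already known before the call.
    boolean updateTaskTrackerStatus(String trackerName, TrackerStatus newStatus) {
        TrackerStatus previous = (newStatus == null)
                ? trackers.remove(trackerName)
                : trackers.put(trackerName, newStatus);
        return previous != null;
    }

    int size() {
        return trackers.size();
    }
}
```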
Ooops. Sorry for that. I think that I fixed it.
Done ! (I'll open another issue to discuss ReflectionUtils)
Andy Konwinski explained to me that in Java 5 this annotation was not supported on methods defined in an interface.
Done !
My aim was to allow calls to the JobScheduler from unsynchronized methods, to help HADOOP-3608 ... too much anticipation. Thanks for your comments ^^
Thinking about this some more, perhaps a better way of achieving this would be to make things more composable. How about breaking apart the responsibilities, so there is a class for choosing the next task to run - call it TaskScheduler (which has the functionality of AbstractJobScheduler), and a class for prioritizing jobs: JobQueue (which is basically FifoJobScheduler). This would permit mixing of different JobQueues with different TaskSchedulers. This would not preclude more complex TaskSchedulers that have their own specialized JobQueue when it's not possible to sort jobs before choosing a task.

    public abstract class JobQueue implements Iterable<JobInProgress> {
      public abstract void add(JobInProgress job);
      public abstract Iterator<JobInProgress> iterator();
      public abstract void remove(JobInProgress job);
    }

    public abstract class TaskScheduler extends Configured {
      private JobQueue jobQueue;

      public TaskScheduler() {
        // jobQueue specified by configuration
      }

      /**
       * For subclasses that want to provide their own JobQueue
       */
      protected TaskScheduler(JobQueue jobQueue) {
        this.jobQueue = jobQueue;
      }

      public void setConf(Configuration conf) {
        super.setConf(conf);
        // initialize jobQueue here
      }

      public JobQueue getJobQueue() {
        return jobQueue;
      }

      public abstract Task assignTask(String taskTrackerName) throws IOException;

      public abstract boolean updateTaskTrackerStatus(String taskTrackerName, TaskTrackerStatus newStatus);

      // other methods...
    }

Then I would have FifoJobQueue, and two TaskSchedulers: DefaultTaskScheduler (which implements the current behaviour) and JobLimitedTaskScheduler both using FifoJobQueue by default. Thoughts? A couple of other points:
Tom White
That seems to be a really great idea ! I'm going to rewrite my patch the way you suggest. Brice,
This is looking good. A few comments:
> I think JobQueue and TaskScheduler should be abstract classes [ ... ]
+1 (Thanks for watching for this, Tom!) A good rule of thumb is that if it has more than one method it should probably be an abstract class, not an interface. Even then, interfaces aren't always best even for one-method APIs. Tom White
You mean that abstract classes are easier to evolve because they can provide default implementations for the methods we will add ?
getSortedJobs() makes it possible to bias the choice of job by the characteristics of the TaskTracker, something that appeared to be useful when I played with the API. This new proposal, however, provides a default implementation for it.
Not a problem to add it, but I see two ways of doing so without duplicating most of assignTask() :
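The evolution argument behind the abstract-class rule of thumb can be shown with a tiny example. In the Java versions in use at the time, interfaces could not carry method bodies (default methods arrived in Java 8), so adding a method to an interface broke every implementor, whereas an abstract class could add one with a body. EvolvingQueue and ListQueue are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// "Version 2" of a hypothetical abstract queue: size() was added after subclasses
// already existed, with a body so that those subclasses keep compiling unchanged.
abstract class EvolvingQueue {
    abstract void add(String jobId);

    abstract List<String> snapshot();

    // Added later; existing subclasses inherit it without any modification.
    int size() {
        return snapshot().size();
    }
}

// Written against "version 1"; it knows nothing about size().
class ListQueue extends EvolvingQueue {
    private final List<String> jobs = new ArrayList<>();

    @Override
    void add(String jobId) {
        jobs.add(jobId);
    }

    @Override
    List<String> snapshot() {
        return new ArrayList<>(jobs);
    }
}
```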
Both are easy to implement, but that new level of abstraction seems contrary to the KISS principle, unless we really need it for other filters. For now, when limits are disabled, the TaskScheduler just does one more test at line 104, and one more at line 124 when limits are enabled. That might not justify the creation of another class and a filter concept (again: unless we need them for something else). I fixed the warnings. Thanks for your advice. It's instructive ^^
While the idea of splitting up the TaskScheduler and JobQueue is great, isn't having a getJobQueue() in TaskScheduler needlessly complicating the interface? Now, when somebody wants to write a new scheduling algorithm, they may have to write two classes, a TaskScheduler and a JobQueue, and figure out how they'll interact. I'd suggest having the add(JobInProgress) and remove(JobInProgress) methods directly in the TaskScheduler, and removing getJobQueue(), so that the JobTracker only calls methods of the TaskScheduler object. The DefaultTaskScheduler can include a JobQueue that it passes the add/remove methods to, but people writing new schedulers should have the option to do something different. Here are two concrete cases where this would be useful:
Matei Matei Zaharia
Hi ! It's late in France, so please excuse me if what I say is more stupid than usual :-P But I really like that separation between the TaskScheduler and the JobQueue. A lot of characteristics are shared by all tasks of a Job: the userName, the priority, the typical resource usage of its tasks... Things like that. Most admins will want to use this to reflect the policy of the cluster (e.g. should it give priority to users that pay more, balance the load among users, minimize the maximum time for job completion, and so on). Writing a local JobQueue to reflect a local policy should be easy and should not interfere with the "optimizing part" of the scheduling (represented by the TaskScheduler). I provided a few examples that use JobConf to easily attach tags to jobs, allowing a user to describe their needs. With the new interface that should be even shorter. The TaskScheduler, on the contrary, would be more technical and would take into account things like locality or bandwidth, independently of the site policies (OK, mapred.jobtracker.scheduler.maxRunningTasksPerJob is an exception... maybe I should move it into the JobQueue ? :-/) That being said, if you think that there will be a need for Task level scheduling, I can make JobQueue an interface again. That way, a TaskScheduler could be its own JobQueue by returning itself from the getJobQueue() method. It's a bit more complex but that's a special case. Regards,
Hi Brice, You're right that the JobQueue abstraction is very nice and that a lot The reason I bring this up is because I've been trying to use your Matei
Thanks for your very detailed answer.
I have nothing against adding a superclass, if it is needed. I think that this class would look like the JobScheduler of v6.4. However, I'm unsure whether I understand your use case or not.
If my understanding is correct, those two functionalities are independent and should not be glued together in a single class (e.g. you might want to limit the number of per-user tasks even if the JobQueue is a simple FIFO). However, you would benefit from the JobFilter discussed above to ease the implementation of your TaskScheduler. (It's also true that if the JobQueue is a round-robin between users, an optimized JobFilter could be written.) Do you have a draft of the scheduler you're talking about (implemented for example on top of v6.4) ? Thanks for your patience, but I want to be sure that I understand the problem.
I think this is a good point. So I would support the suggestion of having methods to add, remove and iterate over jobs on the TaskScheduler. And keep JobQueue, but don't make it mandatory for TaskSchedulers to use it. Another thing I would like to see is the code for managing the tasktracker state being pushed up into the base class, since it is shared. (In fact, I wondered whether it belongs in the TaskScheduler at all, since it would be possible to make TaskScheduler a listener with only the updateTaskTrackerStatus method. But this would duplicate state with the JobTracker, so on balance it's fine to have it in the TaskScheduler.) Taking these points together, we would then have

    public abstract class TaskScheduler extends Configured {
      // abstract methods:
      public abstract void addJob(JobInProgress jobInProgress);
      public abstract void removeJob(JobInProgress jobInProgress);
      public abstract Iterator<JobInProgress> getJobsIterator();
      public abstract Task assignTask(String taskTrackerName) throws IOException;

      // base implementations (currently in DefaultTaskScheduler)
      public Collection<TaskTrackerStatus> getAllTaskTrackers() { }
      public int getNumberOfTaskTrackers() { }
      public TaskTrackerStatus getTaskTracker(String taskTrackerName) { }
      public boolean updateTaskTrackerStatus(String taskTrackerName, TaskTrackerStatus newStatus) { }
      public Statistics getStatistics() { }
      Collection<JobInProgress> retireOldJobs (long retireBefore) { }
    }

    public class DefaultTaskScheduler extends TaskScheduler {
      private JobQueue jobQueue;

      @Override
      public void addJob(JobInProgress jobInProgress) {
        jobQueue.add(jobInProgress);
      }

      @Override
      public Iterator<JobInProgress> getJobsIterator() {
        return jobQueue.iterator();
      }

      @Override
      public void removeJob(JobInProgress jobInProgress) {
        jobQueue.remove(jobInProgress);
      }

      @Override
      public Task assignTask(String taskTrackerName) throws IOException {
        // as before
      }
    }

How does this look?
(Note that TaskScheduler can extend Configured rather than implementing Configurable.) Also, retireOldJobs seems a bit out of place here and should really go back in JobTracker. This should be easy since it just calls getJobsIterator.
But the iterator returned by iterator() can be any iterator - so we can make it the same one returned by getSortedJobs(). In other words, we only need one way of iterating over the JobQueue. Tom - I like your new TaskScheduler API. I also like the idea of putting the statistics and tasktracker status stuff back into JobTracker, and making the TaskScheduler a listener only. I don't think this needs to duplicate state at all - the JobTracker can expose the getAllTaskTrackers() and getTaskTracker(name) through a public API, and then you're set. (Well, you'll also need the TaskScheduler to have a reference to the JobTracker; maybe give it a setJobTracker method that the JobTracker calls after setConf.) This makes more sense to me than putting this functionality in all TaskSchedulers, especially because the statistics stuff is something where there is only one desired answer (nobody would ever make their TaskScheduler return something else).
Brice - I haven't implemented my scheduler yet, but I will post when I have it. Overall it will be similar to the proposal in
This TaskScheduler is almost identical to what the JobScheduler was, so it will work. I'm still a bit bothered by the fact that this modification to the API is not motivated by the example of some real code. But since you both seem to agree, it's probably the right thing to do.
Ooops :-° You're right. I initially intended to handle this as part of [HADOOP-3609] but I should also have reverted the creation of retireOldJobs in this branch.
Some implementations may not be backed by a single sorted container. Iterating over their jobs (for example when retiring jobs) should be possible without requiring them to create a single sorted container of all their jobs. Moreover, as I said before, removing getSortedJobs(trackerName) means losing the ability to bias the choice of job by the characteristics of the TaskTracker.
Another concern with that approach is the complexity of the JobTracker.
Thanks ! I think that writing code is the best way to evaluate an API.
I still have to understand this bug. I'll wait for your code.
The TaskScheduler is not the only user of statistics right now - for example, the web interface (JSP pages) also uses them. So it makes very little sense to put them into the TaskScheduler and then have the JobTracker pass them from the TaskScheduler onto other users. If the concern is that the JobTracker class is too complicated, then we can move statistics into a separate class, but that should be a separate JIRA. For now, what's wrong with leaving them in the JobTracker and just providing an API that the TaskScheduler can use to read them? It would make this patch simpler, making it easier for it to get tested and accepted, and it's also the right thing from a software engineering point of view (scheduling has little to do with keeping track of statistics). Originally I thought that you'd put stats into the TaskScheduler because it needed to do something very complicated with them, but now I think that a simple getTaskTrackerStatuses method in the JobTracker would work fine.
@Matei Zaharia
While I'm looking at this code again, I think that TaskScheduler.Statistics was a bad idea, since it essentially duplicates mapred.ClusterStatus. So I propose to remove mapred.TaskScheduler.Statistics and to make TaskScheduler.getStatistics() return a mapred.ClusterStatus directly. What do you think of that ? PS: By the way, please excuse me if I've been rude inadvertently. I'm bad at English, and choosing the wrong word, or forgetting another, can change the tone of a sentence :-P
This new version tries to take the previous comments into account.
It also extracts the handling of tasktrackers into a separate class called TaskTrackerContainer. So, we now have : If I forgot something, please tell me ! Hi Folks,
It looks to me like we are heading for a major merge headache on this patch vs some of the other major scheduler work in flight for 19. I'm surprised it has taken this long for us to realize this. Could I ask that folks contributing to this issue's patches state what their goals are here? We're going to need to look for compromise, so it will be good to understand what short term impact they want from this work. We'll be listing conflicting JIRAs and the Yahoo team's goals here too. Just wanted to give folks a heads up and start the discussion. With any luck we will quickly find a way to meet everyone's goals with a minimum amount of conflict. E14
It is unfortunate that this likely conflicts with
I should add that I'm also working on a Hadoop job scheduler at Facebook. The goal of my scheduler is to implement fair sharing between jobs, so that short jobs are not starved by long jobs, while allowing for the same kinds of per-group quotas that As it currently stands this patch does look like a big change. I agree with Matei that we can scale down the impact of the changes and still meet the goal of having a simple pluggable scheduling mechanism. I've actually reworked Brice's latest patch to do this, so I hope we'll have something with minimal changes to the JobTracker (and hence more acceptable to 3445) that we can commit soon. I'm just finishing off the unit tests and will post the patch later today.
Yes, that conflict is truly regrettable :-/
If it can help, I can try to port By the way : it's true that this patch makes a lot of changes, but I don't think there will be a significant effort to merge
Here is a new patch which takes Brice's good work and strips it back to minimise the changes to JobTracker to provide a start for future work on scheduling, such as
TaskScheduler has the three methods Matei mentioned: addJob, removeJob, assignTask. It also has a reference to the JobTracker, via a TaskTrackerManager interface, which is like TaskTrackerContainer in the last patch and which I introduced to permit testing. (I've put a comment in it to say as much.) I've renamed DefaultTaskScheduler to JobQueueTaskScheduler to better describe what it does, and I've replaced the JobQueue abstraction with a SortedSet<JobInProgress>. My feeling is that a SortedSet is sufficient to do job prioritization, since you can specify a Comparator to impose an order on the queue. More complex schedulers can just implement the three TaskScheduler methods, which shouldn't be too much of a burden. This is an area where I imagine we can make improvements after this change has been committed. Finally, I've enhanced the unit test to test several scheduling scenarios - this test directly tests the scheduling algorithm by providing a (static) mock TaskTrackerManager implementation. Nice job, Tom. It's a lot cleaner and will make it easier for us to merge stuff from
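The idea that a SortedSet plus a Comparator is enough for job prioritization can be sketched as follows. QueuedJob and JobOrdering are hypothetical, and "lower priority value means more urgent" is an assumption of this example. One detail worth noting with any TreeSet-based queue: elements that compare as equal are treated as duplicates, so the comparator here falls back to the job id as a final tie-breaker.

```java
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical job record with the fields a FIFO-with-priority order needs.
class QueuedJob {
    final String id;
    final int priority;   // lower value = more urgent (assumption of this sketch)
    final long startTime;

    QueuedJob(String id, int priority, long startTime) {
        this.id = id;
        this.priority = priority;
        this.startTime = startTime;
    }
}

class JobOrdering {
    // Orders by priority, then start time, then id, so that two distinct jobs
    // never compare as equal and neither is silently dropped by the TreeSet.
    static final Comparator<QueuedJob> FIFO =
            Comparator.comparingInt((QueuedJob j) -> j.priority)
                    .thenComparingLong(j -> j.startTime)
                    .thenComparing(j -> j.id);

    static SortedSet<QueuedJob> newQueue() {
        return new TreeSet<>(FIFO);
    }
}
```

A scheduler that wants a different policy would then only need to supply a different Comparator.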
A Scheduler is usually just an algorithm for deciding which task to pick for a TT. It uses information from the TT (its hostname, rack, what it's currently running, how many resources it has free), and information from what is required (which job/queue to look at, capacities, user limits) and makes the best match. I'm wondering if TaskScheduler should handle addition/removal of jobs. As we make the JT persistent, we need the ability to persist job state to disk, to initialize jobs (expand their task-based structures, as in JobInProgress::Init()) dynamically (since, in order to scale, you don't want jobs to be expanded unless absolutely needed), to store only relevant information in memory and the rest on disk. Something else should likely do this, not TaskScheduler. TaskScheduler needs to access the collection of jobs when it runs its scheduling algorithms, but it should not be responsible for them. Methods like addJob() and removeJob() probably belong to some other class, something like a JobQueueManager. Which, by the way, can also handle multiple queues of jobs, as we'll need for 3445. Maybe the JT itself can handle the queues of jobs initially. Regardless, do you think TaskScheduler should be responsible for jobs? Another thought I had is regarding the work we're doing for 3445. It's more of an observation than a suggestion. HADOOP-3421 introduces multiple queues, capacity per queue, and user limits. Each of these features affects the scheduling of tasks, which likely would go something like this: a TT indicates that it has one or more free Map or Reduce slots. JT figures out whether to look for a Map or Reduce task. JT needs to find a task in a job in a queue, so it first looks at which queue to consider (primarily based on queue capacities and whether the queue can accept a TT slot). 
Within the selected queue, the JT considers which job to look at, based on user limits (the job needs to belong to a user who is not using more capacity than he/she is allowed) and priorities. Finally, within a job, the JT (actually, the JobInProgress object) needs to pick what task to run, based on data locality, speculation, and some other heuristics. My guess is that many developers will want to plug in tweaks to some of the pieces of the entire scheduling algorithm, and not modify other logic. Someone, for example, may want to tweak how a queue is chosen, and not touch the other stuff. For this, IMO, we need to break down the JT's scheduling flow (decide on M or R task, then pick a queue, then pick a job, then pick a task), which now sits in JobQueueTaskScheduler, into discrete units and allow folks to override one or more of these units. There are a few ways to do this and we can use some of the suggestions and principles applied in this patch there as well. I guess I'm really making a plug here for folks to look at extending the stuff for 3445 (once it is ready) in a similar way, so that the entire scheduling flow can be made extensible at different steps. Once again, I don't think merging the 3445 work with this patch should be very hard, especially with the latest patch. I'll take a look at that soon. Thanks, Brice, for your offer to help. I'm sure you can help us out there. And of course, I'm glad you took this whole effort up and pushed it all the way to where it is. Nice work.
Tom, the new patch looks good. The fake container for unit testing will be very helpful.
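One possible reading of the "discrete, overridable units" idea is a scheduler assembled from independent stages. The sketch below is purely illustrative: StagedScheduler and its type parameters are hypothetical, and the map-versus-reduce decision is left out to keep it short.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical scheduler built from three replaceable stages:
// pick a queue, then a job within it, then a task within that job.
class StagedScheduler<Q, J, T> {
    private final Function<List<Q>, Optional<Q>> pickQueue;
    private final Function<Q, Optional<J>> pickJob;
    private final Function<J, Optional<T>> pickTask;

    StagedScheduler(Function<List<Q>, Optional<Q>> pickQueue,
                    Function<Q, Optional<J>> pickJob,
                    Function<J, Optional<T>> pickTask) {
        this.pickQueue = pickQueue;
        this.pickJob = pickJob;
        this.pickTask = pickTask;
    }

    // Runs the stages in order; an empty result at any stage means "nothing to assign".
    Optional<T> assign(List<Q> queues) {
        return pickQueue.apply(queues).flatMap(pickJob).flatMap(pickTask);
    }
}
```

Someone who only wants to change how a queue is chosen would swap the first function and leave the other two untouched.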
Vivek, regarding job persistence - I think the addJob and removeJob methods in the TaskScheduler are only meant to be "listener" methods to notify it that a job should be considered for scheduling. The JobTracker still keeps a list of jobs in the jobs variable, so it is the ultimate "owner" of the job list. Thus it should be possible to persist the jobs in the JobTracker or JobQueueManager or some other class and just add/remove them from the scheduler when they become schedulable. I've submitted a JIRA about a fair scheduler now -
This new version has far better tests than my hacks and is much less intrusive (with the drawbacks mentioned before).
Thanks Tom for writing this ^^ My two cents :
@Vivek Thanks, happy if I helped :-D Contributing to Hadoop is very instructive (and fun too :-P) @Matei:
>> Vivek, regarding job persistence - I think the addJob and removeJob methods in the TaskScheduler are only meant to be "listener" methods to notify it that a job should be considered for scheduling. The JobTracker still keeps a list of jobs in the jobs variable, so it is the ultimate "owner" of the job list. Thus it should be possible to persist the jobs in the JobTracker or JobQueueManager or some other class and just add/remove them from the scheduler when they become schedulable. My point was, the Scheduler should get whatever jobs it needs to consider from someone who manages jobs. It shouldn't maintain a separate list. Suppose you have one Scheduler that wants to look at all submitted jobs before deciding which task is best. Suppose you have another that only wants to look at the job with the highest priority and pick a task from it. In the first case, the caller (JT) needs to invoke the Scheduler's addJob() method for every job. In the second case, it needs to invoke the Scheduler's addJob() method only for one job. This is not good. The caller's code should be the same regardless of which scheduler is used behind the scenes. What should really happen is that when the first scheduler is called, it looks at all the jobs by fetching them from a JobManager or whatever class it is that handles jobs. The second scheduler will call the Jobmanager in a different way. The scheduler user's code is not affected. You shouldn't tell the Scheduler what jobs to consider - that decision is part of the Scheduler's internals. A new patch with a few small changes.
> I think the addJob and removeJob methods in the TaskScheduler are only meant to be "listener" methods to notify it that a job should be considered for scheduling. To emphasise this I have renamed these methods to be consistent with the "listener style" often found in Java: jobAdded and jobRemoved. I've also added a jobUpdated method since the job is actually being updated rather than added then removed. > Maybe the TaskTrackerManager interface could be package-private, since it is not meant to be used by end-users (if I understood well). I didn't make it package-private as I wanted implementors of TaskScheduler (which shouldn't have to be in the same package) to be able to use it. However, I've just noticed that Task, JobInProgress and TaskTrackerStatus are package-private, so I have made TaskTrackerManager the same. I suggest we figure out how to allow TaskScheduler implementations to be in other packages as a separate issue, where we consider how much of Task, JobInProgress and TaskTrackerStatus we need to make public. This can be done in parallel with > I suggest that subclasses could specify a Set<JobInProgress> I agree - I've done this. I've also fixed up some badly named TaskTrackerManager instances which still had the word "Container" in them. I wanted to add some more detail to my previous comment, and also address Brice's last comment on implementing queues.
I earlier talked of a JobQueueManager, which is responsible for maintaining the collection of jobs submitted to a JT. This class would deal with how jobs are stored in memory or on disk. It really would encapsulate the jobsByPriority and jobs (perhaps) data structures, which are currently in JobTracker. When a job is submitted, i.e., when JobTracker.submitJob() is called, the JobInProgress object representing the new job would be given to JobQueueManager to maintain. addJob() and removeJob(), or their equivalent, would be part of JobQueueManager. Whatever class derives from TaskScheduler would invoke methods in JobQueueManager to figure out what jobs to look at. It's a bit tricky as to what JobQueueManager should look like. HADOOP-3421 brings in the concept of queues, and jobs being submitted to queues. So you will have multiple queues, each queue containing jobs, and perhaps supporting priorities and limits and such. Should JobQueueManager have an explicit notion of queue names, so that, for example, you could get sorted jobs from a single queue? Or maybe you get sorted jobs from a collection of queues. For example, you could have a method in JobQueueManager as follows: Collection<JobInProgress> getJobs(String queueName, Comparator<JobInProgress> jobComparator) I don't want to get into too many details here. These probably belong more on 3445. My point is, when designing JobQueueManager, or when modelling the right interface for JobQueue, as Brice was talking about, you need to keep in mind that there could be multiple queues of jobs, and sometimes you may want to get jobs from a single queue, and sometimes from multiple queues. Again, why does this matter?
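As a rough illustration of the getJobs(queueName, comparator) idea above, here is a minimal sketch. SketchJob and JobQueueManagerSketch are hypothetical stand-ins and not classes from any patch on this issue; the int priority field and the getJobsFromQueues name are assumptions made only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for JobInProgress; only what the sketch needs.
class SketchJob {
  final String id;
  final String queueName;
  final int priority; // assumption: lower value means more urgent

  SketchJob(String id, String queueName, int priority) {
    this.id = id;
    this.queueName = queueName;
    this.priority = priority;
  }
}

// Hypothetical manager that owns the job collection; schedulers only query it.
class JobQueueManagerSketch {
  private final Map<String, List<SketchJob>> jobsByQueue = new HashMap<>();

  synchronized void addJob(SketchJob job) {
    jobsByQueue.computeIfAbsent(job.queueName, q -> new ArrayList<>()).add(job);
  }

  synchronized void removeJob(SketchJob job) {
    List<SketchJob> jobs = jobsByQueue.get(job.queueName);
    if (jobs != null) {
      jobs.remove(job);
    }
  }

  // Jobs from a single queue, ordered the way the calling scheduler asks for.
  synchronized List<SketchJob> getJobs(String queueName, Comparator<SketchJob> order) {
    return getJobsFromQueues(Collections.singletonList(queueName), order);
  }

  // Jobs merged from several queues, then sorted with the caller's comparator.
  synchronized List<SketchJob> getJobsFromQueues(Collection<String> queueNames,
                                                 Comparator<SketchJob> order) {
    List<SketchJob> result = new ArrayList<>();
    for (String name : queueNames) {
      List<SketchJob> jobs = jobsByQueue.get(name);
      if (jobs != null) {
        result.addAll(jobs);
      }
    }
    result.sort(order);
    return result;
  }
}
```

With this shape, a FIFO-style scheduler and a priority-style scheduler would differ only in the comparator they pass, and the caller that submits jobs never needs to know which scheduler is in use.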
Yet another alternative is to leave TaskScheduler as is, wait till we have a patch for 3445, then look at if/how we design JobQueueManager. It may end up changing TaskScheduler by moving addJob() and removeJob(), but at least you'd have a much better idea of what you want in JobQueueManager. I personally prefer this approach, but keep in mind, it may end up changing TaskScheduler. Tom - I ran into a few issues applying your 9.1 patch, which I've fixed in a new patch (9.2):
@Vivek - I'm still not sure I understand why jobAdded and jobRemoved should not be in the TaskScheduler. It's true that persistence of jobs should be managed by the JobQueueManager, but these methods are meant to be "listeners", to let the scheduler update its internal data structures when jobs are added or removed. Without these methods, the scheduler would have to scan the list of jobs in the JobQueueManager every time it needs to make a scheduling decision to see whether it has changed from last time. There's no serious problem with having to do this, but it seems a little clunky. Or is there a problem with the TaskScheduler even being able to access the full list of runnable jobs?
Java 6 is required now. See
Yes, I mentioned this above. I'm not comfortable with just making all of these classes public without thinking through the interfaces, since we have to maintain these public interfaces, and be careful (and backwards compatible) with evolution. So I suggest we keep them package-private for the first release, and figure out how to open them up later. I'd be interested to hear others' thoughts about this.
Brice/Tom, this is looking good. A couple of brief comments - pardon me jumping in late, I've only recently started looking at this and other related jiras.
1. Let me get the minor one out of the way first: can we call the main scheduler interface 'Scheduler' rather than 'TaskScheduler'? It might be confusing vis-a-vis JobScheduler.
Ok, the serious stuff:
2. I propose we update TaskScheduler.assignTask to reflect that a TaskTracker might have multiple slots free (
public List<Task> assignTasks(TaskTrackerId taskTracker);
Oh, this might be a good time to introduce a notion of TaskTrackerId similar to what Enis did for Job/Task/TaskAttempt (
3. The one major comment I had is to help quickly resolve the gap between this and
public void jobAdded(QueueId, Job);
public void jobRemoved(QueueId, Job);
public void jobUpdated(QueueId, Job);
With this, it will pave the way for
4. Also, we'd need some query capabilities given the notion of Queues:
List<Queue> getQueues();
List<Job> getJobs(QueueId, State); // State is RUNNING/PENDING/COMPLETE
I'm inclined to go with Tom on keeping these interfaces package-private for the first release, but I do realise Matei and others might be eager to run with it right away!
Thanks for your comments, Arun. Regarding queues, why not have the queue associated with the job (as a jobconf parameter, accessed through a method job.getQueueId()) rather than passing it along separately? Also, the scheduler does not need to keep track of which jobs are in which queue; this can be done by the JobTracker to begin with and the JobQueueManager later on. I'm just thinking that the more baggage there is in the Scheduler interface, the more trouble other implementers will have to go to, and the more temptation there will be to add more parameters and methods in the future, which would break schedulers. Really the only functional thing the scheduler does is to assign tasks.
My idea was to explicitly introduce the notion of the Queue right-away into the Scheduler, making it a first-class citizen - we need to discuss whether this is the right call. Given that both I agree that most schedulers will probably use queues. However, I think that if the queue is a first-class concept, we should make it a property of the JobInProgress rather than a separate parameter, the same way we do with priority, user, etc. This is more consistent and also sets an example for people who want to introduce other per-job scheduling parameters. (In fact I imagine that most people will put per-job parameters as properties in each job's jobconf, so they don't have to modify any Hadoop code to run their scheduler). It's mostly a software design point - if two objects will be associated together, this should be represented as membership.
I agree with Matei. Queues are associated with jobs and are maintained and exposed by the JobInProgress object (Arun, see the patch for 3445 - this is exactly what we've done). I also agree with Matei that you want to keep the Scheduler interface as simple as possible and really just have it assign tasks. In fact, to reiterate, I think the design for this Jira has come out quite well and there is no additional need, at least none that I can see yet, to make things easier for 3445. Once this patch is finalized/committed, we'll change 3445 to fit in with this structure.
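To make the "queue as a property of the job" point concrete, here is a minimal sketch. QueueAwareJob and QueueCountingScheduler are hypothetical stand-ins, and the property key "sketch.job.queue.name" and the "default" fallback are assumptions for illustration, not actual Hadoop configuration keys.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for JobInProgress, backed by jobconf-style properties.
class QueueAwareJob {
  // Assumed property key and default; not real Hadoop configuration names.
  static final String QUEUE_PROPERTY = "sketch.job.queue.name";
  static final String DEFAULT_QUEUE = "default";

  private final Map<String, String> conf;

  QueueAwareJob(Map<String, String> conf) {
    this.conf = new HashMap<>(conf);
  }

  // The queue travels with the job, like priority or user would.
  String getQueueName() {
    return conf.getOrDefault(QUEUE_PROPERTY, DEFAULT_QUEUE);
  }
}

// Listener-style methods take only the job; there is no separate queue parameter.
class QueueCountingScheduler {
  private final Map<String, Integer> jobsPerQueue = new HashMap<>();

  void jobAdded(QueueAwareJob job) {
    jobsPerQueue.merge(job.getQueueName(), 1, Integer::sum);
  }

  void jobRemoved(QueueAwareJob job) {
    // Drop the entry entirely once the last job of a queue is removed.
    jobsPerQueue.computeIfPresent(job.getQueueName(), (q, n) -> n > 1 ? n - 1 : null);
  }

  int jobsIn(String queue) {
    return jobsPerQueue.getOrDefault(queue, 0);
  }
}
```

A scheduler that cares about some other per-job parameter could read it from the same properties in the same way, without any change to the listener method signatures.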
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12385892/JobScheduler-v9.2.patch against trunk revision 676069. +1 @author. The patch does not contain any @author tags. -1 tests included. The patch doesn't appear to include any new or modified tests. -1 javadoc. The javadoc tool appears to have generated 1 warning messages. -1 javac. The applied patch generated 521 javac compiler warnings (more than the trunk's current 520 warnings). +1 findbugs. The patch does not introduce any new Findbugs warnings. +1 release audit. The applied patch does not increase the total number of release audit warnings. +1 core tests. The patch passed core unit tests. +1 contrib tests. The patch passed contrib unit tests. Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2844/testReport/ This message is automatically generated. @Matei:
After some thought, I'm unable to convincingly argue, even to myself, for the removal of the methods from TaskScheduler. My concern really was with state. On one hand, I see the Scheduler as a stateless algorithm. The information it needs about jobs, when it runs, it gets from some other class. I was worried about any class that extends TaskScheduler having to maintain its own data structures for jobs, while a class like JobQueueManager is also maintaining (similar?) structures. On the other hand, I see your point too - for efficiency, a scheduler may want to know about what's changed since it ran last, rather than look at the entire set of jobs each time. A scheduler can certainly cache what information it needs (and maybe even support listener methods as you've suggested) if performance becomes an issue, but there is a state that it imposes on the system - it orders jobs a certain way (one scheduler may order jobs in FIFO order, another may choose a different ordering) - and perhaps this state is inherent to the scheduler. Like I said, I can't see a very strong reason for removing the job methods from TaskScheduler, so let's leave them there. One comment from Arun was missed possibly.
public List<Task> assignTasks(TaskTrackerId taskTracker);
I think this is good to have in the scheduler. It is easy to default the implementation to return only 1 task as it does today. But it leaves the options open when we get to
Other than that, +1 for leaving queues out of the interface, and leaving the job state change methods in the interface.
Side note: in the current implementation of getNewTaskForTaskTracker(), we use a MIN_CLUSTER_SIZE_FOR_PADDING to give space for speculative and failed tasks. In the patch, with a default value of the MAX_TASKS_PER_JOB_PROPERTY (which is Long.MAX_VALUE), JobQueueTaskScheduler seems to not have this notion of padding. Is this not required, if we desire that, out of the box, the scheduling works as it does in the current JobTracker?
HADOOP-3343 was opened to address this.
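The suggestion above, that assignTasks can default to returning a single task while leaving room for multi-slot assignment, might look roughly like this. All class names here (SketchTask, SketchTracker, MultiAssignScheduler, FifoSketchScheduler, FillSlotsScheduler) are hypothetical and simplified; the real Task and TaskTrackerStatus types carry far more state.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-ins for Task and the tracker status.
class SketchTask {
  final String name;
  SketchTask(String name) { this.name = name; }
}

class SketchTracker {
  final int freeSlots;
  SketchTracker(int freeSlots) { this.freeSlots = freeSlots; }
}

abstract class MultiAssignScheduler {
  // Single-task decision; returns null when there is nothing to run.
  protected abstract SketchTask assignTask(SketchTracker tracker);

  // Default: hand out at most one task per call, wrapped in a list.
  List<SketchTask> assignTasks(SketchTracker tracker) {
    SketchTask task = assignTask(tracker);
    if (task == null) {
      return Collections.emptyList();
    }
    return Collections.singletonList(task);
  }
}

// Simple FIFO scheduler that inherits the one-task default.
class FifoSketchScheduler extends MultiAssignScheduler {
  private final Deque<SketchTask> pending = new ArrayDeque<>();

  void addTask(SketchTask task) { pending.addLast(task); }

  @Override
  protected SketchTask assignTask(SketchTracker tracker) {
    return pending.pollFirst();
  }
}

// Variant that overrides the default to fill every free slot it is told about.
class FillSlotsScheduler extends FifoSketchScheduler {
  @Override
  List<SketchTask> assignTasks(SketchTracker tracker) {
    List<SketchTask> assigned = new ArrayList<>();
    while (assigned.size() < tracker.freeSlots) {
      SketchTask task = assignTask(tracker);
      if (task == null) {
        break;
      }
      assigned.add(task);
    }
    return assigned;
  }
}
```

Callers always deal with a list, so switching from the one-task default to a slot-filling scheduler does not change the calling code.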
Makes sense.
Regarding the padding, I am thinking that it makes sense to not have it at all. In general, we must always execute failed tasks first. This will ensure that jobs with deterministic task failures fail really early in the game. But this is more of a change in the JobInProgress class. Regarding speculative tasks, obtainNewMapTask/obtainNewReduceTask could return a speculative task if they so desire. And this logic had better be in the JobInProgress class, which has all the tasks' information.
A new patch (v10) with the following changes:
I think that:
To accomplish that:
class JobDescription {
  JobID getJobID();
  String getQueueName();
  void setQueueName(String queue);
  Priority getPriority();
  void setPriority(Priority priority);
  Path getJobConfPath();
}
class TaskScheduler {
  void addJob(JobDescription job) throws IOException;
  void removeJob(JobDescription job) throws IOException;
  void updateJob(JobDescription job) throws IOException;
  List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException;
  List<String> getQueueNames();
  List<JobDescription> getJobs(String queue) throws IOException;
}
Then the only data structure holding jobs is in the scheduler and doing queries can be done through this api. Thoughts? This is a good idea, but there needs to be a way to tell the scheduler
"load and start this job" and to get a JobInProgress from the JobDescription if the job has been loaded. Otherwise there will be no way to find a Task to return in assignTask(). Other useful fields in the JobInProgress include number of maps/reduces desired, number of maps/reduces running, and the list of running tasks in the job (which can be used for deciding when to do speculative execution).
Isn't there a far easier way to do this? A JobInProgress object grows in size when you call JobInProgress.initTasks(). The current JT code calls initTasks() when a job is submitted to the JT (the newly created JobInProgress object is actually put in a queue and some other thread calls initTasks(), but it's really called as early as possible). A simple fix to this is to call initTasks() only when a job is considered for running by the scheduler. That way, you initialize a JobInProgress object only when needed. Otherwise, its memory footprint is low. It's even worth arguing that a newly created JobInProgress object should look a lot like what JobDescription looks like. It's only when you 'initialize' it, at the point when the (first task in the) job can be considered for running, that you need to expand all the other data structures. This seems, IMO, to be a better way to handle scale than having another class. To be fair, JobDescription is really what the scheduler should be looking at, but it makes more sense if the Scheduler is a separate component/process. Otherwise, you're duplicating state in JobDescription and JobInProgress. You could also refactor JobInProgress so that it has a JobDescription member variable which it exposes, rather than exposing separate methods for getting/setting priority or queue names, but there doesn't seem to be an advantage to it, other than conceptually encapsulating information that a Scheduler might need in one class. As to whether queue names need to be part of TaskScheduler: we have two options here.
Do we want the Scheduler to serve queries? In the future, you may well want to think of the Scheduler as just an algorithm that, given the state of the system, only decides what task to give to a TT. Web serving may be done through a completely different component. I really think some other component besides the Scheduler needs to be responsible for storing jobs and maintaining data structures that associate the job with queues and deal with job persistence - everything to do with keeping track of jobs & queues in memory. Different schedulers impose different filters/sorting on these structures - they're really just algorithms that access these data structures. Schedulers may keep other data structures for their use. For example, in You may actually want two separate interfaces - one for Scheduling (which will be similar to what TaskScheduler exposes) and one for iterating through jobs and queues. For performance sake, you may have the same class implement both, but they are two separate interfaces. -1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12385980/JobScheduler-v10.patch against trunk revision 676772. +1 @author. The patch does not contain any @author tags. +1 tests included. The patch appears to include 6 new or modified tests. +1 javadoc. The javadoc tool did not generate any warning messages. +1 javac. The applied patch does not increase the total number of javac compiler warnings. -1 findbugs. The patch appears to introduce 1 new Findbugs warnings. +1 release audit. The applied patch does not increase the total number of release audit warnings. +1 core tests. The patch passed core unit tests. +1 contrib tests. The patch passed contrib unit tests. Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2859/testReport/ This message is automatically generated. Fair point about the JobInProgress being fine for the API, provided that the scheduler is required to call initTasks on the JobInProgress when it should be loaded.
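The lazy-initialization idea discussed above, where initTasks() is called only when the scheduler first considers a job, can be sketched as follows. LazyJob and LazyInitScheduler are hypothetical stand-ins; the task naming and the single list of pending tasks are simplifications assumed for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for JobInProgress: cheap to create, larger once initialized.
class LazyJob {
  final String id;
  final int numMaps;
  private List<String> pendingTasks; // stays null until initTasks() runs

  LazyJob(String id, int numMaps) {
    this.id = id;
    this.numMaps = numMaps;
  }

  synchronized boolean isInitialized() {
    return pendingTasks != null;
  }

  // Idempotent: builds the per-task structures only on the first call.
  synchronized void initTasks() {
    if (pendingTasks != null) {
      return;
    }
    pendingTasks = new ArrayList<>();
    for (int i = 0; i < numMaps; i++) {
      pendingTasks.add(id + "_m_" + i);
    }
  }

  // Next pending task name, or null if none is left or the job is not initialized.
  synchronized String obtainNewTask() {
    if (pendingTasks == null || pendingTasks.isEmpty()) {
      return null;
    }
    return pendingTasks.remove(0);
  }
}

class LazyInitScheduler {
  private final List<LazyJob> jobs = new ArrayList<>();

  // Submission only records the job; nothing is expanded here.
  void jobAdded(LazyJob job) {
    jobs.add(job);
  }

  // A job is initialized the first time the scheduler actually considers it.
  String assignTask() {
    for (LazyJob job : jobs) {
      job.initTasks();
      String task = job.obtainNewTask();
      if (task != null) {
        return task;
      }
    }
    return null;
  }
}
```

Jobs that sit far back in the list keep their small footprint until the scheduler reaches them, which is the scaling benefit argued for above.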
On the other hand, we need
So how does this look:
class JobInProgress {
  ...
  String getQueueName();
  Priority getPriority();
  void initTasks();
}
class TaskScheduler {
  void addJob(JobInProgress job) throws IOException;
  void removeJob(JobInProgress job) throws IOException;
  // the job has changed state
  void updateJob(JobInProgress job) throws IOException;
  // the task (i.e. map 0) has changed state
  void updateTask(TaskInProgress tip) throws IOException;
  // get a set of tasks for the given tracker
  List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException;
  // get all of the queue names
  List<String> getQueueNames();
  // get an ordered list of the jobs in the given queue
  List<JobDescription> getJobs(String queue) throws IOException;
}
I don't think we need updateTaskAttempt, because I can't see anything that a potential scheduler would do with information that fine-grained. I think that you are right that we want the JobTracker to keep track of the running jobs and tasks, effectively owning the JobInProgress, TaskInProgress, and Tasks and updating their state based on the task tracker reports. In the medium term (i.e. not this patch), this should only be for running and pending jobs. Any finished jobs will need to be queried via JobHistory or a similar interface.
What other pieces of code need to use order within a queue except the scheduler? In fact, I think "order within a queue" doesn't even make sense for all schedulers. Some schedulers, such as the fair scheduler in
Similarly, I don't understand what getQueueNames() would be used for by pieces of code external to the scheduler. In fact, if the list of jobs is stored in the JobTracker (which I think we've said it should be), then the JobTracker has all the information it needs to figure out the queue names - it can scan through all jobs and get the queue name of each one from its getQueueName(). Basically the scheduler should be a box whose only "output" is to assign a task, and whose only "input" is whatever state of the world it needs to know about in order to do its job (i.e. events for jobs being modified, etc). That makes it possible to separate the code that persists jobs from the code that schedules them.
The users of getQueueNames() and getJobs(queue) would be the web ui and the command line tools "bin/hadoop job". Of course the ordering doesn't have to be exclusive, but almost all schedulers will have a rough priority within the queue. It is basically about providing information about the scheduler to the user. Does that make sense?
I see, I guess it's an issue of increasing API complexity vs benefit of seeing jobs in sorted order then. Personally I'd prefer if the API that the scheduler had to implement was as simple as possible. I think there are two solutions to the problem you're mentioning with providing info about the scheduler to the user:
It is actually more than that. If the scheduler does not provide queues, it would be wrong to display the jobs as if they had queues. Note that any scheduler that does not want to deal with queues, only has to return {"default"} for the queue. And return all of the jobs for getJobs. The interface would be much more brittle, if you had custom servlets for each scheduler, because the interface there would be very broad. Note that you would also need a text version for the cli. I think the two query methods would be much less brittle. For this issue, which is about moving scheduling logic from the JobTracker to a scheduler class, I think we can leave out queues. We don't currently have the explicit concept of a queue, so I think it makes sense to commit this change, and continue the discussion about adding queues in
The implication of this is that the Scheduler takes over the responsibility of managing the jobInitQueue. I've created a patch which does this (v11) by inserting an EagerTaskInitializationTaskScheduler into the TaskScheduler hierarchy. In doing so I needed a couple of lifecycle methods, which I've named following HADOOP-3628, so TaskScheduler can be retrofitted to extend Service after HADOOP-3628 is committed. Does this look OK?
Would the taskUpdated method be called by JobTracker#updateTaskStatuses? I can see that it might be useful for schedulers to have this information, but perhaps this is something to add to the interface when a use case comes up? (TaskScheduler is an abstract class, so it's easy to add new methods to it.)
I'm ok with committing this as-is; as long as Matei is fine with changes to TaskScheduler which might arise due to discussions in other jiras... Matei?
It's fine to move this discussion elsewhere, but I think it should be in a new Jira.
Just like a scheduler is a listener to jobs being added/deleted, and their states being modified, it should also be a listener to task states being modified. The 3445 scheduler needs this. For example, it keeps track of how many tasks of a user are running (to handle user limits). So it needs to know when a task starts running or when it completes. It can compute this by iterating through all jobs, but being a listener to a task status change is convenient. I'm not sure where exactly TaskScheduler.updateTask() will be called, but task status changes and job status changes both seem to be needed. -1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12386181/JobScheduler-v11.patch against trunk revision 677470. +1 @author. The patch does not contain any @author tags. +1 tests included. The patch appears to include 6 new or modified tests. +1 javadoc. The javadoc tool did not generate any warning messages. +1 javac. The applied patch does not increase the total number of javac compiler warnings. -1 findbugs. The patch appears to introduce 1 new Findbugs warnings. -1 release audit. The applied patch generated 210 release audit warnings (more than the trunk's current 209 warnings). +1 core tests. The patch passed core unit tests. +1 contrib tests. The patch passed contrib unit tests. Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2884/testReport/ This message is automatically generated. I realize we're trying to get this patch committed quickly, but having looked at the latest patch, I do have a concern. As per patch v11, we have the following class hierarchy: TaskScheduler --> EagerTaskInitializationTaskScheduler --> JobQueueTaskScheduler --> LimitTasksPerJobTaskScheduler. EagerTaskInitializationTaskScheduler provides a mechanism for initializing multiple JobInProgress objects asynchronously (through a separate thread). JobQueueTaskScheduler is your basic scheduler, and LimitTasksPerJobTaskScheduler is a scheduler that limits concurrent tasks per job. My concern is that this kind of class hierarchy is the wrong way to allow people to build their own schedulers. What you really want is a library, or a set of separate classes, that provide individual functionality: one for limiting concurrent tasks per job, one for initializing jobs in a separate thread, and so on. Then, somebody can build a scheduler by picking the various functionality they want and composing the classes that provide this functionality. Inheritance does not let you do that.
Here's an example (based on 3445). I want to build a scheduler that limits concurrent tasks per job, but does not want to initialize jobs in a separate thread (it wants to initialize individual jobs directly, only when required, in order to scale). What do I do? I don't want to extend LimitTasksPerJobTaskScheduler because then I get the functionality of EagerTaskInitializationTaskScheduler, which I don't want. What if my scheduler also supports some sort of fair share (give equal time slots to each user's jobs), and it also supports user limits (limit the total number of tasks associated with a user)? Do I still extend LimitTasksPerJobTaskScheduler? In one class? Or do I define a further hierarchy: LimitTasksPerJobTaskScheduler --> FairShareScheduler --> LimitTasksPerUserScheduler? Which class extends which other class? You really want people to build schedulers by composing lots of individual functionality because scheduling incorporates lots of individual algorithms, each possibly very different from another. I think you really want any class that extends TaskScheduler to be a complete scheduler in itself, made up of lots of different functionalities (the 3445 scheduler, for example, provides task limits per user AND capacities AND priorities AND some preemption, each of which may be reused by some scheduler). Because you want to allow different schedulers to share functionality (for example, two different schedulers may want to limit tasks per job, but may also support differing features, so they should ideally share code that limits tasks per job), you want this functionality to be available as composable objects, or in a separate library. You don't want hierarchies based on inheritance. I don't quite know what these composable objects look like. Perhaps you define an interface which takes in tasks and decides if those tasks pass or fail the policy that the object is implementing. That should be our discussion.
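One possible shape for the "interface which takes in tasks and decides if those tasks pass or fail the policy" mentioned above is sketched below. This is only an illustration of composition over inheritance; Candidate, SchedulingPolicy, LimitTasksPerJob, LimitTasksPerUser and ComposedScheduler are all hypothetical names and do not correspond to classes in any patch here.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical snapshot of a task the scheduler is thinking of launching.
class Candidate {
  final String jobId;
  final String user;
  final int runningForJob;  // tasks of this job already running
  final int runningForUser; // tasks of this user already running

  Candidate(String jobId, String user, int runningForJob, int runningForUser) {
    this.jobId = jobId;
    this.user = user;
    this.runningForJob = runningForJob;
    this.runningForUser = runningForUser;
  }
}

// One policy = one yes/no decision about a candidate.
interface SchedulingPolicy {
  boolean allows(Candidate candidate);
}

class LimitTasksPerJob implements SchedulingPolicy {
  private final int max;
  LimitTasksPerJob(int max) { this.max = max; }

  @Override
  public boolean allows(Candidate candidate) {
    return candidate.runningForJob < max;
  }
}

class LimitTasksPerUser implements SchedulingPolicy {
  private final int max;
  LimitTasksPerUser(int max) { this.max = max; }

  @Override
  public boolean allows(Candidate candidate) {
    return candidate.runningForUser < max;
  }
}

// A complete scheduler assembled from whichever policies it wants.
class ComposedScheduler {
  private final List<SchedulingPolicy> policies;

  ComposedScheduler(SchedulingPolicy... policies) {
    this.policies = Arrays.asList(policies);
  }

  // Returns the first candidate (in the given order) that every policy accepts, or null.
  Candidate pick(List<Candidate> candidatesInOrder) {
    for (Candidate candidate : candidatesInOrder) {
      if (passesAll(candidate)) {
        return candidate;
      }
    }
    return null;
  }

  private boolean passesAll(Candidate candidate) {
    for (SchedulingPolicy policy : policies) {
      if (!policy.allows(candidate)) {
        return false;
      }
    }
    return true;
  }
}
```

Two schedulers can then share LimitTasksPerJob without either one extending the other, which is the reuse that a single inheritance chain makes awkward.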
Having class hierarchies, as we do today, will severely limit extensibility, IMO.
Vivek, you're quite right. I think the hierarchy is a symptom of the abstract base class, TaskScheduler. It's mixing two concerns: listening to job state changes, and scheduling.
To fix this I propose that we break out the listener methods from the TaskScheduler into a JobInProgressListener interface:
interface JobInProgressListener {
  void jobAdded(JobInProgress job);
  void jobRemoved(JobInProgress job);
  void jobUpdated(JobInProgress job);
}
abstract class TaskScheduler {
  public void start() throws IOException {}
  public void terminate() throws IOException {}
  public abstract List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException;
}
TaskSchedulers can then use a choice of JobInProgressListener implementations. For example, JobQueueTaskScheduler has a JobQueueJobInProgressListener to maintain its job queue and an EagerTaskInitializationListener to do task initialization. TaskSchedulers register their listeners with the JobTracker, so we add the following two methods to JobTracker (and the TaskTrackerManager interface):
public void addJobInProgressListener(JobInProgressListener listener);
public void removeJobInProgressListener(JobInProgressListener listener);
In the future we might add a TaskInProgressListener interface that allowed TaskSchedulers to listen for changes to tasks.
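The registration flow described above can be sketched end to end as follows. The Sketch-prefixed classes are hypothetical, heavily simplified stand-ins for JobInProgress, JobInProgressListener, JobTracker and a TaskScheduler; submitJob and the use of CopyOnWriteArrayList are assumptions for this example, not details taken from the patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for JobInProgress.
class SketchJobInProgress {
  final String id;
  SketchJobInProgress(String id) { this.id = id; }
}

// Simplified listener with the three callbacks named in the proposal.
interface SketchJobListener {
  void jobAdded(SketchJobInProgress job);
  void jobRemoved(SketchJobInProgress job);
  void jobUpdated(SketchJobInProgress job);
}

// Stand-in for the JobTracker side: keeps listeners and notifies them.
class SketchJobTracker {
  private final List<SketchJobListener> listeners = new CopyOnWriteArrayList<>();

  void addJobInProgressListener(SketchJobListener listener) { listeners.add(listener); }
  void removeJobInProgressListener(SketchJobListener listener) { listeners.remove(listener); }

  void submitJob(SketchJobInProgress job) {
    for (SketchJobListener listener : listeners) {
      listener.jobAdded(job);
    }
  }
}

// Listener that maintains a FIFO job queue on behalf of a scheduler.
class SketchQueueListener implements SketchJobListener {
  final List<SketchJobInProgress> queue = new ArrayList<>();

  @Override public void jobAdded(SketchJobInProgress job) { queue.add(job); }
  @Override public void jobRemoved(SketchJobInProgress job) { queue.remove(job); }
  @Override public void jobUpdated(SketchJobInProgress job) { /* FIFO order is unaffected */ }
}

// Scheduler that registers its listener in start() and unregisters in terminate().
class SketchScheduler {
  private final SketchJobTracker tracker;
  private final SketchQueueListener queueListener = new SketchQueueListener();

  SketchScheduler(SketchJobTracker tracker) { this.tracker = tracker; }

  void start() { tracker.addJobInProgressListener(queueListener); }
  void terminate() { tracker.removeJobInProgressListener(queueListener); }

  int pendingJobs() { return queueListener.queue.size(); }

  // Returns the id of the job at the head of the queue, or null if the queue is empty.
  String headJobId() {
    return queueListener.queue.isEmpty() ? null : queueListener.queue.get(0).id;
  }
}
```

A scheduler with a different initialization policy would register a different listener in start() instead of subclassing, which is the separation of concerns the proposal is after.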
To do this with the proposed changes you would override LimitTasksPerJobTaskScheduler#start so it doesn't start the EagerTaskInitializationListener, but instead creates a listener to initialize tasks according to its own policy. I've attached a new patch which implements the above (v12). Tom, nice work. It certainly makes the hierarchy cleaner. It also cleanly separates out the differences between listening for changes and supporting a scheduler API.
Another of my concerns (being able to share code/functionality among schedulers) is still not resolved, but it may be a nice-to-have feature, and will probably have a solution different from what we're discussing here, so perhaps that discussion can be on a separate Jira. To reiterate that concern: suppose I want a scheduler that limits tasks per job (so I'd like to reuse code from LimitTasksPerJobTaskScheduler, including the code that deals with configuration). Suppose I also want my scheduler to implement some fair-share functionality that some class, FairShareScheduler has defined (this could be a class similar to that proposed in My predictable comment: JobInProgressListener might better be an abstract class, so that its interface can evolve without breaking existing implementations.
+1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12386304/JobScheduler-v12.patch against trunk revision 677712. +1 @author. The patch does not contain any @author tags. +1 tests included. The patch appears to include 6 new or modified tests. +1 javadoc. The javadoc tool did not generate any warning messages. +1 javac. The applied patch does not increase the total number of javac compiler warnings. +1 findbugs. The patch does not introduce any new Findbugs warnings. +1 release audit. The applied patch does not increase the total number of release audit warnings. +1 core tests. The patch passed core unit tests. +1 contrib tests. The patch passed contrib unit tests. Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2893/testReport/ This message is automatically generated.
Yes, don't know why I didn't do that - perhaps I'm used to listeners being interfaces. Here's a patch with the change. +1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12386376/JobScheduler-v12.1.patch against trunk revision 677872. +1 @author. The patch does not contain any @author tags. +1 tests included. The patch appears to include 6 new or modified tests. +1 javadoc. The javadoc tool did not generate any warning messages. +1 javac. The applied patch does not increase the total number of javac compiler warnings. +1 findbugs. The patch does not introduce any new Findbugs warnings. +1 release audit. The applied patch does not increase the total number of release audit warnings. +1 core tests. The patch passed core unit tests. +1 contrib tests. The patch passed contrib unit tests. Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2901/testReport/ This message is automatically generated. I just committed this. Thanks, Tom and Brice!
I don't know what the protocol is for commenting on an issue that has been committed, and I don't mean to suggest un-committing the patch, but I just wanted to bring up a potential problem with making JobInProgressListener an abstract class, rather than an interface. I see Doug's point about it being perhaps easier to modify an abstract base rather than an interface. But, a scheduler, which extends TaskScheduler, may very easily want to also implement the JobInProgressListener methods. It may want to be in complete charge of the data structures to store jobs. In other words, a Scheduler may also be a listener to changes to JobInProgress objects. Is there an easy way to do that if JobInProgressListener is an abstract class?
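One way a scheduler could still own its job data structures when both base types are abstract classes is to keep the listener as an inner object that writes into the scheduler's private state. The sketch below only illustrates that workaround under simplified assumptions; AbstractJobListener, AbstractScheduler and SelfListeningScheduler are hypothetical, and job ids are plain strings here.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified stand-ins for the two abstract base classes.
abstract class AbstractJobListener {
  abstract void jobAdded(String jobId);
  abstract void jobRemoved(String jobId);
  void jobUpdated(String jobId) { /* default: ignore updates */ }
}

abstract class AbstractScheduler {
  // Returns the id of the job to run next, or null if there is none.
  abstract String assignTask();
}

// Java allows only one superclass, so the scheduler cannot extend both types.
// Instead it extends the scheduler base and exposes an inner listener that
// updates the scheduler's own private queue.
class SelfListeningScheduler extends AbstractScheduler {
  private final Deque<String> jobs = new ArrayDeque<>();

  private final AbstractJobListener listener = new AbstractJobListener() {
    @Override
    void jobAdded(String jobId) {
      jobs.addLast(jobId);
    }

    @Override
    void jobRemoved(String jobId) {
      jobs.remove(jobId);
    }
  };

  // The object to register with whatever publishes job events.
  AbstractJobListener getListener() {
    return listener;
  }

  @Override
  String assignTask() {
    return jobs.peekFirst();
  }
}
```

The cost is one extra object and an accessor; the scheduler remains in complete charge of how jobs are stored.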
I just created HADOOP-3801 to address this.
Integrated in Hadoop-trunk #581 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/581/