A) This is important because it could lead to big throughput gains and seek reductions.
B) It is not going to work to combine splits statically because block replicas are not co-resident. This would lead to a huge performance hit due to loss of locality. I think we will need to invest in more complexity to get the desired performance improvement here.

My gut is that we should do this dynamically in the task trackers. This would let us do it when we are seeing good IO throughput. The map driver could always just request a new split after each input finishes. The TT would keep a small number of candidate splits locally and decide, after each map completes a split, whether to hand it another one. None of the public interfaces would need to change. We would need to change the JT quite a bit to manage maps publishing split collections, but it seems fairly straightforward.

We could realize a huge performance gain on simple scanning jobs that process input quickly. We could also see good shuffle improvements. This would interact with speculative execution in undesirable ways... Something to watch out for.

There is a whole class of collation optimizations here. The fact that we are sorting early may make a lot of them harder... ugh.

A related idea Runping and I discussed is that if you have multiple spills in a map (combined or unchanged map), there is no point collating the spills if the reduce partitions are relatively large (say 1MB). We could just make each spill an output to the reduces. Even if they are small, it would be more efficient to collate in larger units than within a single map, but that starts really broadening the design space...

Could static split combinations work at all? Yes, I think they might if we produced only a small number and executed them early, but this would reduce the possible gain we could get.

> It is not going to work to combine splits statically because block replicas are not co-resident.
If the number of blocks in the job input is hugely greater than the number of nodes, then it should be easy to find nodes that have a large number of blocks locally, and group the blocks thusly into tasks. If a task fails, then the re-execution might not be local, but most tasks don't fail, and we can arrange things so that the first node a task is assigned to has all its blocks. Or am I missing something? Consider the following algorithm:
Wouldn't that work? Good one!
We avoid a lot of disk IO and seeks in the final merge of spills on the map side. This will be beneficial for cases where the partitions can fit in the ramfs on the reduces. We then get the merge almost for free.

Nodes operate at different rates. Failures happen. In the face of several jobs running, some nodes may not even become available in a timely manner. I think a static approach will not allow both the performance gains desired and preservation of reasonable throughput.
The current system takes full advantage of mapping jobs to nodes dynamically. A static combination of splits will break all of this. One could perhaps do something like what you suggest dynamically in the JT when a TT requests a new job. This might be a good compromise implementation. This would also let you observe some global statistics on speed of maps & size of outputs which would let you optimize cluster sizes. Of course doing this all dynamically on the TTs might use fewer JT resources.

> The current system takes full advantage of mapping jobs to nodes dynamically.
Currently we compute and cache the mapping once per job, and then base all subsequent decisions on that cache. We get ~99% job locality with that 'static' information. Things should be about the same if we group things, unless I'm missing something.

> One could perhaps do something like what you suggest dynamically in the JT when a TT requests a new job.

That's a possible enhancement. I'm not sure it's required for good localization, and it would add significant load to the namenode.

We queue each map for each candidate node (and now presumably rack) and pull them from consideration once they are scheduled on any node.
This gets much more complicated with map sets, since you will need to tag which maps in one set have been executed somewhere else and then replace them... Much simpler to make the late binding decision to bundle them. I get a feeling this issue will be revisited more than once...

> Much simpler to make the late binding decision to bundle them.
The algorithm I outlined above could be done incrementally, rather than all up-front:
This is essentially the existing algorithm, except that we allocate more than one split per task. In fact, the existing algorithm handles lots of other subtle cases like speculative execution, task failure, etc. So the best way to implement this is probably to use the existing algorithm multiple times per task, etc. Earlier I'd spoken of implementing this up front, when constructing splits. But if it's done this way, then we needn't actually change public APIs or InputFormats. Tasks could simply internally be changed to execute a list of splits rather than a single split.

I like that approach, Doug. We should also have an entry for splits that are local to no nodes in the map/reduce cluster and prefer to steal from them rather than other nodes. This would solve HADOOP-2014...
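To make the "run the existing algorithm several times per task" idea concrete, here is a minimal sketch. It is not the JobTracker's actual code: `MultiSplitAssigner`, `addSplit`, and `assignTask` are hypothetical names, splits are plain strings, and speculative execution and task failure are ignored. It assumes one queue of unassigned splits per host, a separate entry for splits that are local to no node, and removal of a split from every queue as soon as it is handed out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; none of these names are real Hadoop classes. */
public class MultiSplitAssigner {
    // host -> unassigned splits that have a replica on that host
    private final Map<String, LinkedHashSet<String>> byHost = new HashMap<>();
    // unassigned splits with no replica on any node in the cluster
    private final LinkedHashSet<String> nonLocal = new LinkedHashSet<>();
    // every split that has not yet been handed to a task
    private final LinkedHashSet<String> unassigned = new LinkedHashSet<>();

    public void addSplit(String splitId, List<String> replicaHosts) {
        unassigned.add(splitId);
        if (replicaHosts.isEmpty()) {
            nonLocal.add(splitId);
        }
        for (String host : replicaHosts) {
            byHost.computeIfAbsent(host, h -> new LinkedHashSet<>()).add(splitId);
        }
    }

    /** One pass of single-split selection: local first, then non-local, then steal. */
    private String pickOne(String host) {
        LinkedHashSet<String> local = byHost.get(host);
        String chosen;
        if (local != null && !local.isEmpty()) {
            chosen = local.iterator().next();
        } else if (!nonLocal.isEmpty()) {
            chosen = nonLocal.iterator().next();
        } else if (!unassigned.isEmpty()) {
            chosen = unassigned.iterator().next();
        } else {
            return null;
        }
        // Pull the split from consideration on every node once it is scheduled.
        unassigned.remove(chosen);
        nonLocal.remove(chosen);
        for (LinkedHashSet<String> queue : byHost.values()) {
            queue.remove(chosen);
        }
        return chosen;
    }

    /** Builds one task by repeating the single-split selection up to maxSplits times. */
    public List<String> assignTask(String host, int maxSplits) {
        List<String> task = new ArrayList<>();
        for (int i = 0; i < maxSplits; i++) {
            String split = pickOne(host);
            if (split == null) {
                break;
            }
            task.add(split);
        }
        return task;
    }

    public static void main(String[] args) {
        MultiSplitAssigner assigner = new MultiSplitAssigner();
        assigner.addSplit("s1", Arrays.asList("h1", "h2"));
        assigner.addSplit("s2", Arrays.asList("h1"));
        assigner.addSplit("s3", Arrays.asList("h2"));
        assigner.addSplit("s4", Collections.<String>emptyList());
        System.out.println(assigner.assignTask("h1", 2));
        System.out.println(assigner.assignTask("h2", 2));
    }
}
```

Because the grouping happens only when a node asks for work, nothing is bound up front, and a task is just a list of splits instead of a single split.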
I like the algorithm Doug outlined above. Processing N input splits per mapper task may result in map output spills, depending on the value of N, the split sizes, the value of io.sort.mb, and the nature of the map function. The actual number of splits a particular mapper task will take should vary, depending on the number of splits to be processed. All the splits processed by the same mapper task should share the same rackid so that rack locality is maintained. If we implement this Jira, I think the issue of reusing the JVM outlined in I'd propose:
A few observations:
1) You really, really don't want the user configuring the number of tasks to combine. The system is hard enough for users to understand without asking them to do this! Also, if the default is 1, this will effectively not be used.
2) If you don't allow speculation on combined tasks, then you can really wedge the job. I don't think this restriction is a good idea.
3) You don't want the node to stall going to the JT for a new task when a task is complete. This will defeat a lot of the value of this optimization. Perhaps the TT can request one more task than it has slots, in the hope that it can be assigned when the next task finishes?
4) This is all about collation after map tasks. I think we need to start thinking of that distinctly from maps themselves. How can we make this work if we spill on average once per 2.5 maps? Or 4 times for every 3 maps? Right now the 4:3 case requires 8 spills, 4 collations and 4 fetches per reducer. What if we could turn it into 3 spills, one collation and 1 fetch per reducer? That would be a big saving.

The resources needed in the Map phase typically depend on the amount of data it processes. Instead of specifying a grouping value of N, can we say that grouping will occur based on the size of the input data? The JT can then group as many mappers as needed to make the total input data be close to the specified value.
It would be great if we could just do that lazily...
Some more thought on this issue. Here is a sample patch that combines N splits to be executed by the same mapper instance. The logic is like this: the normal logic is used to find the first split to be assigned to a mapper. Then, if there exist other splits that are rack-local to the TT, N-1 of those are selected and assigned to the same mapper. N is configurable per job.
This code still needs more testing; I am posting it early just as a reference. The problem that I am trying to solve is a job that processes a large number of input files. Each of these files is small. In the existing implementation, each mapper gets to process one split and each split is very small in size (limited by the size of the input file). I will investigate the implementation of MultipleInput.java to see if I can get mappers to process larger splits.
Instead of using a limit N on the number of splits, I'd have a limit on the number of MB of input data, call it L. Then a default value can be set for the cluster as a whole and it will do something reasonable in each job. If some jobs have really lightweight mappers they can also choose to increase it.
I would first look to see if there are enough data-local input chunks to make L MB of total data. If not, take all of the data-local ones and go on to rack-local ones. This is strictly better than picking just random rack-local splits.

Be careful of the starvation issues we've seen before! You may dish out too much data to your first N nodes and distribute your input asymmetrically. What about just keeping the JVM up and passing in more data as each "task" completes?
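The size-limited selection described above might look roughly like the following. This is a sketch under stated assumptions, not the posted patch: `SizeLimitedGrouping`, `Candidate`, and `select` are hypothetical names, each candidate's locality is assumed to be already known relative to the requesting TT, and the rule of always taking the first candidate (so that a split larger than L is still scheduled) is my own choice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch; these types are stand-ins, not Hadoop classes. */
public class SizeLimitedGrouping {
    /** Declared in preference order: data-local splits are tried before rack-local ones. */
    public enum Locality { DATA_LOCAL, RACK_LOCAL }

    public static final class Candidate {
        final String id;
        final long bytes;
        final Locality locality;

        public Candidate(String id, long bytes, Locality locality) {
            this.id = id;
            this.bytes = bytes;
            this.locality = locality;
        }

        @Override
        public String toString() {
            return id;
        }
    }

    /**
     * Picks splits for one task without letting the total exceed limitBytes.
     * The first split is always taken so an oversized split cannot be stranded.
     */
    public static List<Candidate> select(List<Candidate> candidates, long limitBytes) {
        List<Candidate> chosen = new ArrayList<>();
        long total = 0;
        for (Locality tier : Locality.values()) {
            for (Candidate c : candidates) {
                if (c.locality != tier) {
                    continue;
                }
                if (chosen.isEmpty() || total + c.bytes <= limitBytes) {
                    chosen.add(c);
                    total += c.bytes;
                }
            }
        }
        return chosen;
    }

    public static void main(String[] args) {
        List<Candidate> candidates = Arrays.asList(
            new Candidate("a", 40, Locality.RACK_LOCAL),
            new Candidate("b", 30, Locality.DATA_LOCAL),
            new Candidate("c", 50, Locality.DATA_LOCAL),
            new Candidate("d", 20, Locality.RACK_LOCAL));
        System.out.println(select(candidates, 100));
    }
}
```

A sketch like this does nothing about the starvation concern raised above: a greedy per-node limit can still hand too much input to the first nodes that ask, so the limit would need to be measured against real jobs.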
Hi Eric, You are right, this could have unwelcome performance characteristics in certain cases. We will have to measure this first.
I am actually more inclined to solve my problem using
That makes good sense to me. The new Split class could look a lot like MultiFileSplit, but would additionally support a 'getStart(int)' method. So perhaps MultiFileSplit could be extended for this purpose. FileInputFormat could be modified to emit these when the number of splits would otherwise exceed some threshold. But then all subclasses of FileInputFormat would need to be modified to be able to accept these. That wouldn't be too hard. FileInputFormat could implement getRecordReader(InputSplit) to break out the sub-splits, then call a new method, getRecordReader(FileSplit). All existing subclasses could then just change the signature of their getRecordReader implementations in order to support the new feature.
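As a rough illustration of "break out the sub-splits, then call a per-file reader", here is a self-contained sketch. Every type in it is a simplified stand-in rather than the real `org.apache.hadoop.mapred` API: the `FileSplit` and `MultiSplit` classes, the use of `java.util.Iterator` in place of a RecordReader, and the `demoReader` helper are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/** Hypothetical sketch; the nested types only imitate the shape of the proposal. */
public class SubSplitReaderSketch {
    /** Stand-in for a single-file split: path, start offset, length. */
    public static final class FileSplit {
        final String path;
        final long start;
        final long length;

        public FileSplit(String path, long start, long length) {
            this.path = path;
            this.start = start;
            this.length = length;
        }
    }

    /** Stand-in for the proposed split: like a multi-file split, plus getStart(int). */
    public static final class MultiSplit {
        private final List<FileSplit> parts;

        public MultiSplit(List<FileSplit> parts) {
            this.parts = new ArrayList<>(parts);
        }

        public int getNumPaths() { return parts.size(); }
        public String getPath(int i) { return parts.get(i).path; }
        public long getStart(int i) { return parts.get(i).start; }
        public long getLength(int i) { return parts.get(i).length; }
    }

    /** Chains one per-file reader after another, opening each sub-split lazily. */
    public static <R> Iterator<R> getRecordReader(
            final MultiSplit split, final Function<FileSplit, Iterator<R>> perFileReader) {
        return new Iterator<R>() {
            private int next = 0;
            private Iterator<R> current = Collections.emptyIterator();

            @Override
            public boolean hasNext() {
                // Skip exhausted or empty sub-splits until a record is available.
                while (!current.hasNext() && next < split.getNumPaths()) {
                    FileSplit part = new FileSplit(
                        split.getPath(next), split.getStart(next), split.getLength(next));
                    current = perFileReader.apply(part);
                    next++;
                }
                return current.hasNext();
            }

            @Override
            public R next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return current.next();
            }
        };
    }

    /** Toy per-file reader: one record per non-empty sub-split. */
    public static Iterator<String> demoReader(FileSplit part) {
        List<String> records = new ArrayList<>();
        if (part.length > 0) {
            records.add(part.path + "@" + part.start);
        }
        return records.iterator();
    }

    public static void main(String[] args) {
        MultiSplit split = new MultiSplit(Arrays.asList(
            new FileSplit("a", 0, 10), new FileSplit("b", 5, 0), new FileSplit("c", 7, 3)));
        Iterator<String> reader = getRecordReader(split, SubSplitReaderSketch::demoReader);
        while (reader.hasNext()) {
            System.out.println(reader.next());
        }
    }
}
```

The point of the shape is that a subclass only supplies the per-file reader; the wrapper that walks the sub-splits lives in one place.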