One solution is to specify a maximum number of queued requests for the LinkedBlockingQueue.
That could work, but it needs more changes. When the queue is full and the maximum number of threads are running, new tasks will be rejected. We could apply CallerRunsPolicy, but the whole point of having a ThreadPoolExecutor is to keep the JobTracker from blocking on job completion.
I think the main requirements here are:
- Absorb bursty job completions - either queueing with sufficient capacity or fast dispatch through a large thread pool.
- Avoid limiting job throughput - enough worker threads.
- Minimize extra resource consumption - limit the number of worker threads.
- Don't drop anything.
To satisfy the first and second requirements, one can think of the following two approaches.
- Have a bounded queue and a sufficiently large thread pool. Since we cannot drop any job completion, we want CallerRunsPolicy for rejected ones.
- Alternatively, use an unbounded queue and a reasonable number of core threads. No work will be rejected in this case.
Between the two, the second has the advantage, considering the third requirement and its simplicity. The question is: what is a reasonable number of core threads to avoid lagging behind forever? Based on our experience, 3 to 5 seems reasonable. The moveToDone() throughput varies a lot, but it topped out at around 0.8/second in one of the busiest clusters I've seen. If the job completion rate stays above that for a long time, the queue will grow and the history won't show up for most newer jobs.
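To make the "lagging behind forever" concern concrete, here is a back-of-envelope sketch. Only the 0.8/second peak is from observation; the 1.0/second arrival rate and the one-hour burst are hypothetical numbers for illustration:

```java
public class BacklogSketch {
    public static void main(String[] args) {
        // Hypothetical: completions arrive at 1.0/s while moveToDone()
        // drains at its observed peak of 0.8/s.
        double arrivalRate = 1.0;   // jobs/second (assumed burst rate)
        double serviceRate = 0.8;   // jobs/second (observed peak throughput)
        double burstSeconds = 3600; // a sustained one-hour burst

        // Backlog grows at the difference of the two rates.
        long backlog = Math.round((arrivalRate - serviceRate) * burstSeconds);
        System.out.println("backlog after one hour: " + backlog + " jobs");
    }
}
```

So a sustained overload of even 0.2/second leaves roughly 720 completions queued after an hour, which is why the queue only stays bounded in practice if bursts are short.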
Here are the two approaches in code:
- The queue is bounded but will absorb bursts of about 100. If the core thread cannot keep up, up to 10 more threads will be created to help drain the queue. If the queue still cannot be drained fast enough, the caller will execute the work directly. This blocks the JobTracker, since JobTracker#finalizeJob() is a synchronized method, so the thread pool size and the queue size must be sufficiently large.
executor = new ThreadPoolExecutor(1, 10, 1, TimeUnit.HOURS,
new LinkedBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy());
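For anyone who wants to see the caller-runs behavior in action, here is a scaled-down, self-contained sketch (pool of 1, queue of 1, instead of 10/100) showing that the overflow task executes on the submitting thread, which is exactly what would stall the JobTracker:

```java
import java.util.concurrent.*;

public class CallerRunsDemo {
    // Returns the name of the thread that ran the overflow task.
    static String runOverflowTask() throws InterruptedException {
        // Scaled down: one worker, a queue of one, caller-runs on overflow.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.HOURS,
                new LinkedBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());

        CountDownLatch release = new CountDownLatch(1);
        // Occupy the single worker thread.
        executor.execute(() -> {
            try { release.await(); } catch (InterruptedException ignored) {}
        });
        executor.execute(() -> {});          // fills the queue

        // Worker busy, queue full: this task is rejected and, under
        // CallerRunsPolicy, runs synchronously on the submitting thread.
        final String[] ranOn = new String[1];
        executor.execute(() -> ranOn[0] = Thread.currentThread().getName());

        release.countDown();
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return ranOn[0];
    }

    public static void main(String[] args) throws Exception {
        System.out.println("overflow task ran on: " + runOverflowTask());
    }
}
```

Running this prints the submitting thread's name ("main" here), not a pool thread's.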
- The following will eventually start up 5 threads and keep them running. Non-blocking, and it requires the fewest changes.
executor = new ThreadPoolExecutor(5, 5, 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
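For what it's worth, this second configuration is essentially what Executors.newFixedThreadPool(5) builds (same core/max of 5 over an unbounded LinkedBlockingQueue, just with a zero keep-alive, which is irrelevant when core == max). A quick sketch confirming that nothing gets rejected with the unbounded queue:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class UnboundedQueueDemo {
    // Submits n trivial tasks; with an unbounded queue none are rejected.
    static int runTasks(int n) throws InterruptedException {
        // Equivalent to new ThreadPoolExecutor(5, 5, 0L, MILLISECONDS,
        //                                      new LinkedBlockingQueue<Runnable>())
        ExecutorService executor = Executors.newFixedThreadPool(5);

        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            executor.execute(done::incrementAndGet); // never throws RejectedExecutionException
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return done.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("completed: " + runTasks(1000));
    }
}
```

One caveat either way: ThreadPoolExecutor starts core threads lazily as tasks arrive, so the 5 threads only exist once at least 5 tasks have been submitted.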
What do you think is better? Or can you think of any better approaches?