> I suppose once a value's been consumed from the queue it could be returned to a pool used by the deserializer. We could limit the size of the pool to be the same size as the queue. Is that what you had in mind?
I was thinking that you only need to copy at the beginning of the group, since you can compare subsequent values to the copy, until they differ, at which point you make a new copy.
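
To make that concrete, here is a minimal sketch of the copy-once-per-group idea. `GroupCopySketch`, `groupLeaders` and the `copier` argument are hypothetical names for illustration, not anything in the existing code, and it assumes the values arrive sorted and are never null.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical helper: copies a value only when a new group starts. */
final class GroupCopySketch {

    /**
     * Walks sorted values and returns the copies made, one per group.
     * Each incoming value is compared to the current copy; a new copy
     * is taken only when the comparison reports a difference.
     */
    static <T> List<T> groupLeaders(Iterable<T> sortedValues,
                                    Comparator<? super T> cmp,
                                    UnaryOperator<T> copier) {
        List<T> leaders = new ArrayList<>();
        T current = null;          // copy of the first value of the current group
        boolean haveCurrent = false;
        for (T value : sortedValues) {
            if (!haveCurrent || cmp.compare(current, value) != 0) {
                current = copier.apply(value);  // the only place a copy happens
                haveCurrent = true;
                leaders.add(current);
            }
            // Values equal to 'current' need no copy here, even if the
            // deserializer later reuses the 'value' instance.
        }
        return leaders;
    }
}
```

With five values falling into three groups, this makes three copies rather than five.
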
> I worry a bit that something else could interrupt the thread or intercept the InterruptedException, e.g., in the user's reducer. Is that a well-founded worry?
The interrupt could serve only as a prompt to check a shared variable that indicates the reduce is done, rather than the runner treating every interrupt as a done signal. That would take care of the problem of something else accidentally interrupting the thread.
Not sure about an interrupt signal being intercepted, or not being delivered. But I think it's possible that the interrupt occurs between the check on "done" and the call to take(), so the call to take() would go ahead and cause a deadlock.
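
A rough sketch of the shared-variable idea, under some assumptions: `DoneFlagSketch` and all of its members are hypothetical names, and it swaps `take()` for a timed `poll()` so that a signal arriving at an awkward moment only delays shutdown by one timeout rather than leaving the consumer blocked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical consumer that treats an interrupt as a hint to re-check a flag. */
final class DoneFlagSketch {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(4);
    private final List<String> consumed = new ArrayList<>();
    private volatile boolean done = false;   // the shared "reduce is done" variable

    /** Producer side: set the flag first, then wake the consumer. */
    void finish(Thread consumer) {
        done = true;
        consumer.interrupt();
    }

    /** Consumer side: only the flag, never the interrupt itself, ends the loop. */
    void consume() {
        while (true) {
            try {
                // Timed poll (an assumption of this sketch, not take()):
                // the loop re-checks 'done' at least every 50 ms.
                String value = queue.poll(50, TimeUnit.MILLISECONDS);
                if (value != null) {
                    consumed.add(value);
                } else if (done) {
                    return;                  // queue drained and producer finished
                }
            } catch (InterruptedException e) {
                // A stray interrupt with done == false is simply ignored.
                if (done && queue.isEmpty()) {
                    return;
                }
            }
        }
    }

    /** Small driver: two values, one stray interrupt, then a real finish. */
    static List<String> demo() {
        DoneFlagSketch s = new DoneFlagSketch();
        Thread consumer = new Thread(s::consume);
        consumer.start();
        s.queue.offer("a");
        consumer.interrupt();                // accidental interrupt, must not stop the loop
        s.queue.offer("b");
        s.finish(consumer);
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return s.consumed;
    }
}
```

The timed poll costs a little latency at shutdown, but it means correctness does not depend on exactly when the interrupt lands relative to the check on `done`.
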
> A better approach might be to put in a sentinel value. Unfortunately this has to be of type T, and we don't know how to construct a T.
Could we use AvroWrapper here? Perhaps a null wrapper means end of reduce.
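
Something along these lines, as a sketch only. `Wrapper` below is a hypothetical stand-in for AvroWrapper so the example compiles on its own, and `SentinelSketch`, `drain` and `demo` are made-up names. Since the `java.util.concurrent` blocking queues reject null elements, the sketch reads "null wrapper" as a wrapper whose datum is null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class SentinelSketch {

    /** Hypothetical stand-in for AvroWrapper<T>: just holds a datum. */
    static final class Wrapper<T> {
        private final T datum;
        Wrapper(T datum) { this.datum = datum; }
        T datum() { return datum; }
    }

    /**
     * Consumes wrappers until one with a null datum arrives.
     * No T ever has to be constructed for the end-of-reduce marker.
     */
    static <T> List<T> drain(BlockingQueue<Wrapper<T>> queue) throws InterruptedException {
        List<T> out = new ArrayList<>();
        while (true) {
            Wrapper<T> w = queue.take();
            if (w.datum() == null) {
                return out;              // sentinel: end of reduce
            }
            out.add(w.datum());
        }
    }

    /** Small driver: two real values followed by the sentinel. */
    static List<String> demo() {
        BlockingQueue<Wrapper<String>> queue = new LinkedBlockingQueue<>();
        queue.add(new Wrapper<>("x"));
        queue.add(new Wrapper<>("y"));
        queue.add(new Wrapper<String>(null));   // end-of-reduce marker
        try {
            return drain(queue);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new ArrayList<>();
        }
    }
}
```

This only works if a null datum can never be a legitimate value in the queue; otherwise a dedicated sentinel wrapper instance compared by identity would be needed.
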