Summary of changes and notes:
1 - Fixed the synchronization issue between checkSatisfied and expire by
synchronizing on the DelayedItem.
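The shape of the fix in (1) can be sketched as follows. This is a minimal illustration, not the actual patch: the method names (trySatisfy, tryExpire) are invented, and the real DelayedItem carries more state. The point is just that by synchronizing on the item, exactly one of satisfaction or expiration can win.

```java
// Hypothetical sketch: serialize satisfaction and expiration by locking the
// item itself, so checkSatisfied and the expiry thread cannot both complete
// the same delayed request.
class DelayedItem {
    private boolean completed = false;

    // Returns true only for the single caller that completes the item.
    synchronized boolean trySatisfy() {
        if (completed) return false;
        completed = true;
        return true;
    }

    synchronized boolean tryExpire() {
        if (completed) return false;
        completed = true;
        return true;
    }
}
```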
2 - Added request purgatory metrics using the metrics-core library. Also
added support for csv/ganglia/graphite reporters which I think is useful -
e.g., I attached a graphite dashboard that was pretty easy to whip up. It
should be a breeze to use metrics-core for other stats in Kafka.
3 - This brings in dependencies on metrics and slf4j, both with Apache
compatible licenses. I don't know of any specific best practices for using
metrics-core as I have not used it before, so it would be great if people
with experience using it could glance over this patch.
4 - It's a bit hard to tell right now which metrics are useful and which are
pointless/redundant. We can iron that out over time.
5 - Some metrics are global-only, and some are both global and per-key (which
    I think is useful to have, e.g., to get a quick view of which partitions are
slower). E.g., it helped to see (in the attached screen shots) that fetch
requests were all expiring - and it turned out to be a bug in how
DelayedFetch requests from followers are checked for satisfaction. The
issue is that maybeUnblockDelayedFetch is only called if required acks is
0/1. We need to call it always - in the FetchRequestPurgatory
checkSatisfied method, if it is a follower request then we need to use
logendoffset to determine the available bytes to the fetch request, and HW
if it is a non-follower request. I fixed it to always check
availableFetchBytes, but it can be made a little more efficient by having
the DelayedFetch request keep track of currently available bytes for each key.
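The satisfaction check described above could look roughly like this. All names here (availableFetchBytes, isFromFollower, the byte-position fields) are illustrative, not the actual Kafka code; the only point is that followers measure available bytes against the log end while ordinary consumers measure against the high watermark:

```java
// Hypothetical sketch of checkSatisfied's byte accounting: followers may
// read past the high watermark, non-followers must not.
class FetchCheck {
    long logEndOffset;   // byte position of the log end
    long highWatermark;  // byte position of the HW

    FetchCheck(long leo, long hw) { logEndOffset = leo; highWatermark = hw; }

    long availableFetchBytes(long fetchOffset, boolean isFromFollower) {
        long upperBound = isFromFollower ? logEndOffset : highWatermark;
        return Math.max(0, upperBound - fetchOffset);
    }
}
```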
6 - I realized that both the watchersForKey and per-key metrics pools keep
growing. It may be useful to have a simple garbage collector in the Pool
class that garbage collects entries that are stale (e.g., due to a
leader-change), but this is non-critical.
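A simple version of the collector suggested in (6) might look like this; it is a sketch with made-up names, not a proposed Pool implementation. Each entry remembers when it was last touched, and a periodic sweep drops entries idle past a cutoff (e.g., keys orphaned by a leader change):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical pool with stale-entry collection based on last-access time.
class ExpiringPool<K, V> {
    private static class Entry<V> {
        final V value;
        volatile long lastAccessMs;
        Entry(V v, long now) { value = v; lastAccessMs = now; }
    }

    private final ConcurrentHashMap<K, Entry<V>> pool = new ConcurrentHashMap<>();

    V getAndMaybePut(K key, V fresh, long nowMs) {
        Entry<V> e = pool.computeIfAbsent(key, k -> new Entry<>(fresh, nowMs));
        e.lastAccessMs = nowMs;
        return e.value;
    }

    // Remove entries not touched within maxIdleMs; returns how many were dropped.
    int collectStale(long nowMs, long maxIdleMs) {
        int removed = 0;
        for (Map.Entry<K, Entry<V>> e : pool.entrySet()) {
            if (nowMs - e.getValue().lastAccessMs > maxIdleMs
                    && pool.remove(e.getKey(), e.getValue())) {
                removed++;
            }
        }
        return removed;
    }

    int size() { return pool.size(); }
}
```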
7 - I needed to maintain DelayedRequest metrics outside the purgatory,
    because the purgatory itself is abstract and does not have internal
    knowledge of delayed requests and their keys. Note that these metrics are
    for delayed requests - i.e., they are not updated for requests that are
    satisfied immediately without going through the purgatory.
8 - There is one subtlety with producer throughput: I wanted to keep per-key
throughput, so the metric is updated on individual key satisfaction. This
does not mean that the DelayedProduce itself will be satisfied - i.e.,
what the metric reports is an upper-bound since some DelayedProduce
requests may have expired.
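The accounting subtlety in (8) amounts to something like the sketch below (names invented for illustration): bytes are credited as soon as an individual key is satisfied, so even if the enclosing DelayedProduce later expires, the per-key totals already include those bytes - hence an upper bound on fully acknowledged throughput.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical per-key producer throughput accounting: updated on individual
// key satisfaction, independent of the fate of the whole DelayedProduce.
class ProduceThroughput {
    private final ConcurrentHashMap<String, LongAdder> bytesPerKey =
            new ConcurrentHashMap<>();
    private final LongAdder totalBytes = new LongAdder();

    void keySatisfied(String topicPartition, long bytes) {
        bytesPerKey.computeIfAbsent(topicPartition, k -> new LongAdder()).add(bytes);
        totalBytes.add(bytes);
    }

    long bytesFor(String key) {
        LongAdder a = bytesPerKey.get(key);
        return a == null ? 0 : a.sum();
    }

    long total() { return totalBytes.sum(); }
}
```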
9 - I think it is better to wait for KAFKA-376 to go in first. In this
patch, I hacked a simpler version of that patch - i.e., in
availableFetchBytes, I check the logEndOffset instead of the
high-watermark. Otherwise, follower fetch requests would see zero
    available bytes. Of course, this hack now breaks non-follower fetch
    requests.
10 - KafkaApis is getting pretty big - I can try and move DelayedMetrics out
     if that helps, although I prefer having it inside since all the
DelayedRequests and purgatories are in there.
11 - There may be some temporary edits to start scripts/log4j that I will
revert in the final patch.
What's left to do:
a - This was a rather painful rebase, so I need to review the patch in case I
    missed anything.
b - Optimization described above: DelayedFetch should keep track of
bytesAvailable for each key and FetchRequestPurgatory's checkSatisfied
    should take a topic and partition and compute availableBytes for just
    that key.
c - The JMX operations to start and stop the reporters are not working
properly. I think I understand the issue, but will fix later.
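The optimization in (b) could look roughly like this; it is a sketch under assumed names (update, minBytes), not the planned implementation. The delayed fetch caches available bytes per key, checkSatisfied refreshes only the key that changed, and satisfaction falls out of the running total instead of a full rescan of all keys:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical incremental byte tracking for a delayed fetch request.
class DelayedFetchSketch {
    private final Map<String, Long> bytesAvailable = new HashMap<>();
    private long totalAvailable = 0;
    private final long minBytes; // satisfaction threshold for the fetch

    DelayedFetchSketch(long minBytes) { this.minBytes = minBytes; }

    // Called from checkSatisfied(topic, partition): refresh one key only,
    // then test the running total. Returns true once the fetch is satisfied.
    boolean update(String key, long newlyAvailable) {
        long old = bytesAvailable.getOrDefault(key, 0L);
        bytesAvailable.put(key, newlyAvailable);
        totalAvailable += newlyAvailable - old;
        return totalAvailable >= minBytes;
    }
}
```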