Jun/Jay, thanks a lot for the very helpful reviews. Here are follow-ups to
some of your comments - just wanted to make sure I fully address them in v2,
so let me know if I missed anything. One major thing to decide is what to do
on a leader change during a delayedProduce - should it be handled
immediately, or should we let the request run to its full timeout?
> 1. TopicData.partitionDatas: data is the plural form of datum. So datas
> feels weird. How about partitionDataArray?
That works - no wonder 'datas' sounded odd.
> 3. DelayedProduce:
> 3.1 isSatisfied(): need to handle the case when requiredAcks is default
> (probably any value <0). This means whenever we get the ack from every
> replica in the current ISR, the request is satisfied. This can be done by
> simply making sure leader.HW >=
Thanks for pointing that out - forgot to handle < 0.
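To make sure I have the semantics right, here is a minimal sketch of how isSatisfied could treat the default case (all names here are my assumptions, not the actual patch): when requiredAcks < 0, the request is satisfied once the leader's HW has reached the end offset of this produce, since the HW only advances when every replica in the current ISR has caught up to that offset.

```scala
// Sketch only - PartitionStatus/isSatisfied are hypothetical names.
case class PartitionStatus(requiredOffset: Long)

def isSatisfied(requiredAcks: Int,
                leaderHW: Long,
                status: PartitionStatus,
                acksReceived: Int): Boolean =
  if (requiredAcks < 0)
    // Default: satisfied once leader.HW >= the end offset of this produce,
    // i.e. every replica in the current ISR has acked.
    leaderHW >= status.requiredOffset
  else
    // Explicit ack count requested by the producer.
    acksReceived >= requiredAcks
```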
> 3.4 We need to be aware that isSatisfied can be called concurrently on the
> same DelayedProduce object. I am not sure if it's really necessary, but
> maybe we should consider using AtomicBoolean for
I don't think isSatisfied can be called concurrently on the same
DelayedProduce - the ProduceRequestPurgatory's watcher map is from key ->
Watchers. The isSatisfied method is synchronized (through
collectSatisfiedRequests) on Watchers. I do access partitionStatus for
respond(), but that's after the delayedProduce has been kicked out of the
purgatory. Or am I missing some other scenario? If so, can you elaborate?
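To spell out the locking argument, here is a rough sketch (the names and shapes are simplifications, not the actual purgatory code): requests for a key are checked while holding that key's Watchers monitor, so isSatisfied cannot run concurrently on the same delayed request through this path.

```scala
import scala.collection.mutable.ListBuffer

// Sketch: one Watchers per key; checks run under this object's monitor.
class Watchers[T] {
  private val pending = ListBuffer[T]()

  def addIfNotSatisfied(req: T): Unit = synchronized { pending += req }

  // Runs isSatisfied under the Watchers lock and removes satisfied requests,
  // so two threads never check the same delayed request at the same time.
  def collectSatisfiedRequests(isSatisfied: T => Boolean): List[T] =
    synchronized {
      val (done, rest) = pending.partition(isSatisfied)
      pending.clear()
      pending ++= rest
      done.toList
    }
}
```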
> 4. ProducerRequest: actTimeoutSecs should be actTimeoutMs.
I forgot to bring this up in the v1 patch - it was originally named ackTimeout
and treated as seconds (i.e., the producer request object uses an Int instead
of a Long), which is why I added the secs suffix and left it as an Int. Will
switch it to a Long in milliseconds instead.
> 5. ReplicaManager: I would vote for not checking ProducerRequestPurgatory
> when there is a leadership change, since the logic is simpler. We just let
> the producer requests timeout. If we do want to do the check here, we
> should share the ProducerRequestPurgatory object used in KafkaApis.
Ok - that's also fine, and the code comment indicated it was optional. Not
doing the check should help significantly with the code structure. Right now
there is a circular dependency which forced me to put DelayedProduce and
ProduceRequestPurgatory outside KafkaApis and pass KafkaApis into
DelayedProduce - I can probably lay it out differently if we want to do this
check later, but for now this makes things easier.
> 6. SyncProducerConfigShared: We should probably make the default
> requiredAcks to be -1 (i.e., wait for ack from each replica in ISR).
Ok - either way sounds good. The existing default is (almost) the same
behavior as 0.7 except that the producer response is sent after the message
is persisted at the leader.
> 1. There is a race condition between adding to the purgatory and
> acknowledgement, I think. We first produce locally and then add the
> watcher, which allows a window where the message is available for
> replication with no watcher on ack. If a replica acknowledges a given
> produce before the DelayedProduce is added as a watcher that will not be
> recorded, I think, and the request will potentially timeout waiting for
> ISR acks. This would certainly happen and lead to sporadic timeouts. This
> is tricky, let's discuss. One solution would be to reverse the logic--add
> the watcher first then produce it locally, but this means having to handle
> the case where the local produce fails (we don't want to leak watchers).
> The race condition that Jay raised seems like a potential issue. However,
> it may be tricky to add the watcher before writing to the local log in the
> leader. This is because the watcher needs to know the end offset of
> messages in the leader's log. Another way to address the race condition is
> to still add messages to leader's log first, followed by adding the
> watcher. However, after the watcher is added, we can explicitly trigger a
> watcher check by calling ProducerRequestPurgatory.update.
Excellent points - will think about how best to handle this.
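For reference, the ordering Jun suggests might look roughly like this (a sketch under assumed names - the purgatory methods here are simplified stand-ins, not the actual API): append to the leader's log first so the end offset is known, then register the watcher, then explicitly re-run the satisfaction check so an ack that arrived between the two steps is not lost.

```scala
// Sketch: a purgatory stand-in that just records the call order.
class ProducerRequestPurgatory {
  val events = scala.collection.mutable.ListBuffer[String]()
  def watch(key: (String, Int)): Unit = events += s"watch:$key"
  def update(key: (String, Int)): Unit = events += s"update:$key" // re-checks watchers
}

def produceToPartition(appendToLog: () => Long,
                       purgatory: ProducerRequestPurgatory,
                       key: (String, Int)): Long = {
  val endOffset = appendToLog() // 1. write locally; yields the required offset
  purgatory.watch(key)          // 2. add the DelayedProduce watcher
  purgatory.update(key)         // 3. explicit check closes the race window
  endOffset
}
```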
> 2. As you say, you definitely can't pass KafkaApis to anything, making
> DelayedProduceRequest an inner class of KafkaApis will fix this, I think.
Right - that's actually what I started out with (i.e., the purgatory and
delayed request inside kafkaApis), but as I described above, there is a
circular dependency and I was forced to move it out. However, the conclusion
seems to be that we can do away with purgatory update on becoming follower
so we can break this dependency loop.
> 3. I think the way we are tying the api handling and replica manager is
> not good. replica manager shouldn't know about DelayedProduce requests if
> possible, it should only know about replica management and be oblivious to
> the api level.
Covered above (2).
> 4. Why is the ack timeout in seconds? It should be milliseconds like
> everything else, right? One would commonly want a sub-second timeout.
Yes - it should be ms. (described above)
> 5. As a stylistic thing it would be good to avoid direct use of
> Tuple2[A,B] and instead use (A,B) (e.g. myMethod: List[(A,B)] instead of
> myMethod: List[Tuple2[A,B]])
> 6. I don't understand why we are passing (topic, partition) to the
> purgatory as BOTH the key and request. This is a bit hacky since this is
> not the request. Let's discuss, we may need to clean up the purgatory api
> to make it gracefully handle this case.
It does seem a bit unintuitive on the surface, but I think it is correct:
i.e., the produce request is to a set of (topic, partition) and potentially
unblocks DelayedFetch requests to those (topic, partition).
> 7. I think the ProducerRequestPurgatory should be instantiated in
> KafkaApis not KafkaServer--it is an implementation detail of the api layer
> not a major subsystem.
Covered above (2).
> 8. The doubly nested map/flatMap in KafkaApis.handleProduceRequest is a
> little bit tricky; could you clean that up or, if that is not possible,
> comment what it does (I think it just makes a list of (topic, partition)
> pairs, but the variable name doesn't explain it). Same with
That is correct - will rename/comment.
> 9. DelayedProduce should definitely not be in the kafka.api package, that
> package is for request/response "structs". DelayedProduce is basically a
> bunch of server internals and not something the producer or consumer
> should be aware of. Likewise, the code for
> ProduceRequestPurgatory/FetchRequestPurgatory is kept differently, but
> these things are mirror images of one another. FetchRequestPurgatory is in
> KafkaApis, ProduceRequestPurgatory in its own file. These should match
> each other. The reasoning behind keeping things in KafkaApis was that
> those classes contained part of the logic for processing a request, so
> splitting it into two files makes it harder to read (the counter argument
> is that KafkaApis is getting pretty big). The other reason was so it could
> access the variables in KafkaApis as a nested class. Either way is
> probably fine, but they should be symmetric between the two.
Covered above (2).
> 11. I am not so sure about the logic of proactively responding to all
> outstanding requests with failure during a leadership change. Is this the
> right thing to do?
Related to (2) above - we can discuss this more. The benefit of doing this
is that a DelayedProduce can be unblocked sooner (if the timeout is high)
and has a more accurate error code - i.e., the produce request failed for
this partition because its leader changed. However, simply letting the
request time out is also fine - we just need to decide on one or the other
and document it for the producer API.
> 12. The logic in DelayedProduce.isSatisfied is very odd. I think what we
> want to do is respond to all outstanding requests in the event of a
> leadership change. But we do this by passing in the ReplicaManager and
> having logic that keys off whether or not the local broker is the leader.
> This seems quite convoluted. Wouldn't it make more sense to do this: (1)
> Have the replica manager allow "subscribers" to its state changes. (2) Add
> a subscriber that loops over all in-flight requests and expires them with
> an error when the local broker ceases to be the leader. There is a
> description of this pattern here http://codingjunkie.net/guava-eventbus/.
I think there are two concerns you raise here:
(i) Responding to requests in the event of a leadership change: This was
intended as a safety net - the existing patch deals with the leadership
change in replicaManager (which is why replicaManager needs access to the
produceRequestPurgatory). However, if we choose not to handle the leader
change and just timeout, then we don't need that code in replicaManager and
we can remove this logic as well.
(ii) Passing in replicaManager - this is required because I need to get the
inSyncReplicas for a partition, and the replicaManager maintains that.
However, as described in (2) I think this can be directly accessed when I
move this back to KafkaApis.
> 13. Nice catch with the request size. I think it is okay that we are using
> sizeInBytes, since it is not expected that the producer send fragments,
> though a fully correct implementation would check that.
That's a good point - using sizeInBytes is (more) correct, then.
> Also, another thought. This implementation is keeping a Map[(topic,
> partition)] => # acks. I wonder if it would be better to do this in terms
> of the acknowledged hw. Basically keep a global structure as part of
> replica manager that tracks the position of all replicas for all
> partitions. Then the isSatisfied topic would just check this structure to
> see if the replicas had advanced far enough. This seems like it would not
> require a per-request map of state and would be less fragile. Somehow this
> structure seems generally useful for monitoring and other purposes as
This is exactly how it is implemented now - let me know if I misunderstood
your comment. The acksPending is a boolean. The actual ack count is
determined from the acknowledged HW that the replicaManager keeps track of.
The map is still required to maintain other information such as the error
code (e.g., if there was an error in writing to the log), the offset for
each partition that followers need to fetch beyond for this request to be
unblocked, and if there are any acks pending (so we don't bother re-counting
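The per-partition state described above could be sketched something like this (names are my shorthand, not the exact fields in the patch) - the ack count itself comes from the leader HW that replicaManager tracks, so the map only needs the error code, the required offset, and the pending flag:

```scala
// Sketch of the per-partition state kept by a DelayedProduce.
case class PartitionProduceStatus(
  var acksPending: Boolean, // false once satisfied/errored: skip re-checking
  var errorCode: Short,     // error from writing to the local log, if any
  requiredOffset: Long)     // followers must fetch beyond this offset

// A partition is done once the leader HW passes its required offset.
def checkPartition(s: PartitionProduceStatus, leaderHW: Long): Unit =
  if (s.acksPending && leaderHW >= s.requiredOffset)
    s.acksPending = false
```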