KAFKA-353: Tie producer-side ack with high watermark and progress of replicas
(Sub-task of KAFKA-50: kafka intra-cluster replication support)

    Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

    Attachments

    1. kafka-353_v3.patch (56 kB, Joel Koshy)
    2. kafka-353_v2.patch (54 kB, Joel Koshy)
    3. kafka-353_v2_to_v3.patch (3 kB, Joel Koshy)
    4. kafka-353_v1.patch (39 kB, Joel Koshy)

      Activity

      Joel Koshy added a comment -

      Thanks for the review - committed to 0.8 and opened jiras for the other issues.

      Jun Rao added a comment -

      It's ok to handle the remaining issues in separate jiras. +1 from me on committing patch v3.

      Joel Koshy added a comment -

      Added that comment on requiredAcks < 0.

      That's a good point about inSyncReplicas. Looks like this is used incorrectly from other places, so I would prefer doing that (and the additional synchronization for expire/checkSatisfied) in a separate jira if that's okay with others.

      Jun Rao added a comment -

      A few more comments on patch v3:

      31. DelayedProduce.isSatisfied():
      31.1 We iterate leader.partition.inSyncReplicas here. However, ISR can be updated concurrently. We probably need to make leader.partition.inSyncReplicas an AtomicReference.
      31.2 Could you add a comment that explains what produce.requiredAcks < 0 means?

      For the race condition between expire() and checkSatisfied, I agree that it's probably easier if RequestPurgatory does the synchronization on DelayedRequest.
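
      For illustration, a minimal sketch of what 31.1 suggests (the class and field
      names here are assumptions, not the actual Kafka code): wrapping the ISR in an
      AtomicReference lets isSatisfied() read a consistent snapshot while the leader
      thread replaces the set concurrently.

      import java.util.concurrent.atomic.AtomicReference

      class PartitionState(initialIsr: Set[Int]) {
        private val inSyncReplicas = new AtomicReference[Set[Int]](initialIsr)

        def isr: Set[Int] = inSyncReplicas.get()                  // lock-free snapshot read
        def updateIsr(newIsr: Set[Int]): Unit = inSyncReplicas.set(newIsr)
      }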

      Joel Koshy added a comment -

      Here is an incremental patch over v2 that makes the diff clearer.

      Joel Koshy added a comment -

      Thanks for the catch on 21 - fixed that (note that requiredAcks cannot be 0 in isSatisfied).

      For 22, turned out to be due to inconsistent default values for the producer config. Test passes now.

      Re: Jay's comment. It is true that the requests that collectSatisfied returns will not go through expiration (by virtue of the satisfied guard). The issue is that the client needs to be aware that the call to expire and checkSatisfied can be simultaneous and so may need to synchronize on data structures that their implementations may access. So we can either synchronize on the purgatory or clearly document this. Personally, I'm leaning toward fixing it in purgatory because:

      • if checkSatisfied is called (and it will return true), it seems we should consider the deadline to have been met (although this is not clear cut) so expiration should not proceed.
      • not fixing it there makes the purgatory a little harder to use, as the client needs to be aware of the race condition.
      Jun Rao added a comment -

      Thanks for patch v2. A couple more comments:

      21. DelayedProduce.isSatisfied(): The following check seems to be wrong. If produce.requiredAcks < 0, numAcks is always going to be >= produce.requiredAcks.
      if (numAcks >= produce.requiredAcks ||
          (numAcks >= isr.size && produce.requiredAcks < 0)) {

      22. AsyncProducerTest seems to keep failing for me.
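
      For illustration, one way the check flagged in 21 could be restructured (a
      sketch only, not the committed fix; the helper name and parameters are
      assumptions): requiredAcks < 0 means "wait for the full ISR", while a positive
      value means "wait for that many acks".

      object AckCheck {
        def enoughAcks(requiredAcks: Int, numAcks: Int, isrSize: Int): Boolean =
          if (requiredAcks < 0)
            numAcks >= isrSize          // default: every replica in the current ISR has acked
          else
            numAcks >= requiredAcks     // explicit ack count requested by the producer
      }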

      Jay Kreps added a comment -

      The guard is checked before the call to expire. The invariant here is that everyone must do a test-and-set on the satisfied variable and only proceed with handling the request if it is satisfied. I don't think there is any problem with checkSatisfied being called on an item that is going to be expired; it is only a problem if we try to process the request twice (the call to checkSatisfied or expire will end up being redundant work, but that is not a major problem).
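
      For illustration, a minimal sketch of the test-and-set invariant described
      above (the class shape and method name are assumptions, not the actual
      RequestPurgatory code): whichever caller - the expiration thread or a
      checkSatisfied caller - wins the compareAndSet claims the request, so it is
      processed exactly once.

      import java.util.concurrent.atomic.AtomicBoolean

      class DelayedRequest {
        private val satisfied = new AtomicBoolean(false)

        // returns true for exactly one caller over the lifetime of the request
        def trySatisfy(): Boolean = satisfied.compareAndSet(false, true)
      }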

      Joel Koshy added a comment -

      Here is v2 - changes from v1:

      • Rebased.
      • Addressed all the "minor" comments.
      • Using NonFollowerId (alias of DefaultReplicaId) instead of -1
      • For request timing: added the duration computation to the Response
        class and removed all the default -1.
      • Added handling for requiredAcks < 0
      • To deal with the race condition that Jay raised, did an explicit update
        after adding to the produceRequestPurgatory.
      • Removed the proactive update to produceRequestPurgatory on a leader
        change. This facilitated moving the ProduceRequestPurgatory and
        DelayedProduce back to KafkaApis where they belong.
      • I'm still doing the isLocal check in the isSatisfied method because we get
        it for free and NotLeaderForPartitionCode is more accurate than giving a
        timeout error code. We can discuss whether to remove this altogether or
        not.
      • One thing v1 did not handle was negative timeouts, which should be
        interpreted as forever, so I converted negative values to Long.MaxValue. In
        doing this I hit what I thought was a bug in DelayQueue but turned out to
        be an overflow issue with DelayedItem - added checks for this.
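
      For illustration, a minimal sketch of the overflow guard described in the
      last bullet (class and field names are assumptions): a negative timeout
      becomes Long.MaxValue, and the delay is clamped so createdMs + delayMs cannot
      wrap around to a negative value in getDelay().

      import java.util.concurrent.TimeUnit

      class OverflowSafeDelayedItem(delayMsIn: Long) {
        private val createdMs = System.currentTimeMillis
        // negative timeouts mean "wait forever"
        private val requested = if (delayMsIn < 0) Long.MaxValue else delayMsIn
        // clamp so that createdMs + delayMs never exceeds Long.MaxValue
        private val delayMs =
          if (requested > Long.MaxValue - createdMs) Long.MaxValue - createdMs else requested

        def getDelay(unit: TimeUnit): Long =
          unit.convert(createdMs + delayMs - System.currentTimeMillis, TimeUnit.MILLISECONDS)
      }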

      Also, Jun asked about a race condition on partitionStatus - although
      isSatisfied cannot be called simultaneously on the same DelayedProduce, I
      think there is a race condition between expire/checkSatisfied in the
      requestPurgatory. I can switch to a ConcurrentHashMap for the
      partitionStatus map, but I think this is an issue in the RequestPurgatory.
      I think it should be easy to fix, but wanted to call it out first to see if
      I'm incorrect:

      • An incoming request triggers checkSatisfied on a DelayedItem
      • During checkSatisfied the DelayedItem expires and the expiration loop
        calls expire().
      • There is a satisfied guard on the DelayedItem but that is checked only
        after expire() and after checkSatisfied() so it is possible for both
        checkSatisfied and expire to be called and it doesn't seem to make sense
        to allow both - it's one or the other.
      Jun Rao added a comment -

      3.4 I overlooked that watcher.collectSatisfiedRequests is synchronized. So, this is not an issue.

      Jay Kreps added a comment -

      Will respond to other points, but the intention of the timeout was that it was milliseconds. It is only 4 bytes because an infinite timeout can be specified by -1 and Int.MaxValue is already a lot of milliseconds.

      Joel Koshy added a comment -

      Jun/Jay, thanks a lot for the very helpful reviews. Here are follow-ups to
      some of your comments - just wanted to make sure I fully address them in v2,
      so let me know if I missed anything. One major thing to decide is what to do
      on a leader change during a delayedProduce - should it be handled
      immediately or let the full request timeout?

      > 1. TopicData.partitionDatas: data is the plural form of datum. So datas
      > feels weird. How about partitionDataArray?

      That works - no wonder 'datas' sounded odd.

      > 3. DelayedProduce:
      > 3.1 isSatisfied(): need to handle the case when requiredAcks is default
      > (probably any value <0). This means whenever we get the ack from every
      > replica in the current ISR, the request is satisfied. This can be done by
      > simply making sure leader.HW >=
      > delayedProduce.partitionStatus(followerFetchPartition).localOffset.

      Thanks for pointing that out - forgot to handle < 0.

      > 3.4 We need to be aware that isSatisfied can be called concurrently on the
      > same DelayedProduce object. I am not sure if it's really necessary, but
      > maybe we should consider using AtomicBoolean for
      > PartitionStatus.acksPending?

      I don't think isSatisfied can be called concurrently on the same
      DelayedProduce - the ProduceRequestPurgatory's watcher map is from key ->
      Watchers. The isSatisfied method is synchronized (through
      collectSatisfiedRequests) on Watchers. I do access partitionStatus for
      respond(), but that's after the delayedProduce has been kicked out of the
      purgatory. Or am I missing some other scenario? If so, can you elaborate?

      > 4. ProducerRequest: actTimeoutSecs should be actTimeoutMs.

      Forgot to bring this up in the v1 patch - it was simply named ackTimeout and
      treated as seconds (i.e., the producer request object uses an Int instead of
      Long) - that's why I added the secs suffix and left it as Int. Will switch
      it to use long and ms instead.

      > 5. ReplicaManager: I would vote for not checking ProducerRequestPurgatory
      > when there is a leadership change, since the logic is simpler. We just let
      > the producer requests timeout. If we do want to do the check here, we
      > should share the ProducerRequestPurgatory object used in KafkaApis.

      Ok - that's also fine, and the code comment indicated it was optional. Not
      doing the check should help significantly with the code structure. Right now
      there is a circular dependency which forced me to put DelayedProduce and
      ProduceRequestPurgatory outside KafkaApis and pass in KafkaApis to
      DelayedProduce - can probably lay it out differently if we want to do this
      check later, but for now this makes things easier.

      > 6. SyncProducerConfigShared: We should probably make the default
      > requiredAcks to be -1 (i.e., wait for ack from each replica in ISR).

      Ok - either way sounds good. The existing default is (almost) the same
      behavior as 0.7 except that the producer response is sent after the message
      is persisted at the leader.

      > 1. There is a race condition between adding to the purgatory and
      > acknowledgement, I think. We first produce locally and then add the
      > watcher, which allows a window where the message is available for
      > replication with no watcher on ack. If a replica acknowledges a given
      > produce before the DelayedProduce is added as a watcher that will not be
      > recorded, I think, and the request will potentially timeout waiting for
      > ISR acks. This would certainly happen and lead to sporadic timeouts. This
      > is tricky, let's discuss. One solution would be to reverse the logic--add
      > the watcher first then produce it locally, but this means having to handle
      > the case where the local produce fails (we don't want to leak watchers).

      > The race condition that Jay raised seems like a potential issue. However,
      > it may be tricky to add the watcher before writing to the local log in the
      > leader. This is because the watcher needs to know the end offset of
      > messages in the leader's log. Another way to address the race condition is
      > to still add messages to leader's log first, followed by adding the
      > watcher. However, after the watcher is added, we can explicitly trigger a
      > watcher check by calling ProducerRequestPurgatory.update.

      Excellent points - will think about how best to handle this.

      > 2. As you say, you definitely can't pass KafkaApis to anything, making
      > DelayedProduceRequest an inner class of KafkaApis will fix this, I think.

      Right - that's actually what I started out with (i.e., the purgatory and
      delayed request inside kafkaApis), but as I described above, there is a
      circular dependency and I was forced to move it out. However, the conclusion
      seems to be that we can do away with purgatory update on becoming follower
      so we can break this dependency loop.

      > 3. I think the way we are tying the api handling and replica manager is
      > not good. replica manager shouldn't know about DelayedProduce requests if
      > possible, it should only know about replica management and be oblivious to
      > the api level.

      Covered above (2).

      > 4. Why is the ack timeout in seconds? It should be milliseconds like
      > everything else, right? One would commonly want a sub-second timeout.

      Yes - it should be ms. (described above)

      > 5. As a stylistic thing it would be good to avoid direct use of
      > Tuple2[A,B] and instead use (A,B) (e.g. myMethod: List[(A,B)] instead of
      > myMethod: List[Tuple2[A,B]])

      Sounds good.
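
      For illustration, a trivial example of the style point in 5 (hypothetical
      method, shown only to contrast the two spellings):

      object TupleStyleExample {
        // preferred: the (A, B) sugar
        def offsetsByPartition: List[(String, Int)] = List(("my-topic", 0))
        // instead of spelling out Tuple2 explicitly
        def offsetsByPartitionVerbose: List[Tuple2[String, Int]] = List(("my-topic", 0))
      }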

      > 6. I don't understand why we are passing (topic, partition) to the
      > purgatory as BOTH the key and request. This is a bit hacky since this is
      > not the request. Let's discuss, we may need to clean up the purgatory api
      > to make it gracefully handle this case.

      It does seem a bit unintuitive on the surface, but I think it is correct:
      i.e., the produce request is to a set of (topic, partition) and potentially
      unblocks DelayedFetch requests to those (topic, partition).

      > 7. I think the ProducerRequestPurgatory should be instantiated in
      > KafkaApis not KafkaServer--it is an implementation detail of the api layer
      > not a major subsystem.

      Covered above (2).

      > 8. The doubly nested map/flatMap in KafkaApis.handleProduceRequest is a
      > little bit tricky could you clean that up or if that is not possible
      > comment what it does (I think it just makes a list of (topic, partition)
      > pairs, but the variable name doesn't explain it). Same with
      > KafkaApis.handleFetchRequest.

      That is correct - will rename/comment.

      > 9. DelayedProduce should definitely not be in the kafka.api package, that
      > package is for request/response "structs". DelayedProduce is basically a
      > bunch of server internals and not something the producer or consumer
      > should be aware of. Likewise, the code for
      > ProduceRequestPurgatory/FetchRequestPurgatory is kept differently, but
      > these things are mirror images of one another. FetchRequestPurgatory is in
      > KafkaApis, ProduceRequestPurgatory in its own file. These should match
      > each other. The reasoning behind keeping things in KafkaApis was that
      > those classes contained part of the logic for processing a request, so
      > splitting it into two files makes it harder to read (the counter argument
      > is that KafkaApis is getting pretty big). The other reason was so it could
      > access the variables in KafkaApis as a nested class. Either way is
      > probably fine, but they should be symmetric between the two.

      Covered above (2).

      > 11. I am not so sure about the logic of proactively responding to all
      > outstanding requests with failure during a leadership change. Is this the
      > right thing to do?

      Related to (2) above - we can discuss this more. The benefit of doing this
      is that a DelayedProduce can be unblocked sooner (if the timeout is high)
      and has a more accurate error code - i.e., the produce request failed for
      this partition because its leader changed. However, not doing anything and
      throwing a timeout is also fine - we just need to decide on one or the other
      and document it for the producer API.

      > 12. The logic in DelayedProduce.isSatisfied is very odd. I think what we
      > want to do is respond to all outstanding requests in the event of a
      > leadership change. But we do this by passing in the ReplicaManager and
      > having logic that keys off whether or not the local broker is the leader.
      > This seems quite convoluted. Wouldn't it make more sense to do this: (1)
      > Have the replica manager allow "subscribers" to its state changes. (2) Add
      > a subscriber that loops over all in-flight requests and expires them with
      > an error when the local broker ceases to be the leader. There is a
      > description of this pattern here http://codingjunkie.net/guava-eventbus/.

      I think there are two concerns you raise here:

      (i) Responding to requests in the event of a leadership change: This was
      intended as a safety net - the existing patch deals with the leadership
      change in replicaManager (which is why replicaManager needs access to the
      produceRequestPurgatory). However, if we choose not to handle the leader
      change and just timeout, then we don't need that code in replicaManager and
      we can remove this logic as well.

      (ii) Passing in replicaManager - this is required because I need to get the
      inSyncReplicas for a partition, and the replicaManager maintains that.
      However, as described in (2) I think this can be directly accessed when I
      move this back to KafkaApis.

      > 13. Nice catch with the request size. I think it is okay that we are using
      > sizeInBytes, since it is not expected that the producer send fragments,
      > though a fully correct implementation would check that.

      That's a good point - using sizeInBytes is (more) correct then.

      > Also, another thought. This implementation is keeping a Map[(topic,
      > partition)] => # acks. I wonder if it would be better to do this in terms
      > of the acknowledged hw. Basically keep a global structure as part of
      > replica manager that tracks the position of all replicas for all
      > partitions. Then the isSatisfied topic would just check this structure to
      > see if the replicas had advanced far enough. This seems like it would not
      > require a per-request map of state and would be less fragile. Somehow this
      > structure seems generally useful for monitoring and other purposes as
      > well.

      This is exactly how it is implemented now - let me know if I misunderstood
      your comment. The acksPending is a boolean. The actual ack count is
      determined from the acknowledged HW that the replicaManager keeps track of.
      The map is still required to maintain other information such as the error
      code (e.g., if there was an error in writing to the log), the offset for
      each partition that followers need to fetch beyond for this request to be
      unblocked, and if there are any acks pending (so we don't bother re-counting
      unnecessarily).

      Jay Kreps added a comment -

      Also, another thought. This implementation is keeping a Map[(topic, partition)] => # acks. I wonder if it would be better to do this in terms of the acknowledged hw. Basically keep a global structure as part of replica manager that tracks the position of all replicas for all partitions. Then the isSatisfied topic would just check this structure to see if the replicas had advanced far enough. This seems like it would not require a per-request map of state and would be less fragile. Somehow this structure seems generally useful for monitoring and other purposes as well.
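
      For illustration, a rough sketch of the global structure being suggested
      (names and types are assumptions, not actual Kafka code): the replica manager
      records each follower's fetched offset, and a satisfaction check simply counts
      how many replicas of a partition have advanced past the required offset.

      import scala.collection.mutable

      class ReplicaPositions {
        // (topic, partition, replicaId) -> highest offset the replica has fetched
        private val positions = mutable.Map.empty[(String, Int, Int), Long]

        def record(topic: String, partition: Int, replicaId: Int, offset: Long): Unit =
          synchronized { positions((topic, partition, replicaId)) = offset }

        // how many replicas of this partition have caught up to requiredOffset
        def ackCount(topic: String, partition: Int, requiredOffset: Long): Int =
          synchronized {
            positions.count { case ((t, p, _), off) => t == topic && p == partition && off >= requiredOffset }
          }
      }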

      Jun Rao added a comment -

      The race condition that Jay raised seems like a potential issue. However, it may be tricky to add the watcher before writing to the local log in the leader. This is because the watcher needs to know the end offset of messages in the leader's log. Another way to address the race condition is to still add messages to leader's log first, followed by adding the watcher. However, after the watcher is added, we can explicitly trigger a watcher check by calling ProducerRequestPurgatory.update.

      Jay Kreps added a comment -

      This doesn't seem to apply cleanly on 0.8, I get conflicts on ReplicaFetcherThread and ReplicaManager.

      A few comments:
      1. There is a race condition between adding to the purgatory and acknowledgement, I think. We first produce locally and then add the watcher, which allows a window where the message is available for replication with no watcher on ack. If a replica acknowledges a given produce before the DelayedProduce is added as a watcher that will not be recorded, I think, and the request will potentially timeout waiting for ISR acks. This would certainly happen and lead to sporadic timeouts. This is tricky, let's discuss. One solution would be to reverse the logic--add the watcher first then produce it locally, but this means having to handle the case where the local produce fails (we don't want to leak watchers).
      2. As you say, you definitely can't pass KafkaApis to anything, making DelayedProduceRequest an inner class of KafkaApis will fix this, I think.
      3. I think the way we are tying the api handling and replica manager is not good. replica manager shouldn't know about DelayedProduce requests if possible, it should only know about replica management and be oblivious to the api level.
      4. Why is the ack timeout in seconds? It should be milliseconds like everything else, right? One would commonly want a sub-second timeout.
      5. As a stylistic thing it would be good to avoid direct use of Tuple2[A,B] and instead use (A,B) (e.g. myMethod: List[(A,B)] instead of myMethod: List[Tuple2[A,B]])
      6. I don't understand why we are passing (topic, partition) to the purgatory as BOTH the key and request. This is a bit hacky since this is not the request. Let's discuss, we may need to clean up the purgatory api to make it gracefully handle this case.
      7. I think the ProducerRequestPurgatory should be instantiated in KafkaApis not KafkaServer--it is an implementation detail of the api layer not a major subsystem.
      8. The doubly nested map/flatMap in KafkaApis.handleProduceRequest is a little bit tricky could you clean that up or if that is not possible comment what it does (I think it just makes a list of (topic, partition) pairs, but the variable name doesn't explain it). Same with KafkaApis.handleFetchRequest.
      9. DelayedProduce should definitely not be in the kafka.api package, that package is for request/response "structs". DelayedProduce is basically a bunch of server internals and not something the producer or consumer should be aware of. Likewise, the code for ProduceRequestPurgatory/FetchRequestPurgatory is kept differently, but these things are mirror images of one another. FetchRequestPurgatory is in KafkaApis, ProduceRequestPurgatory in its own file. These should match each other. The reasoning behind keeping things in KafkaApis was that those classes contained part of the logic for processing a request, so splitting it into two files makes it harder to read (the counter argument is that KafkaApis is getting pretty big). The other reason was so it could access the variables in KafkaApis as a nested class. Either way is probably fine, but they should be symmetric between the two.
      10. KafkaApis.handleProducerRequest and KafkaApis.produce are a little hard to differentiate. I think the latter is effectively "produce to local log" and the former does the purgatory stuff. Would be good to call this out in the method name or comment.
      11. I am not so sure about the logic of proactively responding to all outstanding requests with failure during a leadership change. Is this the right thing to do?
      12. The logic in DelayedProduce.isSatisfied is very odd. I think what we want to do is respond to all outstanding requests in the event of a leadership change. But we do this by passing in the ReplicaManager and having logic that keys off whether or not the local broker is the leader. This seems quite convoluted. Wouldn't it make more sense to do this: (1) Have the replica manager allow "subscribers" to its state changes. (2) Add a subscriber that loops over all in-flight requests and expires them with an error when the local broker ceases to be the leader. There is a description of this pattern here http://codingjunkie.net/guava-eventbus/.
      13. Nice catch with the request size. I think it is okay that we are using sizeInBytes, since it is not expected that the producer send fragments, though a fully correct implementation would check that.

      Jun Rao added a comment -

      The suggestions for future improvements all make sense. We can create new jiras to track them.

      Jun Rao added a comment -

      Thanks for patch v1. Looks good overall. Some comments:

      1. TopicData.partitionDatas: data is the plural form of datum. So datas feels weird. How about partitionDataArray?

      2. KafkaApis:
      2.1 maybeUnblockDelayedRequests: put fetchRequestPurgatory.update in 1 line
      2.2 handleFetchRequest:
      For the following line:
      if(fetchRequest.replicaId != -1) {
      Instead of using -1, could we create a constant like NoneFollowerReplicaId?
      2.3 handleFetchRequest: When creating responses, we should fill in the elapsed time instead of passing in -1. Note that the elapsed time is in nano-secs. So, we probably should rename Response.elapsed to something like elapsedNs. Ditto for handleProduceRequest.

      3. DelayedProduce:
      3.1 isSatisfied(): need to handle the case when requiredAcks is default (probably any value <0). This means whenever we get the ack from every replica in the current ISR, the request is satisfied. This can be done by simply making sure leader.HW >= delayedProduce.partitionStatus(followerFetchPartition).localOffset.
      3.2 Could we change localOffsets to something like requiredOffsets?
      3.3 respond(): need to compute elapsed time in Response
      3.4 We need to be aware that isSatisfied can be called concurrently on the same DelayedProduce object. I am not sure if it's really necessary, but maybe we should consider using AtomicBoolean for PartitionStatus.acksPending?

      4. ProducerRequest: actTimeoutSecs should be actTimeoutMs.

      5. ReplicaManager: I would vote for not checking ProducerRequestPurgatory when there is a leadership change, since the logic is simpler. We just let the producer requests timeout. If we do want to do the check here, we should share the ProducerRequestPurgatory object used in KafkaApis.

      6. SyncProducerConfigShared: We should probably make the default requiredAcks to be -1 (i.e., wait for ack from each replica in ISR).

      7. TopicMetadataTest: no need to change

      8. 0.8 has moved. So need to rebase

      Joel Koshy added a comment -

      Patch is not too big, but a bit tricky. To help with review, here is an
      overview:

      • DelayedProduce:
      • Contains the logic to determine if it can be unblocked or not. The
        partitionStatus map is necessary to keep track of the local log's
        offset, error status, and whether acks are still pending. The comments
        in the code should make the remaining logic clear.
      • Handling delayed producer requests brings in dependencies on the fetch
        request purgatory, replica manager and KafkaZookeeper - which is why I
        had to pass in kafkaApis to the DelayedProduce which is a bit ugly.
      • KafkaApis:
      • The existing code for handling produce requests would respond when the
        leader persists the data to disk. We still do that if requiredAcks is 0
        or 1. For other values, we now create a DelayedProduce request which
        will be satisfied when requiredAcks followers are caught up. If
        |ISR| < requiredAcks then the request will time out.
      • handleFetchRequest: if a request comes from a follower replica, check
        the produceRequestPurgatory and see if DelayedProduce requests can be
        unblocked.
      • ReplicaManager
      • only change is to try and unblock DelayedProduce requests to partitions
        for which the leader changed.
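
      For illustration, a minimal sketch of the per-partition bookkeeping described
      in the DelayedProduce bullet above (field and class names are assumptions, not
      the exact patch):

      import scala.collection.mutable

      // One entry per (topic, partition) in the delayed produce request.
      case class PartitionStatusSketch(
        var requiredOffset: Long,   // followers must fetch beyond this offset in the leader's log
        var errorCode: Short,       // e.g. NoError, NotLeaderForPartition, or a timeout code
        var acksPending: Boolean    // false once enough replicas (or the whole ISR) have caught up
      )

      class DelayedProduceSketch {
        // keyed by (topic, partition)
        val partitionStatus = mutable.Map.empty[(String, Int), PartitionStatusSketch]
      }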

      Note that even if a request times out, some of the partitions may have been
      successfully acked - so even if one partition times out the global error
      code is NoError. The receiver must check the errors array to determine if
      there are any failures. I think this brings in the need for a
      "PartialSuccess" global error code in the ProducerResponse. Thoughts on
      this?

      I think there was a bug in checking satisfied DelayedFetchRequests: the
      checkSatisfied method would take a produceRequest's TopicData and see if the
      total number of bytes in that produce request could satisfy the remaining
      bytes to be filled in the DelayedFetchRequest. However, that could count
      data for partitions that were not part of the DelayedFetchRequest. This
      patch fixes that issue as well - changed the FetchRequestPurgatory key from
      TopicData to PartitionData and check on a per-partition basis.

      Another potential issue is that the DelayedFetchRequest satisfied counts
      using MessageSet's sizeInBytes, which could include incomplete messages - as
      opposed to iterating over the valid messages and getting the size. I left
      that as is. I think it is debatable which approach is correct in this case.

      I added some trace logs at individual request level - e.g., "Produce request
      to topic unblocked n delayedfetchrequests". These would be more useful if we
      add a uuid to each produce request - I think this idea was tossed around on
      the mailing list sometime before. Doesn't even have to be uuid or part of
      the produceRequest's wire format - even an atomicLong counter (internal to
      the broker) may be helpful - thoughts?

      There is this corner case that I think is handled correctly but want to make
      sure:

      • leader receives a producer request and adds it to the
        ProduceRequestPurgatory.
      • leadership changes while it is pending, so the error code for that
        partition is set to NotLeaderForPartitionCode
      • leadership changes back to this broker while the DelayedProduce is
        pending.
      • In this scenario, the partition remains in the error state.
      • I think it is correct because the leader would have become a follower
        (before it became a leader again), and would have truncated its log to the
        intermediate leader's HW.

      If requiredAcks == |ISR| and the |ISR| shrinks while the DelayedProduce is
      pending, the request may timeout. However, if the |ISR| expands back to its
      original size while it is still pending it will get satisfied.

      Let me know if you can think of other corner cases that need to be
      considered - I wouldn't be surprised if there are quite a few.

      I only did limited testing with the ProducerTest.

      I think this opens up the following future work (separate jiras):

      • Enhance system test to test all corner cases (leader changing while
        request pending, ISR shrinking while request pending, etc.).
      • ProducerResponse currently uses two separate arrays for errors and
        offsets; and ProducerRequest uses an array of TopicData, each of which
        contains an array of PartitionData. It may be a good idea to improve these
        classes to use maps/something else as I had to resort to using find and
        indexOf to locate partition-level data in the original request.
      • We should add some mbeans for request purgatory stats - avg. hold time,
        outstanding requests, etc.
      • We should try and get rid of sleeps and fix all intermittent test
        failures.

      If the above list sounds good I'll file the jiras.

      Jun Rao added a comment -

      We need to support different levels of acks required by the producer. By default (ack=-1), the response to the producer request is only sent when every replica in ISR has received the message. If a producer specifies an ack >=0 (0 will be treated the same as 1), the response is sent after ack replicas have received the message. The caveat is that if the specified ack is less than ISR, the produced message could be lost if some broker fails.


        People

        • Assignee: Joel Koshy
        • Reporter: Jun Rao
        • Votes: 0
        • Watchers: 3
