KAFKA-1367

Broker topic metadata not kept in sync with ZooKeeper

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0, 0.8.1
    • Fix Version/s: 0.8.3
    • Component/s: None
    • Labels: None

      Description

      When a broker is restarted, the topic metadata responses from the brokers will be incorrect (different from ZooKeeper) until a preferred replica leader election.

      In the metadata, it looks like leaders are correctly removed from the ISR when a broker disappears, but followers are not. Then, when a broker reappears, the ISR is never updated.

      I used a variation of the Vagrant setup created by Joe Stein to reproduce this with the latest from the 0.8.1 branch: https://github.com/also/kafka/commit/dba36a503a5e22ea039df0f9852560b4fb1e067c
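
      For reference, a minimal sketch of how the discrepancy can be observed from a client, using the 0.8-era Java SimpleConsumer API to issue a topic metadata request; the reported ISR can then be compared against kafka-topics.sh --describe, which reads ZooKeeper directly. Host, port, and topic name are placeholders.

      import java.util.Collections;
      import kafka.javaapi.PartitionMetadata;
      import kafka.javaapi.TopicMetadata;
      import kafka.javaapi.TopicMetadataRequest;
      import kafka.javaapi.TopicMetadataResponse;
      import kafka.javaapi.consumer.SimpleConsumer;

      public class MetadataCheck {
          public static void main(String[] args) {
              // Ask a single broker for topic metadata and print what it believes
              // the leader and ISR are for each partition.
              SimpleConsumer consumer =
                  new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "metadata-check");
              try {
                  TopicMetadataResponse resp =
                      consumer.send(new TopicMetadataRequest(Collections.singletonList("my-topic")));
                  for (TopicMetadata tm : resp.topicsMetadata()) {
                      for (PartitionMetadata pm : tm.partitionsMetadata()) {
                          System.out.printf("partition %d leader %s isr %s%n",
                              pm.partitionId(), pm.leader(), pm.isr());
                      }
                  }
              } finally {
                  consumer.close();
              }
          }
      }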

      Attachments

      1. KAFKA-1367_2015-07-01_17:23:14.patch
        15 kB
        Ashish K Singh
      2. KAFKA-1367_2015-07-06_22:04:06.patch
        16 kB
        Ashish K Singh
      3. KAFKA-1367.patch
        10 kB
        Ashish K Singh
      4. KAFKA-1367.txt
        3 kB
        Ryan Berdeen

          Activity

          Ryan Berdeen added a comment -

          JIRA ate my whitespace. See the attached file for a description of the steps.

          Andrew Otto added a comment -

          This happens to me as well. See: https://github.com/edenhill/librdkafka/issues/147

          Andrew Otto added a comment (edited) -

          I just updated the librdkafka issue, pasting it here as well:

          I noticed that in my case, only 1 of the 4 brokers was ever missing from the ISRs reported by the Kafka brokers (via librdkafka). This JIRA indicated that a preferred replica election should fix the problem. I did this:

          1. Performed a controlled shutdown of the offending broker, 21, then an actual shutdown. Once this was done, librdkafka metadata showed the correct ISRs, since the offending broker really was not in any ISRs.
          2. Restarted broker 21 and let its replicas catch back up. Once it caught up, ZooKeeper reported that all ISRs were in sync.
          3. Checked librdkafka's metadata: broker 21 was not listed in any ISR.
          4. Ran a preferred replica election. Broker 21 was then promoted to leader of some partitions, and librdkafka then showed broker 21 only in the ISRs of partitions for which it was also the leader.

          Any partition that has a replica on broker 21 does not include broker 21 in its reported ISR unless broker 21 is the leader.

          $ kafkacat -L -b analytics1022.eqiad.wmnet -t webrequest_upload
          Metadata for webrequest_upload (from broker -1: analytics1022.eqiad.wmnet:9092/bootstrap):
          4 brokers:
          broker 12 at analytics1012.eqiad.wmnet:9092
          broker 21 at analytics1021.eqiad.wmnet:9092
          broker 22 at analytics1022.eqiad.wmnet:9092
          broker 18 at analytics1018.eqiad.wmnet:9092
          1 topics:
          topic "webrequest_upload" with 12 partitions:
          partition 11, leader 12, replicas: 12,21,22, isrs: 12,22
          partition 5, leader 21, replicas: 21,22,12, isrs: 22,12,21
          partition 10, leader 22, replicas: 22,18,21, isrs: 18,22
          partition 7, leader 12, replicas: 12,18,21, isrs: 12,18
          partition 8, leader 18, replicas: 18,22,12, isrs: 12,18,22
          partition 3, leader 12, replicas: 12,22,18, isrs: 12,18,22
          partition 4, leader 18, replicas: 18,21,22, isrs: 18,22
          partition 1, leader 21, replicas: 21,18,22, isrs: 18,22,21
          partition 6, leader 22, replicas: 22,12,18, isrs: 12,18,22
          partition 2, leader 22, replicas: 22,21,12, isrs: 12,22
          partition 9, leader 21, replicas: 21,12,18, isrs: 12,18,21
          partition 0, leader 18, replicas: 18,12,21, isrs: 12,18

          vs. kafka-topics.sh --describe

          Topic:webrequest_upload PartitionCount:12 ReplicationFactor:3 Configs:
          Topic: webrequest_upload Partition: 0 Leader: 18 Replicas: 18,12,21 Isr: 12,18,21
          Topic: webrequest_upload Partition: 1 Leader: 21 Replicas: 21,18,22 Isr: 18,22,21
          Topic: webrequest_upload Partition: 2 Leader: 22 Replicas: 22,21,12 Isr: 12,22,21
          Topic: webrequest_upload Partition: 3 Leader: 12 Replicas: 12,22,18 Isr: 12,18,22
          Topic: webrequest_upload Partition: 4 Leader: 18 Replicas: 18,21,22 Isr: 18,22,21
          Topic: webrequest_upload Partition: 5 Leader: 21 Replicas: 21,22,12 Isr: 22,12,21
          Topic: webrequest_upload Partition: 6 Leader: 22 Replicas: 22,12,18 Isr: 12,18,22
          Topic: webrequest_upload Partition: 7 Leader: 12 Replicas: 12,18,21 Isr: 12,18,21
          Topic: webrequest_upload Partition: 8 Leader: 18 Replicas: 18,22,12 Isr: 12,18,22
          Topic: webrequest_upload Partition: 9 Leader: 21 Replicas: 21,12,18 Isr: 12,18,21
          Topic: webrequest_upload Partition: 10 Leader: 22 Replicas: 22,18,21 Isr: 18,22,21
          Topic: webrequest_upload Partition: 11 Leader: 12 Replicas: 12,21,22 Isr: 12,22,21

          Jun Rao added a comment -

          Yes, in the current implementation, ISR returned from metadata requests can be inconsistent with what's stored in ZK. This is because metadata is only propagated from the controller to the brokers when the controller changes the leader or the ISR. However, when a follower catches up, the leader (not the controller) adds it back to ISR and updates ZK. That info is not propagated to all brokers.

          Currently, the ISR part of a metadata response is not really used by the clients. Do you have a use case for it?

          Magnus Edenhill added a comment -

          Apart from supplying the ISR count and list in its metadata API, librdkafka also provides an `enforce.isr.cnt` configuration property that fails produce requests locally, before transmission, if the currently known ISR count is smaller than the configured value. This is a workaround for the broker not fully honoring `request.required.acks`, i.e., if `request.required.acks=3` and only one broker is available, the produce request will not fail. More info in the original issue here: https://github.com/edenhill/librdkafka/issues/91

          Generally I would assume that information provided by the broker is correct; otherwise it should not be included at all, since it can't be used (reliably).
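
          For illustration, a minimal Java sketch of the client-side guard Magnus describes; metadataCache, producer, and record are hypothetical placeholders rather than librdkafka or Kafka API, and the check mirrors what `enforce.isr.cnt` does before a request is transmitted.

          // Hypothetical guard: refuse to transmit a produce request when the last known
          // ISR for the target partition is smaller than a configured minimum.
          int minIsr = 3;                                                         // analogous to enforce.isr.cnt
          List<Integer> knownIsr = metadataCache.isrFor("webrequest_upload", 0);  // hypothetical helper
          if (knownIsr.size() < minIsr) {
              // Fail locally, before transmission, since the broker itself will not fail
              // the request when request.required.acks cannot actually be satisfied.
              throw new IllegalStateException(
                  "known ISR size " + knownIsr.size() + " < configured minimum " + minIsr);
          }
          producer.send(record);                                                  // only reached if the guard passes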

          Guozhang Wang added a comment -

          Hello Magnus Edenhill, I think your use case may be supported by a new feature that is currently being developed: KAFKA-1555. Would you like to take a look and see whether it covers your case?

          Dan Hoffman added a comment -

          I'd also add that having the broker able to serve up (accurate) metadata allows client applications to build custom dashboards, etc. As I understand it, there is an idea to move away from ZooKeeper (at least for some things) within the Kafka infrastructure, so having the broker be able to provide this would be good.

          Joel Koshy added a comment -

          I definitely agree with Magnus Edenhill that if such a field exists in the response then the information populated in the field should be accurate (or we may as well not include the field) - so we should fix this.

          Neha Narkhede added a comment -

          I definitely agree with Magnus Edenhill that if such a field exists in the response then the information populated in the field should be accurate (or we may as well not include the field) - so we should fix this.

          +1

          Magnus Edenhill added a comment -

          Guozhang Wang Yes, KAFKA-1555 looks like a good match.

          Guozhang Wang added a comment -

          Regarding the fix for this issue, we can either 1) remove the ISR field from the metadata response and hence force people to use the admin tool (with its ZK dependency) for such usages, which would also require a protocol change between client and server; or 2) have the controller also watch for ISR changes and propagate that information to the brokers, which would not introduce a protocol change for clients but would likely add a lot of burden on the controller, since ISR changes are more frequent than leader migrations.

          Joel Koshy, Jun Rao, any other thoughts?

          Guozhang Wang added a comment -

          Just talked to Joel offline. Since the ISR (and also leader) info on a broker is just a cached snapshot and cannot really be used in a scenario like this (i.e., depending on the ISR list to determine whether an ack received with the -1 setting is reliable, since the ISR can shrink while the ack is being sent back), we could remove the ISR cache from the brokers and also remove it from the metadata response, unless there is a clear use case for this information.

          Neha Narkhede added a comment -

          The ISR cache on the broker was added only because we had to expose that information through the topic metadata response. I don't think we gave a lot of thought, back then, to why the ISR information is useful in the topic metadata response (especially since it's stale and effectively inaccurate). I am not entirely sure that having the controller be aware of all ISR changes is terrible, even though it's true that the number of watches it has to add is proportional to the number of partitions in a cluster. But it's not worth doing that if we don't find a use for the ISR information in the topic metadata response. So I'd vote for removing ISR from topic metadata and also from the broker's metadata cache.

          Magnus Edenhill added a comment -

          May I suggest not to change the protocol but to only send an empty ISR vector in the MetadataResponse?

          Francois Saint-Jacques added a comment -

          Instead of silently removing the field, could the controller force a cache refresh on a metadata request?

          Jay Kreps added a comment -

          Neha Narkhede Does this problem still exist?

          Michael Graff added a comment -

          There is another issue that no one seems to be discussing. This ISR data is, AFAIK, the only way to know when a broker is back in service.

          Consider this scenario: I have 5 brokers and want to upgrade them. I want to know when a broker has caught up so I can take down the next one in sequence to upgrade it. How can I know this if the reported state of the world is different from what is actually in use by the brokers themselves?

          This seems to be very much an operational issue.

          Jun Rao added a comment -

          Michael Graff, for the operational issue that you pointed out, there are JMX metrics, such as the under-replicated partition count, that one can leverage.

          That said, it's probably useful for an admin client to know the accurate ISR. As Guozhang has suggested, one approach is to register a watcher on the state path for each partition. We probably need to do a bit of experimentation to see how much overhead this adds. Another approach is to have the controller periodically read the latest ISR for each partition. This is probably a bit simpler to implement, but may not reflect ISR changes in a timely manner and may add unnecessary load to ZK.
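
          For the operational check Jun mentions, a minimal sketch of reading the broker's under-replicated partition gauge over JMX; the MBean name and JMX port are assumptions and should be verified against the broker version and its JMX settings.

          import javax.management.MBeanServerConnection;
          import javax.management.ObjectName;
          import javax.management.remote.JMXConnector;
          import javax.management.remote.JMXConnectorFactory;
          import javax.management.remote.JMXServiceURL;

          public class UnderReplicatedCheck {
              public static void main(String[] args) throws Exception {
                  // Assumes the broker was started with remote JMX enabled on port 9999.
                  JMXServiceURL url =
                      new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
                  try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
                      MBeanServerConnection mbs = connector.getMBeanServerConnection();
                      // Gauge exposed by the broker's metrics; a non-zero value means some
                      // partitions led by this broker are missing in-sync followers.
                      ObjectName gauge = new ObjectName(
                          "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions");
                      System.out.println("UnderReplicatedPartitions = " + mbs.getAttribute(gauge, "Value"));
                  }
              }
          }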

          Jon Bringhurst added a comment -

          Hey Jay Kreps, I can confirm that the metadata response is still out of sync with ZK in a recent trunk build (about a month old).

          Btw, it's not much effort for me to simply look at ZK or JMX – it was just confusing when I initially ran into this.

          Neha Narkhede added a comment -

          Jay Kreps Yup, issue still exists and the solution I still recommend is to have the controller register watches and know the latest ISR for all partitions. This change isn't big if someone wants to take a stab.

          Ashish K Singh added a comment -

          Neha Narkhede If no one has already started working on this I would like to take a stab at it.

          Joel Koshy added a comment -

          I'm a bit wary of the watch approach. I believe at the last google hangout we decided against doing this (as part of the KIP-4 discussion). A number of people had dropped off at that point - I believe we were going with a broker-level metadata request that can return ISR information for partitions that it leads. Andrii Biletskyi can you confirm? I didn't see it in the summary notes so I could be wrong.

          Andrii Biletskyi added a comment -

          Joel Koshy Yes, it appears that topic commands don't require ISR information, so it was proposed to remove it entirely from the TopicMetadataRequest. There was an idea to create some sort of BrokerMetadataRequest which would include correct topic metadata, but since it's not related to KIP-4 directly it won't be a part of it. So everyone is welcome to create a separate KIP for it. At least, that is the conclusion we came to last time.

          Neha Narkhede added a comment -

          Joel Koshy That would work too, but it looks like Andrii Biletskyi is suggesting that it not be included as part of KIP-4. Maybe we can have whoever picks up this JIRA discuss this change as part of a separate KIP?

          Ashish K Singh added a comment -

          Neha Narkhede / Joel Koshy I have put together KIP-24. I will need a bit more information on what we decided for BrokerMetadataRequest before I can update the "Public Interfaces" section of the KIP.

          Is my understanding correct that the plan we agreed upon is as below? The excerpt below is actually from the KIP.

          It is proposed to remove ISR information from the TopicMetadataRequest. However, a new request, BrokerMetadataRequest, is proposed to be added. The new request will include ISR information for all the partitions that the broker leads.

          Ashish K Singh added a comment -

          Joel Koshy pinging you for your confirmation on this.

          Joel Koshy added a comment -

          Hi Ashish K Singh - sorry I missed your pings. Yes, that is the approach we are planning to take, i.e., remove ISR from TMR. As mentioned in KIP-4, the ISR will be removed in v1 of TMR.

          Ashish K Singh added a comment -

          Thanks Joel Koshy! I will update the KIP accordingly.

          Jun Rao added a comment -

          There is actually a reasonable use case for ISR in KAFKA-2225. Basically, for economic reasons, we may want to let a consumer fetch from a replica in the ISR that's in the same zone. In order to support that, it would be convenient to have TMR return the correct ISR for the consumer to choose from.

          Implementation-wise, one way to address the concern with too many watchers is to do something similar to how topic config changes are handled. Basically, when the leader changes the ISR, in addition to writing the new ISR into the partition state in ZK, it also writes the change as a sequential node under a new isrChangeNotification path in ZK. The controller listens for child changes on the isrChangeNotification path. On a child change, the controller reads the new ISR and broadcasts it through an UpdateMetadataRequest to every broker.
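
          A minimal sketch of that mechanism using the plain ZooKeeper client; the /isr_change_notification path, node prefix, and payload format are illustrative, taken from the description above rather than from any committed implementation.

          import java.nio.charset.StandardCharsets;
          import org.apache.zookeeper.CreateMode;
          import org.apache.zookeeper.ZooDefs;
          import org.apache.zookeeper.ZooKeeper;

          public class IsrChangeNotificationSketch {
              public static void main(String[] args) throws Exception {
                  ZooKeeper zk = new ZooKeeper("zk-host:2181", 30000, null);

                  // Leader side: after writing the new ISR into the partition state znode,
                  // also drop a sequential notification node (parent path assumed to exist).
                  byte[] payload = "{\"topic\":\"webrequest_upload\",\"partition\":5}"
                      .getBytes(StandardCharsets.UTF_8);
                  zk.create("/isr_change_notification/isr_change_", payload,
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);

                  // Controller side: a single child watch on the notification path replaces
                  // one watch per partition state znode. On a child event the controller
                  // re-reads the affected partitions' ISR from ZK, broadcasts an
                  // UpdateMetadataRequest to every broker, and deletes the processed node.
                  zk.getChildren("/isr_change_notification",
                      event -> System.out.println("ISR change notification: " + event));

                  zk.close();
              }
          }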

          Joel Koshy added a comment -

          Jun - that is a good point. That sounds like a good approach to address the concern with watcher counts. Another way is to just allow brokers to send an equivalent update metadata (or similar) request to the controller to notify it of an ISR change - or even allow leaders to broadcast update metadata requests for ISR changes. We currently don't allow this, but maybe we should consider a generic broker-broker communication component. Given the use-case that Theo raised on the list yesterday, it appears we may want to keep the ISR even in TMR v1. It may make sense to discuss this at an upcoming hangout especially since it affects KIP-4.

          Jun Rao added a comment -

          It's probably better to always let the controller propagate metadata changes to the brokers. If the metadata change can be sent from both the controller and other brokers, we need additional logic to reason about ordering.

          Having the broker send the change to the controller is possible. The implication is that there is another thread that can exercise the controller logic, instead of just the ZK watcher thread. So, we may need to deal with more concurrency issues.

          Joel Koshy added a comment -

          One minor issue with depending on the TMR for KAFKA-2225, even if we fix this, is that the consumer would need to periodically refresh its metadata in case the ISR changes after it starts reading from a follower in the ISR.

          Another approach for KAFKA-2225 is to add the ISR information to the fetch response. The followers will then have the current ISR information and so will the consumers. There are at least two concerns though: first, it depends on a live replica fetcher thread; second, it's a bit hacky to add ISR to fetch response as it is more associated with metadata.

          Jun Rao added a comment -

          Well, in that approach, you still have the problem on the very first fetch request. If ISR is not returned in TMR, the first fetch request has to go to the leader. Then the consumer has to switch to another broker on a subsequent request, which seems more complicated.

          I am not sure if we need to rely on periodic metadata refresh to detect whether a replica is out of sync. Basically, as long as the fetch offset is less than HW, the replica can serve the request. If the fetch offset is larger than HW (an indication that the replica is out of sync), the consumer will get an OffsetOutOfRangeException and has to refresh the metadata and pick a new broker to fetch from.

          Ashish K Singh added a comment -

          Jun Rao can we add this to the agenda of next KIP hangout?

          Jun Rao added a comment -

          Yes.

          Joel Koshy added a comment -

          You may still want to switch back to a replica in the same (or nearer) availability zone right after it catches up and rejoins the ISR. Also, I'm not sure about the offset going out of range while fetching from a given replica. i.e., the fetcher will just fetch with an offset larger than the last fetched chunk. It may occur if you were switching between replicas though but that would only be if you were switching to a replica out of the ISR.

          Jun Rao added a comment -

          Joel Koshy, yes, that's a good point. If we want to switch back to the closest replica for consumption, we do need to refresh the metadata periodically to check if the closest replica is back in the ISR. We also need to handle OffsetOutOfRangeException a bit differently. If the consumer gets an OffsetOutOfRangeException because the replica is out of sync, we want to switch to another in-sync replica instead of resetting the offset. One way to do that is the following protocol (sketched in code below).

          1. Get topic metadata.
          2. Pick the "closest" in-sync replica to issue fetch requests.
          3. On an OffsetOutOfRangeException, get the smallest/largest offset. If the fetch offset is within the range, go back to step 1 to switch to a different in-sync replica. Otherwise, go through the offset reset logic.
          4. Periodically refresh the metadata. Switch to the "closest" in-sync replica for fetching, if needed.
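
          A compact Java sketch of those four steps follows; every helper named here (topicMetadata, closestInSyncReplica, fetchFrom, validRange, resetOffset, process) is a hypothetical placeholder rather than an existing client API.

          // Hypothetical consumer loop implementing steps 1-4 above.
          long offset = startingOffset;
          Broker replica = closestInSyncReplica(topicMetadata(topic, partition));      // steps 1-2
          long lastRefreshMs = System.currentTimeMillis();
          while (running) {
              try {
                  for (Record r : fetchFrom(replica, topic, partition, offset)) {
                      process(r);
                      offset = r.offset() + 1;
                  }
              } catch (OffsetOutOfRangeException e) {                                   // step 3
                  OffsetRange valid = validRange(topic, partition);                     // smallest/largest offset
                  if (valid.contains(offset)) {
                      replica = closestInSyncReplica(topicMetadata(topic, partition));  // replica out of sync: switch
                  } else {
                      offset = resetOffset(topic, partition);                           // normal offset reset logic
                  }
              }
              if (System.currentTimeMillis() - lastRefreshMs > metadataRefreshMs) {     // step 4
                  replica = closestInSyncReplica(topicMetadata(topic, partition));      // move back to the closest in-sync replica
                  lastRefreshMs = System.currentTimeMillis();
              }
          }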

          Guozhang Wang added a comment -

          One note is that with the new consumer, one has to periodically refresh metadata anyways for wildcard subscriptions.

          Joel Koshy added a comment -

          Follow-up from the KIP hangout. Side note: this and most of the above comments are actually implementation details for KAFKA-2225. This is relevant here only because we are considering keeping vs. removing the ISR field.

          I do think it is possible to implement KAFKA-2225 without ISR support in either the metadata or the fetch response. The fetch response already contains the HW, so the consumer can watch its fetch offset and the current HW (from the last fetch response). If fetchOffset << HW, the fetch response size is smaller than the requested bytes, and the highest offset in the response is << HW, then the consumer knows that the follower it is fetching from is lagging behind (especially if this difference increases in successive fetches). The main caveat is that this depends on the replica having a live replica fetcher. The other issue is that the consumer needs its own definition of what it takes to deem a replica out of sync (since the replica lag time config is server-side). The other observation is that ISR is a highly relevant and useful field in the topic metadata response. I would be in favor of keeping it in the TMR and just having the consumer refresh the topic metadata periodically to keep itself informed of ISR changes.
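
          A short sketch of that heuristic; fetch(), the response accessors, and lagThreshold are hypothetical placeholders, since the consumer has to pick its own notion of "too far behind".

          // Hypothetical per-fetch check: the follower returned fewer bytes than requested
          // and its data tops out well below the high watermark, so treat it as lagging
          // and switch to another in-sync replica.
          FetchResult resp = fetch(replica, topic, partition, fetchOffset, maxBytes);
          boolean shortResponse = resp.sizeInBytes() < maxBytes;
          boolean farBehindHw = resp.highWatermark() - resp.highestOffset() > lagThreshold;
          if (shortResponse && farBehindHw) {
              replica = closestInSyncReplica(topicMetadata(topic, partition));
          }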

          Aditya Auradkar added a comment (edited) -

          Joel Koshy Jun Rao Regarding KAFKA-2225: even if we leave the ISR in the TopicMetadataRequest, how do the consumers decide which of the replicas in the ISR to fetch from? The consumers need to know which "zone" each of the brokers lives in, and their own, in order to fetch from the closest replica (which mitigates the bandwidth issues described in KAFKA-2225).

          A couple of options:
          1. Return it in BrokerMetadataRequest (KIP-24).
          2. Piggyback it along with the ISR field in TMR, e.g. isr: {0: "zone1", 1: "zone2"}.

          If we choose to do (2), then the TMR will evolve anyway.

          Jun Rao added a comment -

          Yes, we need some kind of notion of zones for both the brokers and the clients. Each broker and each client (producer/consumer) need a configuration for which zone it belongs to. It's probably simpler to just return the zone info in TMR. We will need to evolve TMR, but that can probably be done separately from fixing the ISR in TMR. We probably should move these design discussions to KAFKA-2225 itself.

          Gwen Shapira added a comment -

          By "zones" do we mean rack-awareness? Or more general locality notion?
          Sounds like something that may need its own JIRA and design.

          Jiangjie Qin added a comment -

          I agree with Gwen Shapira, it sounds like this deserves a KIP.

          Jun Rao added a comment -

          Yes, perhaps some kind of more general locality could be useful. That can be done in a separate JIRA.

          Here, we just want to figure out whether it's useful to maintain ISR in TMR.

          Joel Koshy, another issue without ISR is that initially a client will have no idea which replica is in sync and can only guess.

          Ashish K Singh added a comment -

          Jun Rao and Joel Koshy, correct me if my understanding is wrong, but I think we agreed on keeping ISR info in TMR, and the approach mentioned below is our preference.

          When the leader changes the isr, in addition to writing the new isr in the partition state in ZK, it also writes the change as a sequential node under a new isrChangeNotification path in ZK. The controller listens to child changes in the isrChangeNotification path. On child change, the controller reads the new isr and broadcasts it through an UpdateMetadataRequest to every broker.

          Now that we want to keep ISR as part of TMR, do we still need a new BrokerMetadataRequest?

          Joel Koshy added a comment -

          Ashish K Singh - yes that is a good summary. BrokerMetadataRequest - probably yes, but that is now orthogonal.

          Ashish K Singh added a comment -

          Joel Koshy thanks for confirming. I will get started on the suggested solution for this issue. We will probably need a separate JIRA for KIP-24.

          Ashish K Singh added a comment -

          Created reviewboard https://reviews.apache.org/r/35820/
          against branch trunk

          Ashish K Singh added a comment -

          Jun Rao, Joel Koshy, Neha Narkhede, Gwen Shapira I just uploaded a patch to fix this. Apart from the changes suggested above, I just had to update the controller's leader and ISR cache before sending the update metadata request. Tested it on a 3-node Kafka cluster; the patch resolves the issue.

          Ashish K Singh added a comment -

          Updated reviewboard https://reviews.apache.org/r/35820/
          against branch trunk

          Ashish K Singh added a comment -

          Updated reviewboard https://reviews.apache.org/r/35820/
          against branch trunk

          Jun Rao added a comment -

          Thanks for the latest patch. +1. Committed to trunk after fixing the last minor issue in the RB.


            People

            • Assignee: Ashish K Singh
            • Reporter: Ryan Berdeen
            • Votes: 5
            • Watchers: 25
