Kafka / KAFKA-901

Kafka server can become unavailable if clients send several metadata requests

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: replication
    • Labels:
      None

      Description

      Currently, if a broker is bounced without controlled shutdown and there are several clients talking to the Kafka cluster, each of the clients realizes that leaders are unavailable for some partitions. This leads to several metadata requests being sent to the Kafka brokers. Since metadata requests are pretty slow, all the I/O threads quickly become busy serving them. This fills up the request queue, which stalls the handling of completed responses since the same network thread handles both requests and responses. In this situation, clients time out on metadata requests and send even more metadata requests, which quickly makes the Kafka cluster unavailable.

      1. metadata-request-improvement.patch
        91 kB
        Neha Narkhede
      2. kafka-901-v5.patch
        134 kB
        Neha Narkhede
      3. kafka-901-v4.patch
        133 kB
        Neha Narkhede
      4. kafka-901-v2.patch
        138 kB
        Neha Narkhede
      5. kafka-901-followup2.patch
        5 kB
        Neha Narkhede
      6. kafka-901-followup.patch
        4 kB
        Neha Narkhede
      7. kafka-901.patch
        117 kB
        Neha Narkhede

        Activity

        Neha Narkhede created issue -
        Neha Narkhede made changes -
        Field Original Value New Value
        Status Open [ 1 ] In Progress [ 3 ]
        Neha Narkhede made changes -
        Status In Progress [ 3 ] Patch Available [ 10002 ]
        Neha Narkhede added a comment -

        This patch changes the way a Kafka server handles metadata requests. Metadata requests are currently a bottleneck in the system, since the server reads several paths from zookeeper to serve one metadata request, and the latency is proportional to the number of topics in the request. A faster way to serve metadata requests is through the controller: the controller makes all state change decisions for the cluster, so it has a cache with the latest leadership information. The patch uses the following algorithm to serve metadata requests -

        1. If the broker is the controller, read the leadership information from the cache and send the response
        2. If the broker is not the controller, instantiate a sync producer to forward the metadata request to the controller
        3. If the broker is not the controller and the controller is unavailable, send a ControllerNotAvailable error code back to the client
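
        A minimal sketch of this routing, using hypothetical names (isController, currentController,
        metadataFromCache, forwardToController, controllerNotAvailableResponse) rather than the actual
        broker code:

        def handleTopicMetadataRequest(request: TopicMetadataRequest) = {
          if (isController) {
            // Case 1: this broker is the controller, so answer from its leadership cache
            metadataFromCache(request.topics)
          } else currentController match {
            // Case 2: forward the request to the controller over a sync producer
            case Some(controller) => forwardToController(controller, request)
            // Case 3: no controller known, tell the client to retry later
            case None => controllerNotAvailableResponse(request)
          }
        }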

        There are some things to note here -

        1. How does a broker know who the current controller is, along with its host/port? If we read from zookeeper, that makes metadata requests slow, although less slow than before. However, upon controller failover, the controller sends a LeaderAndIsr request to all* brokers. So when a broker receives any state change request from the controller, it reads the controller's host/port from zookeeper and caches it. Since state change requests are rare, this is ok.

        Now there is a corner case we need to take care of. If a new broker is brought up and it doesn't have any partitions assigned to it, it won't receive any LeaderAndIsr request in the current code base. This patch takes care of that by changing the controller to always send a LeaderAndIsr request to newly restarted brokers, even if there are no partitions in the request.

        2. What timeout should be used when a broker wants to forward a metadata request to the controller?
        Since there isn't a way to know the timeout specified by the original metadata request, we can potentially set the socket timeout for the forwarded request to Integer.MAX_VALUE. This ensures that the forwarded request will not prematurely time out. However, we need to ensure that the controller always sends a response back to the broker OR closes the socket; this prevents the broker's I/O thread from waiting indefinitely for a response from the controller. This also requires the controller to not block when serving metadata from its cache, OR it will block some other broker's I/O thread, which is bad.
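
        As a rough illustration (plain java.net.Socket here, not the actual SyncProducer code), the
        forwarding connection would be configured so the read never times out on its own:

        import java.net.{InetSocketAddress, Socket}

        // Hypothetical controller coordinates; the real code would use the cached controller host/port.
        val socket = new Socket()
        socket.connect(new InetSocketAddress("controller-host", 9092))
        // Effectively infinite read timeout: the broker relies on the controller either responding
        // or closing the socket, rather than on a local timeout.
        socket.setSoTimeout(Integer.MAX_VALUE)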

        3. On the controller, should it acquire a lock when reading its cache to serve a metadata request?

        Acquiring a lock is not required and is potentially dangerous. It is not required because even if the controller's cache is undergoing change and we send stale information to the client, the client will get an error and retry. Eventually the cache becomes consistent and accurate metadata is sent to the client. This is also ok since changes to the controller's cache are relatively rare, so the chances of stale metadata are low.

        4. Do we need to make the timeout for the forwarded metadata request configurable?

        Ideally no, except for unit tests. The reason is that unit tests involve shutting down kafka brokers. However, when a broker is shut down in a unit test, it does not close the socket connections associated with that broker. The impact is that if a controller broker is shut down, it will neither respond to a few forwarded metadata requests nor close the socket. That leads to some other broker waiting indefinitely for a response from the controller. This doesn't happen outside unit tests, since if the broker is shut down or fails, the process releases all socket connections associated with it, so the other broker gets a broken pipe exception and doesn't end up waiting forever.

        Neha Narkhede made changes -
        Attachment metadata-request-improvement.patch [ 12582557 ]
        Joel Koshy added a comment -

        Haven't looked at the patch yet, but went through the overview. An alternate approach that we may want to consider is to maintain a metadata cache at every broker. The cache can be kept consistent by having the controller send a (new) update-metadata request to all brokers whenever it sends out a leaderAndIsr request. A new request type would avoid needing to "overload" the leader and isr request.

        This would help avoid the herd effect of multiple clients flooding the controller with metadata requests (although these requests should return quickly with your patch).

        Neha Narkhede made changes -
        Status Patch Available [ 10002 ] Open [ 1 ]
        Neha Narkhede added a comment -

        I like Joel's suggestion for 2 reasons -

        1. Like he mentioned, if the controller pushes metadata updates to brokers, it will avoid the herd effect when multiple clients need to update metadata.
        2. It is not a good idea to overload LeaderAndIsrRequest with UpdateMetadata since those 2 requests are fired under different circumstances. Part of this complication is also due to the fact that LeaderAndIsrRequest is already overloaded by the start replica state change.

        The latest patch includes Joel's suggestion. It includes the following changes -

        1. A new controller state change request and response are defined - UpdateMetadataRequest and UpdateMetadataResponse. UpdateMetadataRequest carries the partition state information (leader, isr, replicas) for a list of partitions. In addition, it carries the list of live brokers and the broker id -> host:port mapping for all brokers in the cluster. The live brokers information is used when the broker handles a metadata request, to figure out whether the leader is alive. UpdateMetadataResponse is similar to LeaderAndIsrResponse in that it has an error code per partition and a top level error code, just like any other state change request.

        2. Every kafka broker maintains 3 data structures - the leader cache, the list of alive brokers, and the broker id -> host:port mapping for all brokers. These data structures are updated by the UpdateMetadataRequest and queried by the TopicMetadataRequest, so accesses to them need to be synchronized (a rough sketch follows this list).

        3. The controller fires the update metadata request -
        3.1 When a new broker is started up. The newly restarted brokers are sent the partition state info for all partitions in the cluster.
        3.2 When a broker fails, since leaders for some partitions would've changed
        3.3 On a controller failover, since there could've been leader changes during the failover
        3.4 On preferred replica election, since leaders for many partitions could've changed
        3.5 On partition reassignment, since leader could've changed for the reassigned partitions
        3.6 On controlled shutdown, since leaders move for the partitions hosted on the broker being shut down

        4. Unit tests have been changed to wait until the update metadata request has trickled to all servers. The best way I could think of is to make the KafkaApis object and the leaderCache accessible from KafkaServer.
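
        A rough sketch of the broker-side cache described in point 2 above, using hypothetical type
        names; the real patch keeps these structures inside KafkaApis:

        import scala.collection.mutable

        // Hypothetical stand-ins for the real Kafka types.
        case class Broker(id: Int, host: String, port: Int)
        case class PartitionStateInfo(leader: Int, isr: Seq[Int], replicas: Seq[Int])
        case class TopicAndPartition(topic: String, partition: Int)

        class MetadataCache {
          private val lock = new Object
          // 1) leader cache, 2) alive broker ids, 3) broker id -> host:port for all known brokers
          private val leaderCache  = mutable.Map.empty[TopicAndPartition, PartitionStateInfo]
          private val aliveBrokers = mutable.Set.empty[Int]
          private val allBrokers   = mutable.Map.empty[Int, Broker]

          // Updated when an UpdateMetadataRequest arrives from the controller.
          def update(partitionStates: Map[TopicAndPartition, PartitionStateInfo], liveBrokers: Seq[Broker]): Unit =
            lock.synchronized {
              leaderCache ++= partitionStates
              aliveBrokers.clear()
              aliveBrokers ++= liveBrokers.map(_.id)
              liveBrokers.foreach(b => allBrokers(b.id) = b)
            }

          // Queried when serving a TopicMetadataRequest.
          def leaderFor(tp: TopicAndPartition): Option[PartitionStateInfo] =
            lock.synchronized { leaderCache.get(tp) }

          // Used to decide whether the cached leader is currently alive.
          def aliveBroker(id: Int): Option[Broker] =
            lock.synchronized { if (aliveBrokers.contains(id)) allBrokers.get(id) else None }
        }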

        Neha Narkhede made changes -
        Attachment kafka-901.patch [ 12583245 ]
        Neha Narkhede made changes -
        Status Open [ 1 ] Patch Available [ 10002 ]
        Jun Rao added a comment -

        Thanks for the patch. Overall, this is a good patch. It adds the new functionality in a less intrusive way to the controller. Let's spend a bit more time on the wire protocol change since it may not be trivial to change it later. Some comments:

        1. KafkaController:
        1.1 It seems that every time we try to send a LeaderAndIsrRequest, we follow that with an UpdateMetadataRequest. Would it be simpler to consolidate the logic in ControllerBrokerRequestBatch.sendRequestsToBrokers() such that for every LeaderAndIsrRequest that we send, we also send an UpdateMetadataRequest (with the partitions in the LeaderAndIsrRequest) to all live brokers?
        1.2 ControllerContext: Not sure why we need allBrokers. Could we just return liveBrokersUnderlying as all brokers?
        1.3 The comment under sendUpdateMetadataRequest is not accurate since it doesn't always just send to new brokers.
        1.4 removeReplicaFromIsr():
        1.4.1 The logging in the following statement says ISR, but actually prints both leader and ISR.
        debug("Removing replica %d from ISR %s for partition %s."
        1.4.2 Two new statements are added to change partitionLeadershipInfo. Is that fixing an existing issue not related to metadata?

        2. KafkaApis: Not sure why we need to maintain both aliveBrokers and allBrokers. It seems just storing aliveBrokers (with the broker info) is enough for the purpose of answering metadata request.

        3. ControllerBrokerRequestBatch: If we just need to send live brokers in the updateMetadata request, there is no need to maintain aliveBrokers and allBrokers here.

        4. PartitionStateInfo:
        4.1 Now that we are sending allReplicas, there is no need to explicitly send replicationFactor.
        4.2 We encode ISR as strings, but all replicas as ints. We should make them consistent. It's better to encode ISR as ints too.

        5. UpdateMetadataRequest: Not sure why we need ackTimeoutMs.

        6. BrokerPartitionInfo: Are the changes in getBrokerPartitionInfo() necessary? If partitionMetadata is empty, the caller in DefaultEventHandler already throws NoBrokersForPartitionException.

        7. ConsumerFetcherManager.LeaderFinderThread: The code change here is just for logging. Would it be simpler to just log the metadata response in debug mode? If we want to see the exception type associated with the error code, we can fix the toString() method in the metadata response.

        8. AdminUtils:
        8.1 Could you explain why the test testGetTopicMetadata() is deleted?
        8.2 To ensure that the metadata is propagated to all brokers, could we add a utility function waitUntilMetadataPropagated() that takes in a list of brokers, a list of topics and a timeout? We can reuse this function in all relevant tests (a rough sketch of such a helper follows this list).

        9. AsyncProducerTest.testInvalidPartition(): Not sure about the change. If we hit any exception (including UnknownTopicOrPartitionException) during partitionAndCollate(), the event handler will retry.
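
        A rough sketch of the helper suggested in 8.2, assuming the broker-side leader cache becomes
        reachable from KafkaServer as described earlier in this ticket; the accessor names here
        (server.apis.leaderCache) are hypothetical:

        // Hypothetical: poll each server's metadata cache until every topic shows up, or fail after timeoutMs.
        def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topics: Seq[String], timeoutMs: Long): Unit = {
          val deadline = System.currentTimeMillis + timeoutMs
          def propagated = servers.forall(s => topics.forall(t => s.apis.leaderCache.keys.exists(_.topic == t)))
          while (!propagated) {
            if (System.currentTimeMillis > deadline)
              throw new AssertionError("Metadata for topics %s did not propagate to all brokers within %d ms"
                .format(topics.mkString(","), timeoutMs))
            Thread.sleep(100)
          }
        }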

        Swapnil Ghike added a comment -

        Just to be sure, the two attached patches are independent of each other, right?

        Neha Narkhede added a comment -

        That's correct.

        Neha Narkhede added a comment -

        Thanks for the great review!

        1. KafkaController:
        1.1 This is a good suggestion, however it wouldn't suffice in the following cases -

        • new broker startup - Here we have to send the metadata for all partitions to the new brokers. The leader and isr request only sends the relevant partitions
        • controller failover - Here we have to send metadata for all partitions to all brokers
        • partition reassignment - Here we have to send another metadata request just to communicate the change in isr and other replicas.

        For now, I've left the old calls to sendUpdateMetadataRequest commented out to show what has changed. I will remove those comments before check-in. I still think that the update metadata request handling can be optimized to make it reach the brokers sooner, but every optimization comes with a risk. So I suggest we first focus on correctness and then optimize once it works on large deployments.

        1.2 ControllerContext: I thought that it is unintuitive to not send broker information and only send broker id for brokers that are offline. There was a bug filed for this where users complained it was unintuitive. However, this change will need more thought to do it correctly. So I might include another patch to fix it properly. This patch doesn't have this change

        1.3 Fixed
        1.4.1 Fixed
        1.4.2 Correct, it fixes updating the partition leadership info while shrinking the isr, since the leader can also change in those cases; we use the partition leadership info when sending the update metadata request, so it should always be kept current.

        2,3. Same concern as 1.2

        4. PartitionStateInfo:
        4.1 We still need to send the number of all replicas to be able to deserialize the replica list correctly, and that number is the replication factor (see the encoding sketch after this list).
        4.2 Good point, changed that

        5. Good observation. This was somehow leftover in all controller state change requests. Didn't make sense, so removed it from LeaderAndIsrRequest, StopReplicaRequest and UpdateMetadataRequest

        6. It really didn't make sense to me that the producer throws NoBrokersForPartitionException when in reality it had failed to fetch metadata. This change will help us read errors better.

        7. Good point, moved it to the toString() API of TopicMetadata

        8. AdminUtils:
        8.1 Because it is a duplicate of the test in TopicMetadataTest.
        8.2 Good point, added that and changed all tests to use it

        9. Ideally yes. But the old code was not retrying for any exception, including UnknownTopicOrPartitionException. I've changed DefaultEventHandler to retry no matter what exception it hits. So the test is changed to reflect that it shouldn't give up with UnknownTopicOrPartitionException but instead should retry sending the message and succeed.
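
        To illustrate the point in 4.1 with a generic ByteBuffer sketch (not the actual PartitionStateInfo
        wire format), the replica list is written with its size first, and that size is what lets the
        reader know how many ints to consume:

        import java.nio.ByteBuffer

        def writeReplicas(buffer: ByteBuffer, replicas: Seq[Int]): Unit = {
          buffer.putInt(replicas.size)            // the replication factor doubles as the element count
          replicas.foreach(r => buffer.putInt(r))
        }

        def readReplicas(buffer: ByteBuffer): Seq[Int] = {
          val count = buffer.getInt               // without this count the list cannot be deserialized
          (0 until count).map(_ => buffer.getInt)
        }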

        Neha Narkhede made changes -
        Attachment kafka-901-v2.patch [ 12583495 ]
        Neha Narkhede made changes -
        Status Patch Available [ 10002 ] In Progress [ 3 ]
        Neha Narkhede made changes -
        Status In Progress [ 3 ] Patch Available [ 10002 ]
        Neha Narkhede added a comment - - edited

        Changes in the latest patch include -

        1. Removed all brokers and just included alive brokers in the update metadata request. So the topic metadata will not include broker information for dead brokers.

        2. My guess about there being a bug in the update metadata request processing was right. The bug doesn't affect the correctness of update metadata, but it delays the communication of new leaders to all brokers. The bug was that we were removing the broker going through controlled shutdown from the alive brokers list before it was really shut down. So from a client's perspective, it takes much longer for a new leader to become available. Fixed it to include shutting-down brokers in the list of alive brokers.

        3. Fixed another bug related to new topic creation. This bug caused the controller to not communicate the leaders of newly created topics to all brokers causing metadata requests to fail.

        4. Tested this fix with 100s of migration tools sending data to ~400 topics on a 7 node cluster. There are ~500 consumers consuming data from this cluster. The test continuously bounces the brokers in a rolling restart fashion. The clients notice the new leaders within a few tens of ms in most cases.

        5. Also, the queue time for all requests is mostly < 10 ms since metadata requests are no longer a bottleneck in the system. The latency of a metadata request for ~300 topics itself has dropped from tens of seconds to tens of ms.

        Neha Narkhede made changes -
        Attachment kafka-901-v4.patch [ 12583603 ]
        Jun Rao added a comment -

        Thanks for patch v4. A few more comments:

        40. KafkaController.ControllerContext: This is not introduced in this patch, but serveOrShuttingDownBrokerIds should just be liveBrokerIdUnderlying.

        41. ControllerBrokerRequestBatch: Instead of maintaining aliveBrokers, could we just get it from controllerContext?

        42. KafkaApis.handleTopicMetadataRequest:
        42.1 We can rewrite the following statement
        val partitionMetadata = sortedPartitions.map { partitionReplicaMap =>
        to
        val partitionMetadata = sortedPartitions.map { case(topicAndPartition, partitionState) =>
        Then, we don't have to redefine topicAndPartition and partitionState.
        42.2 The val partitionReplicaAssignment seems unintuitive. Should we rename it to partitionStateInfo?
        42.3 The outermost try/catch is unnecessary since it should be handled by the caller handle().
        42.4 Not sure if we need to log the following error since it's either due to LeaderNotAvailable or ReplicaNotAvailable, both are expected.
        error("Error while fetching topic metadata for topic %s due to %s ".format(topicMetadata.topic,

        43. UpdateMetadataRequest: This class needs to define handleError(). This method is actually required to be defined in every request. So we should remove the empty body of handleError() in RequestOrResponse.

        44. UpdateMetadataResponse: Do we really need the per partition level error code? It seems that a global error code is enough.

        45. ConsumerFetcherManager: We should put the following statement under logger.isDebugEnabled().
        topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString()))

        46. TopicMetadata.toString(): It only prints the leader. We need to print other fields in PartitionMetadata too.

        47. BrokerPartitionInfo: If the metadata response has no error, it seems that we should throw an UnknownTopicOrPartitionException, instead of a KafkaException. Alternatively, should we not throw an exception at all in this case, since the caller already has to deal with the case when there is no metadata?

        48. AsyncProducerTest.testInvalidPartition(): The message in the following statement is a bit misleading. It's probably better to say something like "Should not throw any exception". Actually, instead of catching just UnknownTopicOrPartitionException, we should catch and fail on any exception.
        case e: UnknownTopicOrPartitionException => fail("Should fail with UnknownTopicOrPartitionException")

        Neha Narkhede added a comment -

        I'm not sure I understood your review comment #47. Other review comments are addressed.

        Neha Narkhede added a comment -

        All review comments are addressed. Also made another change to the update metadata request handling: if an update metadata request from a stale controller epoch arrives at the broker, the broker should reject that request.
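
        A minimal sketch of that stale-epoch check, with hypothetical names; the real check lives in the
        broker's UpdateMetadataRequest handling:

        // Hypothetical broker-side handler, not the actual KafkaApis code.
        class UpdateMetadataHandler(brokerId: Int) {
          private var cachedControllerEpoch = -1

          def handle(controllerEpoch: Int, applyUpdate: () => Unit): Unit = synchronized {
            if (controllerEpoch < cachedControllerEpoch) {
              // A superseded controller sent this request; applying it could overwrite newer
              // metadata pushed by the current controller, so the broker rejects it.
              println("Broker %d rejecting UpdateMetadata with stale controller epoch %d (current %d)"
                .format(brokerId, controllerEpoch, cachedControllerEpoch))
            } else {
              cachedControllerEpoch = controllerEpoch
              applyUpdate() // apply partition state and live broker info to the local metadata cache
            }
          }
        }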

        Neha Narkhede made changes -
        Attachment kafka-901-v5.patch [ 12583681 ]
        Jun Rao added a comment -

        Thanks for patch v5. Looks good. +1. I have some minor follow up comments that we can address in a separate patch.

        Jun Rao added a comment -

        Some minor comments:

        50. KafkaApis.handleTopicMetadataRequest: partitionStateOpt is not necessary since inside sortedPartitions.map { }, it can't be undefined.

        51. AsyncProducerTest.testInvalidPartition(): In the case statement inside catch, let's catch all exceptions and fail if it happens.

        Neha Narkhede added a comment -

        Thanks for your careful reviews. Checked in patch v5 and the follow up comments.

        Neha Narkhede made changes -
        Status Patch Available [ 10002 ] Resolved [ 5 ]
        Resolution Fixed [ 1 ]
        Neha Narkhede made changes -
        Status Resolved [ 5 ] Closed [ 6 ]
        Neha Narkhede added a comment -

        Attaching a follow up patch that fixes 2 minor things -

        1. Added update metadata request handling to the state change log. This makes it much easier to troubleshoot any issue with metadata cache refreshing.
        2. Fixed a minor bug in the controller channel manager so that it reads an UpdateMetadataResponse properly.

        Neha Narkhede made changes -
        Attachment kafka-901-followup.patch [ 12583861 ]
        Jun Rao added a comment -

        Thanks for the followup patch. Some comments:

        60. KafkaApis: The following logging logs the whole request for each partition. This will probably pollute the log. Is it enough just to log the whole request once?
        if(stateChangeLogger.isTraceEnabled)
        updateMetadataRequest.partitionStateInfos.foreach(p => stateChangeLogger.trace(("Broker %d handling " +
        "UpdateMetadata request %s correlation id %d received from controller %d epoch %d for partition %s")
        .format(brokerId, p._2, updateMetadataRequest.correlationId, updateMetadataRequest.controllerId,
        updateMetadataRequest.controllerEpoch, p._1)))
        Is the following logging necessary? If we know a request, we already know what should be in the cache after processing the request.
        if(stateChangeLogger.isTraceEnabled)
        stateChangeLogger.trace(("Broker %d caching leader info %s for partition %s in response to UpdateMetadata request sent by controller %d" +
        " epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1,
        updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
        }

        Neha Narkhede added a comment -

        I agree. Kept only the 2nd logging message about caching the leader info.

        Also, both log messages logged only the partition leader info, not the whole request.

        Fixed another exception in ControllerChannelManager

        Neha Narkhede made changes -
        Attachment kafka-901-followup2.patch [ 12584030 ]
        Jun Rao added a comment -

        Thanks for the second followup patch. +1.

        Neha Narkhede added a comment -

        Thanks for the quick review, committed it


          People

          • Assignee: Neha Narkhede
          • Reporter: Neha Narkhede
          • Votes: 0
          • Watchers: 5
