KAFKA-336 (sub-task of KAFKA-50: Kafka intra-cluster replication support)

add an admin RPC to communicate state changes between the controller and the broker

    Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: core
    • Labels:
      None

      Description

      Based on the discussion in https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3 , it's more efficient to communicate state change commands between the controller and the broker using a direct RPC than via ZK. This ticket will implement an admin RPC client for the controller to send state change commands.
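      A minimal sketch of what such a state-change request might look like on the wire, assuming an illustrative field set (versionId, topic, partition, leader) rather than the final format from the design wiki:

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical state-change request: not the actual wire format,
// just a sketch of the serialize/deserialize round trip an admin
// RPC between controller and broker would need.
case class StateChangeRequest(versionId: Short, topic: String, partition: Int, leader: Int) {
  def sizeInBytes: Int =
    2 + 2 + topic.getBytes(StandardCharsets.UTF_8).length + 4 + 4

  def writeTo(buffer: ByteBuffer): Unit = {
    buffer.putShort(versionId)
    val topicBytes = topic.getBytes(StandardCharsets.UTF_8)
    buffer.putShort(topicBytes.length.toShort) // length-prefixed string
    buffer.put(topicBytes)
    buffer.putInt(partition)
    buffer.putInt(leader)
  }
}

object StateChangeRequest {
  def readFrom(buffer: ByteBuffer): StateChangeRequest = {
    val versionId = buffer.getShort
    val topicBytes = new Array[Byte](buffer.getShort)
    buffer.get(topicBytes)
    val topic = new String(topicBytes, StandardCharsets.UTF_8)
    StateChangeRequest(versionId, topic, buffer.getInt, buffer.getInt)
  }
}
```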

      1. controller_broker_RPC.patch
        9 kB
        Yang Ye
      2. rpc.patch.v2
        22 kB
        Yang Ye
      3. rpc-v3.patch
        20 kB
        Yang Ye

        Activity

        Jun Rao created issue
        Jun Rao made changes -
        Fix Version/s: 0.8
        Yang Ye made changes -
        Status: Open → Patch Available
        Yang Ye made changes -
        Status: Patch Available → Open
        Yang Ye made changes -
        Status: Open → In Progress
        Yang Ye made changes -
        Status: In Progress → Open
        Yang Ye made changes -
        Attachment: controller_broker_RPC.patch
        Yang Ye added a comment -

        Controller to Broker RPC added with unit test

        Yang Ye made changes -
        Status: Open → Patch Available
        Jun Rao added a comment -

        Thanks for the patch. Some comments:

        1. LeaderAndISRRequest
        1.1 isInit just needs 1 byte.
        1.2 remove the commented out code

        2. PartitionLeaderAndISRRequest
        2.1 zkVersion should be long.
        2.2 how about we name the class LeaderAndISR?
        2.3 remove unused imports

        3. ControllerToBorkerRequestTest: remove unused imports

        4. We will need a client wrapper, similar to SimpleConsumer, that the controller can use to send the commands through RPC.

        5. On the server side, we need to add a new handler in KafkaApis that can handle the new requests. We also need to create new response classes for those commands. For this jira, the server can just send a dummy response.
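        As an aside on point 1.1, encoding a boolean flag such as isInit as a single byte can be sketched like this (an illustrative helper, not Kafka code):

```scala
import java.nio.ByteBuffer

// One byte is enough for a boolean on the wire: 1 for true, 0 for false.
object BooleanByte {
  def writeBoolean(buffer: ByteBuffer, value: Boolean): Unit =
    buffer.put(if (value) 1.toByte else 0.toByte)

  def readBoolean(buffer: ByteBuffer): Boolean =
    buffer.get() != 0
}
```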

        Jay Kreps added a comment -

        Awesome. Couple of thoughts:

        1. It would be good to separate the tests into a serialization test that just serializes and deserializes the request, and a separate test that actually sends to the server. We have found the simple in-process test helpful since it is easier to debug.

        2. Ideally we should also add a compatibility test that saves out some binary data, to ensure we don't accidentally break compatibility in the future. Neha has details on this. We could wait and do this later, once we have everything working, in case we need to make changes.

        3. Broker is spelled as Borker

        4. Can you change HashMap[Tuple2[String, Int]] in the method signature to Map[(String, Int)]? It means the same but is a little better style.
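        For illustration, the two signature styles side by side (the leaderFor helpers here are hypothetical, not from the patch):

```scala
import scala.collection.mutable

// Both signatures accept the same data; the Map[(String, Int), _] form
// programs to the interface and uses tuple syntactic sugar.
object SignatureStyle {
  // Verbose: concrete collection type and explicit Tuple2
  def leaderForVerbose(leaders: mutable.HashMap[Tuple2[String, Int], Int],
                       topic: String, partition: Int): Option[Int] =
    leaders.get(Tuple2(topic, partition))

  // Preferred: abstract Map and the (String, Int) tuple shorthand
  def leaderFor(leaders: Map[(String, Int), Int],
                topic: String, partition: Int): Option[Int] =
    leaders.get((topic, partition))
}
```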

        I recommend we not create another SimpleConsumer wrapper, but instead make a base KafkaConnection that takes any API object, and use that to implement all the clients. These internal things then won't need a special wrapper; they can just use the KafkaConnection directly. This also means that in the future, if the consumer needs to send a produce request (either for offset storage or an audit trail), it doesn't need a separate TCP connection.

        One meta comment: these precisely versioned, compact, typed request objects are okay for public service APIs, because we should make them so rarely and the really important things are compatibility and efficiency. For these internal broker-to-broker APIs, should we instead just create a generic BrokerCommandRequest/Response which just holds a JSON string? This might be easier to evolve. I think Scala has some (fairly junky) JSON parser included. Or maybe this inconsistency is not worth it; it really depends on how many of these we think we will have.
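        A rough sketch of that generic alternative, with illustrative names and a hand-built JSON payload:

```scala
// Hypothetical generic command request: the schema lives in the JSON
// payload, so evolving a command means changing the JSON, not the
// binary layout. Names here are illustrative, not from any patch.
case class BrokerCommandRequest(commandName: String, jsonPayload: String)

object BrokerCommandRequest {
  // Example factory for one command type; real code would use a JSON library
  def leaderAndIsr(topic: String, partition: Int, leader: Int): BrokerCommandRequest =
    BrokerCommandRequest(
      "LeaderAndISR",
      s"""{"topic":"$topic","partition":$partition,"leader":$leader}""")
}
```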

        Jay Kreps added a comment -

        Never mind the KafkaConnection comment; we already have a BlockingChannel, which is essentially the same thing.

        Yang Ye made changes -
        Attachment: rpc.patch.v2
        Jun Rao added a comment -

        Thanks for the new patch. Some comments:

        From previous review:
        #2.2 from my comment and #4 from Jay's comment are still not addressed.

        New review comments:

        11. It seems that we haven't really documented the exact format of the new requests and the responses. I documented them in https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3 . Does it make sense? If so, this would require some changes in this patch.

        12. KafkaZookeeper, KafkaApis: For this patch, we can just add a TODO comment in the handler in KafkaApis and simply send a dummy response (with error_code = 0). We can remove the code in KafkaZookeeper, since the handler probably shouldn't be in KafkaZookeeper when it's actually put in.

        13. Are the changes in Partition and Replica still necessary if #12 is resolved?

        For Jay's comment, for now, it seems that we just need to add a couple of new types of request. So, we can probably start with customized request objects.

        Jun Rao added a comment -

        Updated the new request/response format proposal in https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3, adding version_id, timeout, etc.

        Jun Rao added a comment -

        Another comment:

        14. To facilitate the controller sending commands to individual brokers, it would be useful to have some kind of ControllerCommandManager with the following API:
        ControllerCommandManager {
          sendCommand(commandRequest: Request, brokerId: Int)
        }

        Under the covers, the ControllerCommandManager can potentially maintain a queue per broker and have a separate send thread that takes commands from the queue, sends each request to the right broker using a BlockingChannel, and deserializes and checks the response.
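        A minimal sketch of that design, assuming a pluggable send function in place of the real BlockingChannel plumbing (all names besides sendCommand are illustrative):

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// One queue per broker, each drained by its own daemon send thread.
// The real implementation would serialize the request over a
// BlockingChannel and check the response; here `send` is injected so
// the sketch stays self-contained.
class ControllerCommandManager(send: (Int, String) => Unit) {
  private val queues = mutable.Map[Int, LinkedBlockingQueue[String]]()

  def sendCommand(commandRequest: String, brokerId: Int): Unit =
    queueFor(brokerId).put(commandRequest)

  private def queueFor(brokerId: Int): LinkedBlockingQueue[String] =
    synchronized {
      queues.getOrElseUpdate(brokerId, {
        val queue = new LinkedBlockingQueue[String]()
        val sender = new Thread(new Runnable {
          // Block on the queue and push each command to the broker in order
          def run(): Unit = while (true) send(brokerId, queue.take())
        })
        sender.setDaemon(true)
        sender.start()
        queue
      })
    }
}
```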

        Yang Ye made changes -
        Attachment rpc-v3.patch [ 12528839 ]
        Jun Rao added a comment -

        Thanks for patch V3. Some more comments.

        21. LeaderAndISRRequest:
        21.1 There is no need to put requestTypeId in the constructor; we know the constant for the request id.
        21.2 There is no need to put requestId in write(). This will be done in BoundedByteBufferSend. See how a FetchRequest is serialized and sent to socket.
        21.3 The above comments apply to StopReplicaRequest too.
        21.4 In constructor, leaderAndISRInfos should be just a Map, not a mutable.HashMap.
        21.5 To be consistent, let's use leaderEpoc, instead of leaderGenId (see the updated protocol in the wiki).

        22. LeaderAndISRResponse:
        22.1 The patch doesn't follow the protocol design in the wiki. Is there a reason?

        23. Currently, for each type of Response, we have a customized ResponseSend object. Those ResponseSend objects all share the same pattern: writing a header and then the content. It would be good if we can consolidate those ResponseSend objects. The only exception is the response for Fetch request, which uses the sendfile API and has to be customized.

        24. KafkaApi:
        The response shouldn't return dummyTopic. Instead, it should just return NoError for each (topic,partition) in the request.

        25. Unit test:
        Let's add a unit test that tests the end-to-end RPC protocol for the new requests, similar to BackwardsCompatibilityTest.testProtocolVersion0().

        In the new patch, it would be great if you can respond to each of the review comments (whether you agree and have fixed it, or you have a different idea, etc).
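        On point 23, a sketch of what a consolidated ResponseSend could look like, assuming the shared pattern is a 4-byte size header followed by the serialized content (names illustrative):

```scala
import java.nio.ByteBuffer

// One generic send object for every header-then-content response.
// Only the sendfile-based fetch response would still need a custom class.
class GenericResponseSend(content: ByteBuffer) {
  /** Size-prefixed bytes: a 4-byte length header followed by the content. */
  def toBytes: Array[Byte] = {
    val body = content.duplicate() // don't disturb the caller's buffer
    val out = ByteBuffer.allocate(4 + body.remaining)
    out.putInt(body.remaining)     // common header: content size
    out.put(body)
    out.array()
  }
}
```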

        Jun Rao added a comment -

        Fixed as part of kafka-349

        Jun Rao made changes -
        Status: Patch Available → Resolved
        Resolution: Fixed

          People

          • Assignee: Yang Ye
          • Reporter: Jun Rao
          • Votes: 0
          • Watchers: 3

            Dates

            • Created:
              Updated:
              Resolved:

              Time Tracking

              Estimated: 252h
              Remaining: 252h
              Logged: Not Specified
