Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: core
    • Labels:
      None

      Description

      This ticket will implement a controller as described in https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3

      This includes creating the controller path, setting up the necessary watchers (e.g., on the broker path and the topic path), and handling controller failover.

      1. kafka_controller_v1.patch
        29 kB
        Yang Ye
      2. kafka_controller_v2.patch
        30 kB
        Yang Ye
      3. kafka_controller_v3.patch
        24 kB
        Yang Ye
      4. kafka_controller_v4.patch
        25 kB
        Yang Ye
      5. kafka_controller_v4.patch
        25 kB
        Yang Ye
      6. kafka_controller_v5.patch
        25 kB
        Yang Ye
      7. kafka_controller_v6.patch
        25 kB
        Yang Ye

        Activity

        Yang Ye added a comment -

        also refine the buffer size of BlockingChannel

        Jun Rao added a comment -

        Thanks for the patch. Some comments:

        1. KafkaController:
        1.1 Typo satup
        1.2 Sometimes, multiple new lines are added between methods. Can we make this consistent?
        1.3 It's probably better to create a separate class ControllerChannelManager that manages all queues and RequestSendThreads. ControllerChannelManager will support the following API:
        def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null)
        def addBroker(brokerId: Int, broker: Broker)
        def removeBroker(brokerId: Int)
        1.4 BrokerChangeListener.handleChildChange(): can we rename curChilds to javaCurChildren and scalaCurChildren to curChildren, since the code is written in Scala?
        1.5 We don't need to fill in the content of the listener in this jira. Just add a TODO comment.

        2. RequestSendThread:
        2.1 run(): We can't shut down this thread if no request is added to the queue, since queue.take() is blocking. We need to do something like ReplicaFetcherThread and interrupt the thread to make it shutdownable.

        3. KafkaZookeeper: It seems to me that it's better to move the controller election logic to KafkaController. On startup, KafkaController checks if it can become the controller. If yes, it registers the listeners on brokers and topics. Otherwise, it registers a listener on the controller path for controller failover. This way, all logic related to controller is self contained in KafkaController.

        4. BlockingChannel:
        4.1 useDefaultBufferSize => UseDefaultBufferSize
        4.2 remove new lines added after the constructor
        4.3 Today, we have logic in SyncProducer and SimpleConsumer to reconnect if there is any socket IO exception while sending/receiving data. We probably need something like that in RequestSendThread too. Instead of duplicating this logic, it's probably better to move it to BlockingChannel. I was thinking that instead of having a send and a receive API, BlockingChannel can just support one sendAndReceive API and implement the reconnect logic for socket exceptions.

        5. ZkUtils: getController and getTopicPartitionLeaderAndISR are not used anywhere.
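
        The ControllerChannelManager API proposed in 1.3 could look roughly like the sketch below. Broker and RequestOrResponse are simplified stand-ins rather than Kafka's actual classes, pendingRequests is a hypothetical helper added only for inspection, and no sender thread is started, so this is an illustration of the interface shape and not the patch's implementation.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Hypothetical stand-ins for Kafka's Broker and RequestOrResponse types.
case class Broker(id: Int, host: String, port: Int)
case class RequestOrResponse(payload: String)

class ControllerChannelManager {
  // A queued request paired with the callback to run on its response.
  type QueueItem = (RequestOrResponse, RequestOrResponse => Unit)

  private val lock = new Object
  private val queues = mutable.HashMap[Int, LinkedBlockingQueue[QueueItem]]()

  def addBroker(brokerId: Int, broker: Broker): Unit = lock.synchronized {
    // A real implementation would also open a channel to the broker
    // and start a RequestSendThread that drains this queue.
    if (!queues.contains(brokerId))
      queues.put(brokerId, new LinkedBlockingQueue[QueueItem]())
  }

  def removeBroker(brokerId: Int): Unit = lock.synchronized {
    // A real implementation would also shut down the broker's send thread.
    queues.remove(brokerId)
  }

  def sendRequest(brokerId: Int, request: RequestOrResponse,
                  callback: RequestOrResponse => Unit = null): Unit = lock.synchronized {
    queues.get(brokerId) match {
      case Some(queue) => queue.put((request, callback))
      case None => throw new IllegalStateException("Unknown broker " + brokerId)
    }
  }

  // Not part of the proposed API; exposed only so the sketch can be inspected.
  def pendingRequests(brokerId: Int): Int = lock.synchronized {
    queues.get(brokerId).map(_.size).getOrElse(0)
  }
}
```

        Keeping one queue per broker means a slow or unreachable broker only delays its own requests.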

        Yang Ye added a comment -

        1. KafkaController:
        1.1 Typo satup
        -----> fixed

        1.2 Sometimes, multiple new lines are added between methods. Can we make this consistent?
        -----> fixed

        1.3 It's probably better to create a separate class ControllerChannelManager that manages all queues and RequestSendThreads. ControllerChannelManager will support the following API:
        def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null)
        def addBroker(brokerId: Int, broker: Broker)
        def removeBroker(brokerId: Int)

        -----> done

        1.4 BrokerChangeListener.handleChildChange(): can we rename curChilds to javaCurChildren and scalaCurChildren to curChildren, since the code is written in Scala?

        -----> done

        1.5 We don't need to fill in the content of the listener in this jira. Just add a TODO comment.

        -----> the brokerListener is needed

        2. RequestSendThread:
        2.1 run(): We can't shut down this thread if no request is added to the queue, since queue.take() is blocking. We need to do something like ReplicaFetcherThread and interrupt the thread to make it shutdownable.

        -----> done

        3. KafkaZookeeper: It seems to me that it's better to move the controller election logic to KafkaController. On startup, KafkaController checks if it can become the controller. If yes, it registers the listeners on brokers and topics. Otherwise, it registers a listener on the controller path for controller failover. This way, all logic related to controller is self contained in KafkaController.

        -----> That doesn't seem applicable; let's talk in person

        4. BlockingChannel:
        4.1 useDefaultBufferSize => UseDefaultBufferSize
        -----> fixed

        4.2 remove new lines added after the constructor
        -----> fixed

        4.3 Today, we have logic in SyncProducer and SimpleConsumer to reconnect if there is any socket IO exception while sending/receiving data. We probably need something like that in RequestSendThread too. Instead of duplicating this logic, it's probably better to move it to BlockingChannel. I was thinking that instead of having a send and a receive API, BlockingChannel can just support one sendAndReceive API and implement the reconnect logic for socket exceptions.

        -----> Let me do it separately. Is that OK?

        5. ZkUtils: getController and getTopicPartitionLeaderAndISR are not used anywhere.

        -----> removed in the patch

        Jun Rao added a comment -

        Thanks for patch v2. Some more comments:

        21. ControllerChannelManager:
        21.1 remove unused imports
        21.2 allBrokers in constructor doesn't need to be val since it's only used in initialization
        21.3 Do ControllerChannelManager and RequestSendThread need to be nested under KafkaController? They don't seem to use any instance variable of KafkaController.
        21.4 RequestSendThread: There is still a problem with shutdown. If the thread is waiting on queue.take() when shutdown() is called, then Stream.continually will break and throw an InterruptedException, and shutdown is handled properly. However, when shutdown() is called, the thread may be performing a non-blocking operation. In this case, the thread needs to check whether it has been interrupted, which it's not doing. The simplest thing to do is to change Stream to a while loop that checks if an isRunning variable is true. shutdown() will set the isRunning flag to false and send the interruption.

        3. Could you explain a bit more why the controller election logic can't be moved to KafkaController?
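
        The shutdown pattern suggested in 21.4 can be sketched as follows. This is a minimal illustration under assumptions, not the code from the patch: String stands in for the real request type, the sent counter and shutdownLatch are hypothetical additions, and shutdown() assumes the thread has already been started.

```scala
import java.util.concurrent.{BlockingQueue, CountDownLatch, LinkedBlockingQueue}

// Minimal sketch: String stands in for the real request type.
class RequestSendThread(brokerId: Int, queue: BlockingQueue[String])
    extends Thread("requestSendThread-" + brokerId) {

  @volatile private var isRunning = true
  private val shutdownLatch = new CountDownLatch(1)
  // Hypothetical counter, written only by this thread.
  @volatile var sent = 0

  override def run(): Unit = {
    try {
      while (isRunning) {
        val request = queue.take() // blocks; interrupt() wakes it up
        // A real thread would send `request` over the channel here.
        sent += 1
      }
    } catch {
      case _: InterruptedException => // expected during shutdown
    } finally {
      shutdownLatch.countDown()
    }
  }

  // Assumes start() has been called; otherwise await() would never return.
  def shutdown(): Unit = {
    isRunning = false     // covers the case where the thread is not blocked
    interrupt()           // covers the case where it is blocked in take()
    shutdownLatch.await() // wait until run() has left its loop
  }
}
```

        The flag and the interrupt complement each other: the flag stops the loop after a non-blocking step, and the interrupt unblocks a pending take().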

        Jun Rao added a comment -

        For 4.3, could you create a separate jira to track this?
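
        For reference, the single sendAndReceive call with built-in reconnect described in 4.3 might look like the sketch below. Transport and ReconnectingChannel are hypothetical names used only for illustration (Kafka's BlockingChannel differs), requests are plain strings, and retrying exactly once is an assumption rather than something this ticket specifies.

```scala
import java.io.IOException

// Hypothetical transport abstraction, used only for this illustration.
trait Transport {
  def connect(): Unit
  def disconnect(): Unit
  def send(request: String): Unit
  def receive(): String
}

class ReconnectingChannel(transport: Transport) {
  private val lock = new Object
  private var connected = false

  // Sends a request and waits for its response, reconnecting once on IO failure.
  def sendAndReceive(request: String): String = lock.synchronized {
    try {
      attempt(request)
    } catch {
      case _: IOException =>
        // Drop the broken connection and retry once on a fresh one.
        // A second failure propagates to the caller.
        transport.disconnect()
        connected = false
        attempt(request)
    }
  }

  private def attempt(request: String): String = {
    if (!connected) {
      transport.connect()
      connected = true
    }
    transport.send(request)
    transport.receive()
  }
}
```

        Pairing send and receive under one lock also keeps a response from being matched to the wrong request when several callers share the channel.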

        Yang Ye added a comment -

        21. ControllerChannelManager:
        21.1 remove unused imports

        Done

        21.2 allBrokers in constructor doesn't need to be val since it's only used in initialization

        Now it's a var, since it's modified at runtime

        21.3 Do ControllerChannelManager and RequestSendThread need to be nested under KafkaController? They don't seem to use any instance variable of KafkaController.

        Done

        21.4 RequestSendThread: There is still a problem with shutdown. If the thread is waiting on queue.take() when shutdown() is called, then Stream.continually will break and throw an InterruptedException, and shutdown is handled properly. However, when shutdown() is called, the thread may be performing a non-blocking operation. In this case, the thread needs to check whether it has been interrupted, which it's not doing. The simplest thing to do is to change Stream to a while loop that checks if an isRunning variable is true. shutdown() will set the isRunning flag to false and send the interruption.

        Done

        Jun Rao added a comment -

        Thanks for patch v3. A few more comments:

        31. KafkaController:
        31.1 In general, the watcher for a ZK path needs to be subscribed before reading the path. Otherwise, some events on the path could be missed. So in startup(), registerControllerExistListener() needs to be called before tryToBecomeController(). In tryToBecomeController(), registerBrokerChangeListener() and registerTopicChangeListener() need to be called before reading brokers and topics from ZK.
        31.2 do we need 3 locks? We can probably just use 1 lock to synchronize all access to allBrokers and allTopics.
        31.3 tryToBecomeController(): need to add the initial set of brokers to ControllerChannelManager
        31.4 need a sessionExpiration listener so that it can clean state after the controller lost its registration (e.g., shut down ControllerChannelManager and call tryToBecomeController).
        31.5 In tryToBecomeController, set controllerChannelManager to null if it can't become a controller.

        32. BrokerChangeListener.handleChildChange(): should remove deleted brokers from allBrokers.

        33. ControllerExistListener.handleDataChange(): If no logic is needed here, add a comment to make it clear.

        34. ZkUtils: remove extra new line after getTopicPartitionLeaderAndISR

        35. ControllerBasicTest: remove extra new lines between methods

        36. RequestSendThread: Should we name the thread "requestSendThread-brokerid"?
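
        The ordering rule in 31.1 (subscribe first, then read) can be illustrated with the sketch below. WatchedPath is a hypothetical in-memory stand-in for a ZooKeeper path and BrokerTracker is a hypothetical consumer; neither is part of the patch or of any ZooKeeper client API.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a ZooKeeper path with child watchers.
class WatchedPath {
  private var children = Set.empty[String]
  private val listeners = mutable.ListBuffer[Set[String] => Unit]()

  def subscribe(listener: Set[String] => Unit): Unit = synchronized {
    listeners += listener
  }

  def getChildren: Set[String] = synchronized { children }

  def addChild(name: String): Unit = {
    val (snapshot, toNotify) = synchronized {
      children += name
      (children, listeners.toList)
    }
    // Notify outside the monitor so listeners can safely read the path.
    toNotify.foreach(_(snapshot))
  }
}

class BrokerTracker(path: WatchedPath) {
  private val lock = new Object
  private var allBrokers = Set.empty[String]

  // Re-reads the full child list under a single lock.
  private def refresh(): Unit = lock.synchronized {
    allBrokers = path.getChildren
  }

  def startup(): Unit = {
    // 1. Subscribe first, so any change from now on triggers a refresh.
    path.subscribe(_ => refresh())
    // 2. Then read the current state. Reading first would leave a window
    //    in which a new child is neither in the snapshot nor reported.
    refresh()
  }

  def brokers: Set[String] = lock.synchronized { allBrokers }
}
```

        With this ordering a change can at worst be observed twice, which is harmless here because each refresh replaces the whole set.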

        Yang Ye added a comment -

        31. KafkaController:
        31.1 In general, the watcher for a ZK path needs to be subscribed before reading the path. Otherwise, some events on the path could be missed. So in startup(), registerControllerExistListener() needs to be called before tryToBecomeController(). In tryToBecomeController(), registerBrokerChangeListener() and registerTopicChangeListener() need to be called before reading brokers and topics from ZK.

        Fixed this

        31.2 do we need 3 locks? We can probably just use 1 lock to synchronize all access to allBrokers and allTopics.

        It's not necessary, so I replaced them with just one lock

        31.3 tryToBecomeController(): need to add the initial set of brokers to ControllerChannelManager

        I think we already added the initial set of brokers to the manager

        31.4 need a sessionExpiration listener so that it can clean state after the controller lost its registration (e.g., shut down ControllerChannelManager and call tryToBecomeController).

        Fixed this

        31.5 In tryToBecomeController, set controllerChannelManager to null if it can't become a controller.
        I don't think it's necessary, because the manager is always initialized as null, and only changed to not null if it becomes the controller

        32. BrokerChangeListener.handleChildChange(): should remove deleted brokers from allBrokers.
        Fixed this

        33. ControllerExistListener.handleDataChange(): If no logic is needed here, add a comment to make it clear.

        Fixed this

        34. ZkUtils: remove extra new line after getTopicPartitionLeaderAndISR

        35. ControllerBasicTest: remove extra new lines between methods
        Fixed it

        36. RequestSendThread: Should we name the thread "requestSendThread-brokerid"?
        Fixed it

        Yang Ye added a comment -

        setting the controllerConnectionManager to null at session expiration

        Jun Rao added a comment -

        A few more comments for v5 patch.

        41. KafkaController: There are a couple of corner cases.
        41.1 registerSessionExpirationListener() needs to be called even if the controller is not active.
        41.2 if a broker can't become the controller, it needs to call registerControllerExistListener() to register the existence watcher.

        42. ControllerChannelManager: startup() should use broker, not allBrokers. Also, allBrokers shouldn't be val.

        Yang Ye added a comment -

        41.1 registerSessionExpirationListener() needs to be called even if the controller is not active.

        fixed it

        41.2 if a broker can't become the controller, it needs to call registerControllerExistListener() to register the existence watcher.

        fixed it

        42. ControllerChannelManager: startup() should use broker, not allBrokers. Also, allBrokers shouldn't be val.

        fixed it

        Jun Rao added a comment -

        Thanks for the patch. Just committed to 0.8 branch.


          People

          • Assignee:
            Yang Ye
            Reporter:
            Jun Rao
          • Votes:
            0
            Watchers:
            2

              Time Tracking

              Estimated:
              168h
              Remaining:
              168h
              Logged:
              Not Specified
