KAFKA-50: kafka intra-cluster replication support

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels: None

      Description

      Currently, Kafka doesn't have replication. Each log segment is stored in a single broker. This limits both the availability and the durability of Kafka. If a broker goes down, all log segments stored on that broker become unavailable to consumers. If a broker dies permanently (e.g., disk failure), all unconsumed data on that node is lost forever. Our goal is to replicate every log segment to multiple broker nodes to improve both the availability and the durability.

      We'd like to support the following in Kafka replication:

      1. Configurable synchronous and asynchronous replication
      2. Small unavailable window (e.g., less than 5 seconds) during broker failures
      3. Auto recovery when a failed broker rejoins
      4. Balanced load when a broker fails (i.e., the load on the failed broker is evenly spread among multiple surviving brokers)

      Here is a complete design proposal for Kafka replication - https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Replication
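      Point 1 above (configurable synchronous and asynchronous replication) was eventually surfaced as a producer-side acknowledgement setting. A minimal sketch for illustration only; the property name below is from the 0.8-era producer config, which was settled after this issue was filed, so treat it as an assumption here:

      ```properties
      # Producer config sketch (0.8-era name; an assumption relative to this issue):
      #   request.required.acks = 0   -> no ack, lowest latency (fire and forget)
      #   request.required.acks = 1   -> ack once the leader has written the message (async replication)
      #   request.required.acks = -1  -> ack once all in-sync replicas have it (sync replication)
      request.required.acks=-1
      ```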

        Issue Links

          Activity

          Jun Rao added a comment - edited

          I have updated the replication V3 design wiki: https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3 by incorporating the content from V2. We plan to do the implementation based on the V3 design. If there are concerns, please add your comment to the design wiki.

          The following is a list of open jiras, roughly in order of dependency and importance.

          Phase 2 (Basic message replication, with testing and tools and minimum fancy features)
          KAFKA-335: Embedded controller (1w)

          KAFKA-336: Admin RPC between controller and broker (1.5w)

          KAFKA-337: Upgrade ZKClient to allow conditional updates through ZK (0.5w)

          KAFKA-342: Broker startup (revisit based on v3.E design, 1.5w)

          KAFKA-343: Leader election, become leader, become follower (revisit based on v3.A,C; 2.5w)
          Depends on KAFKA-301

          KAFKA-329: Create topic support (revisit based on v3 design, 1w)

          KAFKA-46: Replica fetch, leader commit (v3.G design, 2.5w)
          Depends on KAFKA-301
          Depends on KAFKA-302

          KAFKA-338: Controller failover (v3.D, 2w)

          KAFKA-339: Multifetch for follower (1.5w)

          KAFKA-306: Fix broker failure test on 0.8 branch (1w)

          KAFKA-330: Delete topic support (1w)
          Depends on KAFKA-301

          KAFKA-327: Monitoring and tooling support (2w)

          Phase 3 (System tests and more advanced features like preferred replica leadership transfer, online partition reassignment)
          KAFKA-174: Performance suite for Kafka (2w)
          KAFKA-42: Online partition reassignment (3w)
          KAFKA-43: Preferred replica leadership transfer (1w)

          Neha Narkhede added a comment -

          Moving the Kafka replication design docs to a wiki. This includes both the high-level and the low-level design details. The following changes were made on the wiki:

          1. The state machine will be maintained and changed only by the leader for a partition. The leader coordinates each state change by requesting the followers to act on state change requests. This ensures that we don't have a split-brain problem during state changes amongst the replicas for a partition.
          2. More details were added for the various algorithms.

          Jun Rao added a comment -

          Attached the Word doc of the design.

          Neha Narkhede added a comment -

          Cool. I was thinking that if the original design were on a wiki too, it would be much easier to point to sections of it and make discussions easier. If you have a text copy of it, would you mind pasting it into a Kafka JIRA page? It would be very useful.

          Jun Rao added a comment -

          Yes, we can start a new wiki for each of the sub-jiras, if needed.

          Neha Narkhede added a comment -

          Can the design be moved to the Kafka wiki instead of a non-editable PDF attached here? It will make it much easier to discuss missing details in the current design.

          Jun Rao added a comment -

          Here is a breakdown of all the jiras and their dependencies:

          1. kafka-47: create/delete data structures in ZK, automatically create topic, and use them (making partitions logical, supporting only 1 replica, no failure support). (L1)
          1.1 kafka-237: create/delete ZK path for a topic in an admin tool (L0)
          1.2 kafka-238: add a getTopicMetaData method in broker and expose it to producer
          1.3 kafka-239: Wire existing producer and consumer to use the new ZK data structure
          2. kafka-202: decouple request handler and socket server; enabling long poll in the consumer (L1)
          3. kafka-240: implement new producer and consumer request format (L1)
          4. kafka-49: add ack to ProduceRequest (L2). depending on #3
          5. kafka-48: long poll in consumer (L2). depending on #2
          6. kafka-44: commit thread, replica fetcher thread (L3). depending on #1, #4, #5
          7. kafka-45: broker starting up, leader election (L3). depending on #1
          8. kafka-46: various ZK listeners (L3). depending on #1
          9. kafka-43: move master to preferred replica when possible (L4). optimization
          10. kafka-42: rebalance partition with new brokers (L4). extra feature

          Jun Rao added a comment -

          Attached v2 of the detailed design doc. Made two minor changes:

          1. The V1 design had 2 separate ZK paths for a broker, one registered and one alive. This is simplified to just 1 ZK path per live broker. The implication is that if a topic is created while a broker is down, no partition will be assigned to that broker. Since topic creation is infrequent, this is likely not a big issue.

          2. Use broker id as replica id for each partition, instead of using an explicit replica id.
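
          A sketch of what the simplified registry might look like in ZK (illustrative only; the exact znode paths are an assumption based on the design wikis, not something fixed by this comment):

          ```
          /brokers/ids/0                        # ephemeral znode, present only while broker 0 is live
          /brokers/ids/1                        # ephemeral znode for live broker 1
          /brokers/topics/mytopic/0 -> [0, 1]   # partition 0's replica list: broker ids double as replica ids
          ```

          Because the live-broker znode is ephemeral, a broker that is down simply isn't in the registry, which is why topic creation during its downtime cannot assign it any partitions.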

          Jun Rao added a comment - edited

          The dependencies of the sub-jiras look like the following:

          48
          49
          47 <-- 46,44/45 <-- 43,42,41

          This means that initially, 47,48,49 can be worked on independently.

          Sharad Agarwal added a comment -

          Makes sense. Thanks Jay and Jun.

          Jay Kreps added a comment -

          Hey Sharad, your comments are all correct. I think using HDFS would certainly require the least implementation effort and contains a mature replication system tested at large scale. The downside is that HDFS is fairly complex in its own right, and has a number of drawbacks for high-availability, low-latency cases (SPOF is one but not the only one). Also, many use cases do not need replication, but supporting HDFS and the local FS efficiently probably means two pretty different implementations. We felt that this kind of multi-subscriber log is a really important abstraction in its own right for systems of all kinds, so our thought was to just suck it up and do the full implementation, since we thought the end result would be better.

          Sharad Agarwal added a comment -

          Thanks Jun for the comments.

          > HDFS only provides data redundancy, but not computational redundancy.

          If data resides in HDFS, theoretically it can be served by any broker. The default could be to serve from the broker that has the hot data (the data currently being written).

          > The namenode is a SPOF
          True, but the namenode going down doesn't lose the data; the cluster is just not accessible for that period. However, if Kafka has acks and producer-side spooling (which should be there anyway, IMO, for data durability), no data would be lost.

          > The append/truncate support is relatively new.
          It's been used by the HBase folks for quite some time.

          > HDFS manages its data at block level.
          It doesn't really matter, as users of HDFS hardly care about blocks. They have a file-level view of things.

          That all said, I don't want to derail this work with any kind of debate here. I was just thinking about how to get production-quality replication quickly. Looking forward to having the replication in. Thanks!

          Jun Rao added a comment -

          We do plan to offer both async and sync replication. In async mode, the latency should still remain low since the client doesn't wait for the data to hit all replicas. However, a small amount of data may not be replicated to the followers during a failure and will be lost. Sync mode gives you more or less the opposite. This could be useful for people who want to use Kafka as a traditional messaging system like ActiveMQ.

          This is another difference with HDFS, which only has a sync replication mode.

          Chris Burroughs added a comment -

          > I would even argue that if possible Kafka might consider removing Zookeeper as a dependency - or at least make it optional.

          It's already optional (enable.zookeeper=false), but you lose a lot if you disable it. Taylor, maybe you could elaborate on the mailing list or in another ticket about what subset of functionality you would be willing to give up to not use ZK?

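          For reference, the flag mentioned above lives in the broker config; a minimal sketch of running ZK-free (at the cost of consumer rebalancing, broker discovery, and the replication being designed in this issue, since producers must then be given a static broker list instead of discovering brokers via ZK):

          ```properties
          # server.properties: run a standalone broker without ZooKeeper
          enable.zookeeper=false
          ```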
          Taylor Gautier added a comment -

          To put it another way, in terms of what I want from replication: currently, if I have a failure event, I lose the history of all of my messages. I would like Kafka to preserve as many of those messages as possible in a failure event. It's OK if not every message that appeared to be delivered was delivered.

          This is a classic CAP tradeoff - does Kafka provide C or A? I propose it continue to focus on A.

          Taylor Gautier added a comment -

          I must agree completely with Jun here. The beauty of Kafka lies in its simplicity. To add another piece to the puzzle such as HDFS would break this and diminish the value. I would even argue that if possible Kafka might consider removing Zookeeper as a dependency - or at least make it optional.

          I would also add that it's not clear that HDFS would actually exhibit the write/read performance that Kafka achieves using NIO. And because there is basically zero copy, Kafka's memory and CPU overhead is incredibly low for what it does.

          These two main factors, its incredible performance and very low overhead using commodity components, are Kafka's strengths. Adding HDFS would eliminate them.

          I would suggest that any replication strategy should focus on non-guaranteed delivery. The Kafka clients already do not provide a guaranteed delivery mechanism, and as such Kafka should only be used in a setting where it's tolerable to have some amount of message loss during a failure event. Minimizing this loss is a reasonable goal to achieve, but it should not compromise the simplicity and performance of Kafka in any way.

          Jun Rao added a comment -

          Sharad,

          Good question. HDFS is a great system and is the very first thing that we thought about when looking at Kafka replication. The pros are that (1) we can offload the replication complexity to another system and (2) HDFS can recover from various data failures very effectively. Some of the cons are:

          1. HDFS only provides data redundancy, not computational redundancy. If at any given point in time there is only one broker that can serve the data, the availability is not going to be high. When a broker is down, we need to elect another broker to take over its data. Even though data doesn't have to be physically moved, this process may require a little bit of recovery of each partition and may take some time to complete. In that window, some partitions become unavailable. Further, data logically moved to the new broker is initially cold.

          2. HDFS currently is not a highly available system. The namenode is a SPOF and you need something like Avatar namenode to make it HA. It's not clear when this feature is going to be generally available and used.

          3. Using HDFS brings in another complex system; it's not clear how easy it is to operate, especially for an online system.

          4. HDFS is not a true POSIX file system. The append/truncate support is relatively new. This may force us to redesign some of the things that currently require in-place update (e.g., during recovery).

          5. HDFS manages its data at block level. Kafka replication can manage data at the partition level (a partition can be 3 orders of magnitude bigger than a block). This means we can manage much less metadata and, therefore, potentially have a simpler design and implementation.

          Sharad Agarwal added a comment -

          Sorry for coming late to this.

          I read the design document, and it looks like quite a complex feature to implement and maintain going forward. The sheer amount of complexity in managing the replicas and partitions just bothers me. I am wondering why we can't use HDFS, which has been hardened over a number of years. Obviously I may be missing some subtle things. It would be great if folks could shed light on this.


            People

            • Assignee: Jun Rao
            • Reporter: Jun Rao
            • Votes: 3
            • Watchers: 25

              Time Tracking

              • Original Estimate: 1,764h
              • Remaining Estimate: 1,764h
              • Time Spent: Not Specified