Kafka / KAFKA-47

Create topic support and new ZK data structures for intra-cluster replication

    Details

    • Type: Bug
    • Status: Reopened
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      We need the DDL syntax for creating new topics. We may need to use something like JavaCC. Also, we need to register new data structures in ZK accordingly.

        Issue Links

          Activity

          Anonymous created issue -
          Jun Rao made changes -
          Field / Original Value / New Value
          Link: This issue blocks KAFKA-50 [ KAFKA-50 ]
          Henry Saputra made changes -
          Status: Open [ 1 ] → Closed [ 6 ]
          Henry Saputra made changes -
          Resolution: Unresolved [ 9 ]
          Status: Closed [ 6 ] → Reopened [ 4 ]
          Jun Rao made changes -
          Summary: KAFKA-23 Create topic support and new ZK data structures → Create topic support and new ZK data structures for intra-cluster replication
          Issue Type: Sub-task [ 7 ] → Bug [ 1 ]
          Reporter: Jun Rao [ junrao ]
          Alan Cabrera made changes -
          Workflow: jira [ 12621000 ] → no-reopen-closed, patch-avail [ 12626237 ]
          Jay Kreps added a comment -

          I recommend we just make a command line tool rather than a formal DDL. That will be more sysadmin-friendly and easier to script up.

          We should also think through how this will interact with topic auto-creation, which is a feature we currently rely on.

          Jun Rao added a comment -

          Yes, I agree that we can start with just a command line tool.

          For auto-creation of a topic, we can optionally enable the producer to automatically create a non-existent topic (with some preconfigured number of partitions, replication factor, etc.), using the same underlying logic as the command line tool.

          Jun Rao added a comment -

          Some thoughts on the create/delete topic support.

          1. What if the create process dies in the middle of the creation?
          The create process will create one ZK node like the following for each partition in the topic:
          /brokers/topics/[topic]/[partition_id]/replicas --> {replica_id : broker_id …}

          This means that if the process fails in the middle, some of the partitions may not be created. Ideally, we should probably use the multi-row transaction support feature in ZK (ZOOKEEPER-965), which will be released in ZK 3.4. Since this should be a relatively rare event, for now, we can probably just do this as a best effort. If the create command fails in the middle, we can always delete the topic and create it again (a sketch of this best-effort creation, together with the version check from point 2, appears at the end of this comment).

          2. How to delete a topic?
          We can simply delete all entries in /brokers/topics/[topic] in ZK. On receiving the delete event, each replica will delete its corresponding local log directory.

          A broker could be down when the delete command is issued. When the broker is restarted, it should check whether each of its local topics still exists in ZK. If not, it should delete the corresponding local log directory.

          A more subtle issue can happen when a topic is deleted and recreated while a broker is down. When this broker is restarted, it should recognize that its local log directory is out of date. It should delete everything in the local log directory and create a new one. One way to do this is to store a version id (could be just a timestamp) in /brokers/topics/[topic]. The same version id is also stored in a .version file in the local log directory when it's first created. By comparing the version id in the local log directory and in ZK, a broker can detect when a local log directory is out of date during startup.

          3. Where to store the log locally?
          Currently, a log is stored in {log.dir}/topicname-x for partition x. Now, a partition can have multiple replicas. One possibility is {log.dir}/topicname/x-y for partition x and replica y.

          4. What about auto-creation?
          One possibility is to add an option in the producer so that it can automatically create a topic if the topic doesn't exist in ZK yet.

          5. I'd like to break this jira into 2 parts. The first part just adds the create/delete command and the corresponding ZK path. The second part will change the local log directory, add the auto topic creation support, and simply route the produce/consume requests to the first replica of each partition. This can be a separate jira.
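
          To make points 1 and 2 a bit more concrete, here is a minimal sketch (in Java, against the plain ZooKeeper client) of what the best-effort creation and the version-id check could look like. The class and method names are made up for illustration, and the payload format is only schematic; this is not the actual implementation.

          import java.io.IOException;
          import java.nio.charset.StandardCharsets;
          import java.nio.file.Files;
          import java.nio.file.Path;
          import org.apache.zookeeper.CreateMode;
          import org.apache.zookeeper.KeeperException;
          import org.apache.zookeeper.ZooDefs.Ids;
          import org.apache.zookeeper.ZooKeeper;

          // Sketch only: assumes a connected ZooKeeper handle.
          public class TopicAdminSketch {

            // Point 1: best-effort creation of one znode per partition, plus a version id
            // (here simply the creation timestamp) stored at /brokers/topics/[topic].
            static void createTopic(ZooKeeper zk, String topic, int partitions,
                                    String replicaAssignment)
                throws KeeperException, InterruptedException {
              String topicPath = "/brokers/topics/" + topic;
              String versionId = Long.toString(System.currentTimeMillis());
              zk.create(topicPath, versionId.getBytes(StandardCharsets.UTF_8),
                        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
              for (int p = 0; p < partitions; p++) {
                // If the tool dies part-way through, some partitions are missing;
                // the operator deletes the topic and creates it again (best effort).
                zk.create(topicPath + "/" + p, new byte[0],
                          Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                zk.create(topicPath + "/" + p + "/replicas",
                          replicaAssignment.getBytes(StandardCharsets.UTF_8),
                          Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
              }
            }

            // Point 2: on startup, compare the version id in the local .version file
            // with the one in ZK; a mismatch (or a missing topic) means the local
            // log directory is stale and should be wiped.
            static boolean localLogDirIsStale(ZooKeeper zk, String topic, Path topicLogDir)
                throws KeeperException, InterruptedException, IOException {
              Path versionFile = topicLogDir.resolve(".version");
              String localVersion = Files.exists(versionFile)
                  ? new String(Files.readAllBytes(versionFile), StandardCharsets.UTF_8).trim()
                  : null;
              byte[] zkData;
              try {
                zkData = zk.getData("/brokers/topics/" + topic, false, null);
              } catch (KeeperException.NoNodeException e) {
                return true; // topic was deleted while this broker was down
              }
              String zkVersion = new String(zkData, StandardCharsets.UTF_8).trim();
              return localVersion == null || !localVersion.equals(zkVersion);
            }
          }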

          Jay Kreps added a comment -

          1,2. The other approach would be to implement a ZK structure representing some kind of queue of actions to be carried out for each node. I think the delete case may be a special case of needing to reliably issue a command to a broker. This could be a CREATE or DELETE command or some other kind of operational command (e.g. MIGRATE-PARTITION). I think this is why Kishore and co. took the ZK route for communication in Helix; this is one of the problems they were trying to solve.

          The other question is how this is initiated. Do we add some administrative APIs to the brokers, or is the communication through ZooKeeper? Either is fine, but we should be consistent about how we do administrative actions. I recommend we brainstorm a list of admin-type actions we might want and try to generalize something that will work for all of them.

          3. As a matter of taste, I like what you propose for the directory structure, topic/x-y, better than what we currently do. This does mean needing to rename lots of files when there is a mastership change, though.

          4. One concern is that auto-create may be really slow if we are blocking on acknowledgement from all the brokers. Not sure if this is a major problem.
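
          As a rough illustration of the queue-of-actions idea above (the /brokers/commands path, the payload format, and the class below are all made up, not an existing Kafka structure), a standard ZooKeeper recipe would be to enqueue commands as persistent sequential znodes under a per-broker path and have the broker drain them in order:

          import java.nio.charset.StandardCharsets;
          import java.util.Collections;
          import java.util.List;
          import org.apache.zookeeper.CreateMode;
          import org.apache.zookeeper.KeeperException;
          import org.apache.zookeeper.ZooDefs.Ids;
          import org.apache.zookeeper.ZooKeeper;

          // Illustrative only: a per-broker queue of commands such as CREATE, DELETE
          // or MIGRATE-PARTITION, written as persistent sequential znodes.
          public class BrokerCommandQueueSketch {

            static void enqueue(ZooKeeper zk, int brokerId, String command)
                throws KeeperException, InterruptedException {
              // PERSISTENT_SEQUENTIAL appends an increasing suffix, e.g.
              // /brokers/commands/0/cmd-0000000007, which gives a total order.
              zk.create("/brokers/commands/" + brokerId + "/cmd-",
                        command.getBytes(StandardCharsets.UTF_8),
                        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
            }

            static void drain(ZooKeeper zk, int brokerId)
                throws KeeperException, InterruptedException {
              String queuePath = "/brokers/commands/" + brokerId;
              List<String> children = zk.getChildren(queuePath, false);
              Collections.sort(children); // sequence suffix sorts in enqueue order
              for (String child : children) {
                byte[] data = zk.getData(queuePath + "/" + child, false, null);
                apply(new String(data, StandardCharsets.UTF_8));
                zk.delete(queuePath + "/" + child, -1); // ack by removing the node
              }
            }

            private static void apply(String command) {
              // Placeholder: dispatch on the command type (CREATE, DELETE, ...).
            }
          }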

          Jun Rao added a comment -

          1,2. It seems to me it's simpler to have all brokers read from a single ZK path that stores the source of truth, instead of broadcasting messages to every broker. For the latter, one also has to worry about what happens if only some, but not all, of the messages are posted.

          To be consistent, I think all administrative commands should simply create some data structures in ZK and complete very quickly. Each broker watches those ZK paths and takes action accordingly.

          3. The directory name is only tied to replica id and won't change with a mastership change. The mastership info is recorded in ZK, not in directory names.

          4. Typically, auto-create should complete very quickly since it just writes a few data structures in ZK.
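
          For comparison with the queue approach, here is a minimal sketch of the watch-based model described here, where every broker watches the single source-of-truth path and re-reads it on change. The reconcile step is a placeholder, and error handling and session recovery are omitted; this is a sketch, not the actual broker code.

          import java.util.List;
          import org.apache.zookeeper.KeeperException;
          import org.apache.zookeeper.WatchedEvent;
          import org.apache.zookeeper.Watcher;
          import org.apache.zookeeper.ZooKeeper;

          // Sketch: each broker watches /brokers/topics and reacts to changes.
          public class TopicWatcherSketch implements Watcher {
            private final ZooKeeper zk;

            TopicWatcherSketch(ZooKeeper zk) { this.zk = zk; }

            void start() throws KeeperException, InterruptedException {
              // getChildren with a watcher: ZK fires NodeChildrenChanged on the next change.
              reconcile(zk.getChildren("/brokers/topics", this));
            }

            @Override
            public void process(WatchedEvent event) {
              if (event.getType() == Event.EventType.NodeChildrenChanged) {
                try {
                  // Watches are one-shot, so re-register while reading the new state.
                  reconcile(zk.getChildren("/brokers/topics", this));
                } catch (KeeperException | InterruptedException e) {
                  // real code would retry / resubscribe here
                }
              }
            }

            private void reconcile(List<String> topics) {
              // Placeholder: create missing local log dirs, delete dirs for removed topics.
            }
          }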

          Neha Narkhede made changes -
          Link This issue depends on KAFKA-233 [ KAFKA-233 ]
          Neha Narkhede made changes -
          Link This issue depends on KAFKA-233 [ KAFKA-233 ]
          Prashanth Menon added a comment - edited

          Does how we store the logs locally still require changes in light of the modifications we've made to the protocol? A replica for partition X is stored at most once on a broker, so with the current naming convention we'll never run into conflicts. Perhaps, to be explicit that a certain directory is a replica, we could put them into {log.dir}/topicname/replica/partitionId, but I don't think it's entirely necessary?

          Jun Rao added a comment -

          Prashanth, good question.

          Yes, we could continue using the current log structure {log.dir}/topicname-partitionid. The only thing is that we would like to store some per topic metadata on disk, e.g., the version id (creation time) of a topic (to deal with some of the edge cases during topic re-creation). With the current structure, we either have to duplicate the topic metadata in each partition directory or deterministically pick one partition (like the smallest one) to store the metadata. Neither is ideal. It's much cleaner if we use the new structure {log.dir}/topicname/partitionid. Then the topic metadata can be stored under {log.dir}/topicname.

          Taylor Gautier added a comment -

          Hmm, if you guys are considering changing something about the log structure, might I request that you consider doing something to ease the pain when there are 1,000s or 10,000s of topics?

          The current structure doesn't work well since most filesystems tend to have problems when you store 20k or more directories in one directory.

          A hashing scheme is a good solution. The tradeoff is that it is much more difficult to find the topic directory by hand. A hash of even 10 top level directories would afford 10x more total topics (currently, the practical limit appears to hover around 20k, so 10x would give us 200k) – this would probably be sufficient for my needs.
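
          A small sketch of the kind of hashing scheme suggested here (the bucket count and the bucket-NN directory name are made up for illustration): topics are spread over a fixed number of top-level directories so that no single directory holds tens of thousands of entries.

          import java.io.File;

          // Illustrative: {log.dir}/bucket-NN/topicname/partitionId with a fixed bucket count.
          public class TopicBucketSketch {
            // Must stay fixed once data has been written, otherwise topics would move buckets.
            private static final int NUM_BUCKETS = 10;

            static File partitionDir(File logDir, String topic, int partitionId) {
              // Mask the sign bit rather than Math.abs (which overflows on Integer.MIN_VALUE).
              int bucket = (topic.hashCode() & 0x7fffffff) % NUM_BUCKETS;
              return new File(new File(new File(logDir, "bucket-" + bucket), topic),
                              Integer.toString(partitionId));
            }

            public static void main(String[] args) {
              // e.g. /tmp/kafka-logs/bucket-<k>/clicks/3, where <k> depends on the hash
              System.out.println(partitionDir(new File("/tmp/kafka-logs"), "clicks", 3));
            }
          }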

          Prashanth Menon added a comment -

          Just my two cents here. Hashing (even consistent hashing) is a logical idea; the caveat is that we would need to enforce an upper bound on the number of topic "buckets", since otherwise we could end up renaming and moving a bunch of large files around. To solve the other issue of not being able to quickly determine by hand which bucket a topic falls into, we could maintain a file at the bucket level that lists which topic belongs in which bucket, much like the metadata file Jun mentioned earlier. It's extra overhead on the part of the system; I'm not familiar with a single broker requiring so many topics, but it's certainly conceivable.

          Jun Rao added a comment -

          By using {log.dir}/topicname/partitionid, we already reduce the # of directories from # total partitions to # total topics. We can think a bit more about how to improve it further. This should probably be a separate jira.


            People

            • Assignee: Unassigned
            • Reporter: Jun Rao
            • Votes: 0
            • Watchers: 4

              Dates

              • Created:
                Updated:

                Development