Details

    1. kafka-330-v1.patch
      32 kB
      Swapnil Ghike
    2. kafka-330-v2.patch
      59 kB
      Swapnil Ghike
    3. KAFKA-330.patch
      158 kB
      Neha Narkhede
    4. KAFKA-330_2014-01-28_15:19:20.patch
      158 kB
      Neha Narkhede
    5. KAFKA-330_2014-01-28_22:01:16.patch
      157 kB
      Neha Narkhede
    6. KAFKA-330_2014-01-31_14:19:14.patch
      183 kB
      Neha Narkhede
    7. KAFKA-330_2014-01-31_17:45:25.patch
      183 kB
      Neha Narkhede
    8. KAFKA-330_2014-02-01_11:30:32.patch
      189 kB
      Neha Narkhede
    9. KAFKA-330_2014-02-01_14:58:31.patch
      189 kB
      Neha Narkhede
    10. KAFKA-330_2014-02-05_09:31:30.patch
      194 kB
      Neha Narkhede
    11. KAFKA-330_2014-02-06_07:48:40.patch
      180 kB
      Neha Narkhede
    12. KAFKA-330_2014-02-06_09:42:38.patch
      180 kB
      Neha Narkhede
    13. KAFKA-330_2014-02-06_10:29:31.patch
      181 kB
      Neha Narkhede
    14. KAFKA-330_2014-02-06_11:37:48.patch
      181 kB
      Neha Narkhede
    15. KAFKA-330_2014-02-08_11:07:37.patch
      50 kB
      Neha Narkhede

      Issue Links

        Activity

        Neha Narkhede created issue -
        Neha Narkhede made changes -
        Field Original Value New Value
        Link This issue blocks KAFKA-50 [ KAFKA-50 ]
        Jun Rao added a comment -

        During controller failover, we need to remove unneeded leaderAndISRPath that the previous controller didn't get a chance to remove.

        Joel Koshy made changes -
        Labels replication replication tools
        Neha Narkhede made changes -
        Labels replication tools feature
        Jun Rao made changes -
        Labels feature features
        Jun Rao added a comment -

        The delete topic logic can follow the same logic in partition reassignment.
        1. Create a ZK path to indicate that we want to delete a topic.
        2. The controller registers a listener to the deleteTopic path and when the watcher is triggered:
        2.1 Send stopReplica requests to each relevant broker.
        2.2 Each broker then deletes the local log directory.
        2.3 Once the stopReplica request completes, the controller deletes the deleteTopic path and the delete topic command completes.
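
        For illustration only, a minimal sketch of the controller-side listener this flow implies, using the zkclient library Kafka already depends on. The path name, class name and the sendStopReplicaToBrokers helper are placeholders, not the final design.

            import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
            import scala.collection.JavaConverters._

            class DeleteTopicListenerSketch(zkClient: ZkClient) extends IZkChildListener {
              private val deleteTopicPath = "/admin/delete_topics"   // illustrative path name

              // Fires when a child znode is created under the delete-topic path (step 2).
              def handleChildChange(parentPath: String, children: java.util.List[String]) {
                if (children != null) {
                  for (topic <- children.asScala) {
                    sendStopReplicaToBrokers(topic)                  // 2.1/2.2: hypothetical helper
                    zkClient.delete(deleteTopicPath + "/" + topic)   // 2.3: marker removed, command completes
                  }
                }
              }

              private def sendStopReplicaToBrokers(topic: String) {
                // placeholder: the real controller would send StopReplicaRequest to every broker
                // hosting a replica of the topic; each broker then deletes its local log directory.
              }
            }

            // Registration on the controller:
            // zkClient.subscribeChildChanges("/admin/delete_topics", new DeleteTopicListenerSketch(zkClient))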

        Jay Kreps made changes -
        Labels features features project
        Sam Meder added a comment -

        I've been doing a lot of manual resetting of data in Kafka and one thing I noticed is that clients don't always behave so well when I do that. So when you implement this you should probably also make sure that the current kafka clients behave well when a topic is removed, i.e. error or reset as appropriate.

        Prashanth Menon made changes -
        Assignee Prashanth Menon [ prashanth.menon ]
        Prashanth Menon added a comment -

        I'll take this on, hoping to get a patch in this weekend.

        Jun Rao added a comment -

        Prashanth, any progress on this jira?

        Prashanth Menon added a comment -

        Apologies, I began work on this jira before going on break. Now that I'm back, I should be able to wrap it up.

        Neha Narkhede added a comment -

        Hey Prashanth, how's this JIRA coming along ?

        Sam Meder added a comment -

        Any news on this?

        Neha Narkhede added a comment -

        Prashanth, mind if I take a look at this ? I have some time this week.

        Neha Narkhede made changes -
        Labels features project features p2 project
        Prashanth Menon added a comment -

        Apologies for not getting to this. Neha, go ahead and run with it.

        Prashanth Menon made changes -
        Assignee Prashanth Menon [ prashanth.menon ] Neha Narkhede [ nehanarkhede ]
        Neha Narkhede added a comment -

        Here is a broad description of how delete topic can work in Kafka -

        1. The delete topic tool writes to a /delete_topics/[topic] path
        2. The controller's delete topic listener fires and does the following -
        2.1 List the partitions for the topic to be deleted
        2.2 For each partition, do the following -
        2.2.1 Move the partition to OfflinePartition state. Take the leader offline. From this point on, all produce/consume requests for this partition will start failing
        2.2.2 For every replica for a partition, first move it to OfflineReplica state (it is removed from isr) then to NonExistentReplica (send stop-replica request with delete flag on to each replica)
        2.3 Delete the /brokers/topics/[topic] path from zookeeper
        2.4 Delete the /delete_topics/[topic] path to signify completion of the delete operation
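
        For reference, the tool side of step 1 could be as small as the sketch below. It is illustrative only: class name and argument handling are made up, serializer setup and error handling are omitted, and it only records the deletion intent in zookeeper before exiting.

            import org.I0Itec.zkclient.ZkClient

            object DeleteTopicCommandSketch {
              def main(args: Array[String]) {
                val zkConnect = args(0)                  // e.g. "localhost:2181"
                val topic = args(1)
                val zkClient = new ZkClient(zkConnect, 30000, 30000)
                // createPersistent(path, createParents = true) also creates /delete_topics if missing
                zkClient.createPersistent("/delete_topics/" + topic, true)
                zkClient.close()
              }
            }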

        Neha Narkhede made changes -
        Assignee Neha Narkhede [ nehanarkhede ] Swapnil Ghike [ swapnilghike ]
        Neha Narkhede made changes -
        Link This issue relates to KAFKA-784 [ KAFKA-784 ]
        Neha Narkhede added a comment -

        We should check if KAFKA-784 is still an issue after adding delete topic support to Kafka 0.8

        Jason Rosenberg added a comment -

        I'd also like to see an auto-delete feature, whereby a topic can be automatically deleted after it has been garbage collected and has no more messages. This could be set to happen automatically, after an expiration time. This may require exposing an api on each broker so a broker can ask if any other brokers have messages pending for a topic, before deciding the topic should be removed.

        Jun Rao made changes -
        Priority Major [ 3 ] Blocker [ 1 ]
        Neha Narkhede made changes -
        Labels features p2 project features kafka-0.8 p2 project
        Swapnil Ghike added a comment - edited

        Delete topic admin path schema updated at https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper
        Neha Narkhede made changes -
        Link This issue blocks KAFKA-833 [ KAFKA-833 ]
        Neha Narkhede added a comment -

        It makes sense to include the delete topic feature in the 0.8 beta release since most people might create test topics that would require cleanup

        Swapnil Ghike added a comment -

        Patch v1 attached.

        How topics are deleted:
        1. The DeleteTopicsCommand writes to /admin/delete_topics in zk and exits.
        2. The DeleteTopicsCommand complains if any topic being deleted is absent in zookeeper. In that case it won't run, even if at least one of the specified topics is actually present in zookeeper.
        3. A DeleteTopicsListener is triggered in controller. It moves the replicas and partitions to Offline->NonExistent states, deletes the partitions from controller's memory, sends StopReplicaRequests with deletePartition=true.
        4. Brokers on receiving the StopReplicaRequest remove the partition from their own memory and delete the logs.
        5. If all the partitions were successfully deleted, the topic path is deleted from zookeeper.
        6. Controller always deletes the admin/delete_topics path at the end. It checks in removeFromTopicsBeingDeleted() whether each topic has been deleted from zookeeper, at which point it declares victory or logs a warning of shame.
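
        To make step 4 above concrete, a rough illustration of what a broker does on receiving a StopReplicaRequest with deletePartition=true. The class and field names here are hypothetical stand-ins, not the patch's actual code.

            import java.io.File
            import scala.collection.mutable

            class ReplicaManagerSketch(logDir: String) {
              // (topic, partition) -> in-memory partition state (simplified)
              private val allPartitions = mutable.Map[(String, Int), AnyRef]()

              def stopReplica(topic: String, partition: Int, deletePartition: Boolean) {
                allPartitions.remove((topic, partition))        // forget the partition in memory
                if (deletePartition) {
                  val dir = new File(logDir, topic + "-" + partition)
                  // delete segment/index files, then the directory itself
                  Option(dir.listFiles()).foreach(_.foreach(_.delete()))
                  dir.delete()
                }
              }
            }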

        How to validate that the topics have been deleted:
        1. Rerun the DeleteTopicsCommand, it should complain that the topics are absent in zookeeper.

        Special comments:
        A. TopicChangeListener:
        1. I think that we should not handle deleted topics here. We should rather modify the controller's memory in the NonExistentPartition state change. This is because the controller will release its lock between the DeleteTopics listener and the TopicChangeListener, so we want the controller's memory to be up-to-date when the lock is released at the completion of the DeleteTopics listener.
        2. Probably there is no need to add the new topics' partition-replica assignment to controllerContext.partitionReplicaAssignment, because onNewTopicCreation() will do that. I put a TODO there. Please correct if I am wrong.

        Handling failures:

        A. What happens when controller fails:
        1. Before OfflineReplica state change: New controller context will be initialized and initializeAndMaybeTriggerTopicDeletion() will delete the topics.
        2. After OfflineReplica state change and before OfflinePartition state change: Initialization of controller context will re-insert replicas into ISR, and initializeAndMaybeTriggerTopicDeletion() will delete the topics.
        3. After OfflinePartition state change and before NonExistentReplica state change: Ditto as 2.
        4. After NonExistentReplica state change and before NonExistentPartition state change: The replicas that were deleted will be restarted on individual brokers, then the topics will be deleted.
        5. After NonExistentPartition state change and before deleting topics from zk: Ditto as 3. (The NonExistentPartition state change in the partition state machine currently does not delete the partitions from zk; it assumes that the controller will delete them, which is similar to what we do for some other state changes as of now.)
        I think the deletion should proceed smoothly even if the controller fails over in the middle of 1,2,3,4 or 5 above.

        B. What happens if a topic is deleted when a broker that has a replica of that topic's partition is down? =>
        i. When the broker comes back up and the topic has been deleted from zk, the controller can only tell the broker which topics are currently alive. The broker should delete the dead logs when it receives the first leaderAndIsr request. This can be done just before starting the hw checkpointing thread.
        ii. This will also be useful in replica reassignment for a partition. When the replica reassignment algorithm sends a StopReplica request with delete=true, the receiving broker could be down. After the broker is back up, it will realize that it needs to delete the logs for certain partitions that are no longer assigned to it.
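
        A minimal sketch of the cleanup idea in (i): on the first leaderAndIsr request, compare the topics named in the request against the topic-partition directories found on disk and remove the ones that are no longer assigned. The directory naming convention and the hook point are assumptions for illustration.

            import java.io.File

            object DeadLogCleanupSketch {
              // Assumes log directories are named <topic>-<partition>; run just before
              // the hw checkpointing thread is started, as described above.
              def deleteDeadLogs(logDir: File, liveTopics: Set[String]) {
                val dirs = Option(logDir.listFiles()).getOrElse(Array.empty[File]).filter(_.isDirectory)
                val dead = dirs.filterNot { dir =>
                  val name = dir.getName                       // e.g. "mytopic-0"
                  val topic = name.substring(0, name.lastIndexOf('-'))
                  liveTopics.contains(topic)
                }
                dead.foreach { dir =>
                  Option(dir.listFiles()).foreach(_.foreach(_.delete()))
                  dir.delete()
                }
              }
            }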

        Possible corner cases:
        1. What happens to hw checkpointing for deleted partitions? => checkpointHighWatermarks() reads the current allPartitions() on a broker and writes the hw. So the hw for deleted partitions will disappear.

        2. What happens to Produce/Fetch requests in purgatory? =>
        i. After the topics have been deleted, produce requests in purgatory will expire because there will be no fetchers, and fetch requests will expire because producer requests would fail in appendToLocalLog() and no more data will be appended.
        ii. Expiration of producer requests is harmless.
        iii. Expiration of fetch requests will try to send whatever data is remaining, but it will not be able to send any data because the replica would be dead. We could think of forcing the delayed fetch requests to expire before the replica is deleted and remove the expired requests from the delayed queue, but that would probably require synchronizing on the delayed queue. Thoughts?

        Other unrelated changes:
        A. ReplicaStateMachine
        1. Moved NonExistentReplica to the bottom of cases to maintain the same order as PartitionStateMachine.
        2. Deleted a redundant replicaState.put(replica,OnlineReplica) statement.
        3. Even if a replica is not in the ISR, it should always be moved to OfflineReplica state.

        B. Utils.scala:
        1. Bug fix in seqToJson().

        Testing done:
        1. Bring up one broker, create topics, delete topics, verify zk, verify that logs are gone.
        2. Bring up two brokers, create topics, delete topics, verify zk, verify that logs are gone from both brokers.
        3. Repeat the above 1 and 2 with more than one partition per topic.
        4. Write to admin/delete_topics, bring up the controller, watch the topic and logs get deleted.
        5. Bring up two brokers, create two topics with replication factor of two, verify that the logs get created. Now, shut down broker 1 and delete a topic. Verify that the topic disappears from zk and logs of broker 0. Bring up broker 1, verify that the topic disappears from the logs of broker 1 because controller (broker 0) will send leaderAndIsr request for the remaining topic.
        6. Validate error inputs.
        7. Validate that the tool prints error when a non-existent topic is being deleted.

        Is it ok if I write unit tests after this patch is checked in, in case there are modifications?

        Swapnil Ghike made changes -
        Attachment kafka-330-v1.patch [ 12576760 ]
        Swapnil Ghike made changes -
        Status Open [ 1 ] Patch Available [ 10002 ]
        Fix Version/s 0.8 [ 12317244 ]
        Swapnil Ghike added a comment -

        Actually scratch 5 in "How topics are deleted". Topics are always deleted from zk.

        Neha Narkhede added a comment -

        Thanks for the patch! Some suggestions -

        1. In the controller, it is important to not let a long delete topics operation block critical state changes like leader election. To make this possible, relinquish the lock between the deletes for individual topics
        2. If you do relinquish the lock like I suggested above, you now need to take care to avoid leader elections for partitions being deleted
        3. Since now you will handle topic deletion for individual topics, it might be worth changing the zookeeper structure for delete topics so status on individual topic deletes gets reported accordingly. One way to do this is to introduce a path to indicate that the admin tool has initiated delete operation for some topics (/admin/delete_topics_updated), and create child nodes under /admin/delete_topics, one per topic. As you complete individual topic deletion, you delete the /admin/delete_topics/<topic> path. Admin tool creates the /admin/delete_topics/<topic> path and updates /admin/delete_topics_updated. Controller only registers a data change watcher on /admin/delete_topics_updated. When this watcher fires, it reads the children of /admin/delete_topics and starts topic deletion.
        4. On startup/failover, the controller registers a data change watch on /admin/delete_topics_updated, and then reads the list of topics under /admin/delete_topics.
        5. Admin tool never errors out since it just adds to the list of deleted topics
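
        To make the proposed layout in 3 and 4 concrete, a sketch of the controller-side wiring. Path names follow the suggestion above; the class name and startTopicDeletion are illustrative placeholders.

            import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
            import scala.collection.JavaConverters._

            class DeleteTopicsTriggerSketch(zkClient: ZkClient) extends IZkDataListener {
              private val triggerPath = "/admin/delete_topics_updated"
              private val queuePath = "/admin/delete_topics"

              // Data change on the trigger path: read the per-topic children and start deletion.
              def handleDataChange(dataPath: String, data: Object) {
                zkClient.getChildren(queuePath).asScala.foreach(startTopicDeletion)
              }

              def handleDataDeleted(dataPath: String) {}

              private def startTopicDeletion(topic: String) {
                // placeholder: delete the topic, then remove its child node to report completion
                zkClient.delete(queuePath + "/" + topic)
              }
            }

            // On startup/failover (suggestion 4): subscribe first, then read the existing children.
            // zkClient.subscribeDataChanges("/admin/delete_topics_updated", new DeleteTopicsTriggerSketch(zkClient))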

        On the broker side, there are a few things to be done correctly -

        1. KafkaApis
        After receiving a stop replica request, the request handler should reject produce/fetch requests for partitions to be deleted by returning a PartitionBeingDeleted error code. Once the delete is complete, the partition can be removed from this list. In that case, it will return the UnknownTopicOrPartition error code

        2. ReplicaManager
        2.1 Remove unused variable leaderBrokerId from makeFollower()
        2.2 Fix the comment inside recordFollowerPosition to say "partition hasn't been created or has been deleted"
        2.3 Let the partition do the delete() operation. This will ensure that the leaderAndIsrUpdateLock is acquired for the duration of the delete. This will avoid interleaving leader/isr requests with stop replica requests and simplify the reasoning of log truncate/highwatermark update operations

        3. Partition - Introduce a new delete() API that works like this -
        1. Acquire leaderIsrUpdateLock so that log creation does not interfere with log deletion, and so that removing/adding a fetcher does not interfere with log deletion.
        2. Remove the fetcher for the partition
        3. Invoke delete() on the log. Be careful how current read/write requests will be affected.

        4. LogManager
        1. When deleteLogs() is invoked, remove logs from allLogs. This will prevent flush from being invoked on the log to be deleted.
        2. Invoke log.delete() on every individual log.
        3. log.markDeletedWhile(_ => true) will leave an extra rolled-over segment in the in-memory segment list

        5. Log
        1. Log delete should acquire "lock" to prevent interleaving with append/truncate/roll/flush etc
        Following steps need to be taken during log.delete()
        2. Invoke log.close()
        3. Invoke segmentList.delete(), where SegmentList.delete() only does contents.set(new Array[T](0))
        4. Invoke segment.delete()
        5. Update a flag deleted = true

        Few questions to be thought about -

        • Are any changes required to roll()? If the deleted flag is true, then skip roll().
        • Are any changes required to markDeletedWhile()? Same as roll: if the deleted flag is true, skip.
        • Are any changes required to flush()? This can be invoked either during roll or by append. It cannot be invoked by the flush thread since that is disabled for logs to be deleted. This needs to be handled by using lastOption.
        • See what to do with truncateTo(). This is used during make follower in Partition. This won't interfere with delete since Partition's delete acquires the leaderIsrUpdateLock. Another place that uses truncateTo() is the handleOffsetOutOfRange on the follower. This won't interleave since the replica fetcher was already removed before attempting to delete the log
        • See what to do with truncateAndStartWithNewOffset(). This won't interleave with delete log since the replica fetcher was already removed before attempting to delete the log
        • What if the broker is writing to the log when stop replica is deleting it? Since log.delete() acquires the "lock", the append starts either before or after the delete. If it starts after, then the changes mentioned in #7 and #9 should be made.
        • What if the broker is about to write to the log that is under deletion? Same as above
        • What if the broker is reading from the log that is being deleted? It will get a ClosedChannelException, I think. This needs to be confirmed. The test can run a consumer that is consuming data from the beginning of a log while you invoke delete topic.
        • What if the broker is about to read from the log that is being deleted? It will try reading from a file channel that is closed. This will run into ClosedChannelException. Should we catch ClosedChannelException, log an appropriate error and send a PartitionDeleted error code when that happens?
        • What happens to the partition entry from the high watermark file when it is being deleted ? When partition is removed from allPartitions, the next high watermark checkpoint removes the partition's entry from the high watermark file.
        • What happens to requests in the purgatory when partition has been deleted ? When a partition has been removed from allPartitions, then the requests in the purgatory will send UnknownTopicOrPartitionCode back to the client.

        6. Log.read()
        val first = view.head.start
        This needs to change to headOption. Return empty message set when this returns None

        7. Log.flush()
        segments.view.last.flush()
        Need to change the above to segments.view.lastOption. If that returns None, then return without flushing.

        8. SegmentList.delete()
        contents.set(new Array[T](0))

        9. Log.append()
        Fix this to use lastOption - val segment = maybeRoll(segments.view.last)
        If None, then return (-2,-2) to signify that the log was deleted
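
        A condensed sketch of the defensive changes in 6-9. The types below are simplified stand-ins, not the real Log/SegmentList classes; only the headOption/lastOption guards and the empty-array delete are shown.

            import java.util.concurrent.atomic.AtomicReference

            class SegmentListSketch[T: Manifest](initial: Array[T]) {
              private val contents = new AtomicReference[Array[T]](initial)
              def view: Array[T] = contents.get
              // 8: SegmentList.delete() only empties the backing array
              def delete() { contents.set(new Array[T](0)) }
            }

            class LogSketch(segments: SegmentListSketch[String]) {
              // 6: read() uses headOption; None means the log was deleted, so return an empty result
              def read(): Option[String] = segments.view.headOption

              // 7: flush() uses lastOption; if None, the log was deleted, so return without flushing
              def flush() {
                segments.view.lastOption match {
                  case Some(last) => ()   // real code: last.flush()
                  case None       => ()
                }
              }

              // 9: append() uses lastOption; return (-2, -2) to signify that the log was deleted
              def append(): (Long, Long) = segments.view.lastOption match {
                case Some(last) => (0L, 0L)   // real code: maybeRoll(last), then append and return offsets
                case None       => (-2L, -2L)
              }
            }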

        Neha Narkhede made changes -
        Component/s controller [ 12320321 ]
        Component/s log [ 12320320 ]
        Component/s replication [ 12320326 ]
        Swapnil Ghike added a comment -

        Replying to a few comments, will follow up with changes according to others:

        On the controller side:
        1. I think that the delete topics command will not take too long to complete; in any case it won't take any longer than the Preferred Replica Election command. Both commands write to an /admin zk path and trigger listeners that may send some requests and update some zk paths. I believed that the reason for relinquishing the lock in the ReassignPartitions listener after every partition reassignment was that the controller waits for the new replicas to join the ISR, which could take long.
        2. Hence I think that we should not relinquish the lock between deletion of two topics.
        3. So maybe we don't need to use two separate zk paths? If we rerun the DeleteTopicsCommand, it should complain that the topics are absent in zookeeper if the topics were successfully deleted.

        On the broker side:
        4. LogManager:
        1. deleteLogs() indeed removes the logs from allLogs.
        2. delete() is invoked on every individual log.
        3. Yes, following up on this.

        5. Log:
        1. The lock is acquired by all these functions, but I will double check if it needs to be acquired at the top level for our purpose.
        3. Well, log.delete() takes care of deleting the individual segments.

        Will make modifications to Log*, hopefully they will address all your comments.

        Neha Narkhede added a comment - edited

        Let's do some zookeeper math here to see how long it takes to delete one topic with 8 partitions from a 6 node kafka cluster -

        # of zk ops    operation during delete topic
          1 val partitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, topics.toSeq)
          7 val brokers = ZkUtils.getAllBrokersInCluster(zkClient)
          1 ZkUtils.getAllReplicasOnBroker(zkClient, topics.toSeq, brokers.map(_.id)) (This is a redundant read from zookeeper, so reuse the info read in step 1)
          2 removeReplicaFromIsr -> getLeaderIsrAndEpochForPartition, conditionalUpdatePersistentPath
          9 removeFromTopicsBeingDeleted -> readDataMaybeNull (1), deletePath (8)

        20 zookeeper ops. With 10ms per op (which is what a zookeeper cluster that kafka consumers and brokers share does in the best case), that is 200ms per topic.
        With 50 such topics, it is 10 seconds. That is the amount of time you are starving other partitions from being available!
        What you can do, for simplicity, is keep the existing long lock on the controller side for this patch. We can improve it later or in 0.8.1

        Also, the log side of your patch does not acquire the lock. You used the delete APIs that were used by unit tests so far. So they don't deal with the issues I've mentioned above in my comments.
        Regarding LogManager - Let's look at the modified version of your patch and see if that solves the problems I've outlined above with respect to interleaving other operations with delete log.

        Swapnil Ghike added a comment -

        Thanks for the excellent explanation. Some of these zk operations will not be repeated for every topic, for example ZkUtils.getAllBrokersInCluster(zkClient) or removeFromTopicsBeingDeleted. But anyway, it seems that the cost of ZK operations is even worse because removeReplicaFromIsr() makes 2 zk operations for each replica, which would be responsible for 2*50*8*3 (repl-factor) = 2400 zk operations.

        I agree with you, let's optimize this after log deletion works correctly.

        Similarly, preferred replica election will suffer from a very high number of zk operations since the callbacks will elect leader for every partition. So, we could relinquish the lock in preferred replica election too.

        Jun Rao added a comment -

        Thanks for the patch. Even though the patch is not big, it touches quite a few critical components such as the controller, replica manager, and log. It will take some time to stabilize this. We probably should consider pushing this out of 0.8 so that we don't delay the 0.8 release too much. One quick comment:

        1. KafkaController.onTopicsDeletion(): Why do we need to read things like partitionAssignment and brokers from ZK? Could we just use the cached data in controller context?
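
        For illustration, the ZK reads could be replaced with lookups in the controller's cache roughly as below. The stand-in types and field names (partitionReplicaAssignment, liveBrokerIds) are assumptions mirroring the existing controller context, not verified against this patch.

            case class TopicAndPartition(topic: String, partition: Int)
            case class ControllerContextSketch(
              partitionReplicaAssignment: Map[TopicAndPartition, Seq[Int]],
              liveBrokerIds: Set[Int])

            object CachedLookupSketch {
              // Instead of ZkUtils.getPartitionAssignmentForTopics / getAllBrokersInCluster,
              // answer both questions from the cached context:
              def replicasForTopics(ctx: ControllerContextSketch, topics: Set[String]): Map[TopicAndPartition, Seq[Int]] =
                ctx.partitionReplicaAssignment.filter { case (tp, _) => topics.contains(tp.topic) }

              def brokersToContact(ctx: ControllerContextSketch): Set[Int] = ctx.liveBrokerIds
            }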

        Swapnil Ghike added a comment -

        Yes, I agree with you Jun. Attaching a temporary patch v2 for the record, which needs testing. Patch v2 reads the cached data from the controller context. We don't need to review this patch since Log has significantly changed in trunk, so I will need to rework that part.

        Swapnil Ghike made changes -
        Attachment kafka-330-v2.patch [ 12577028 ]
        Jun Rao added a comment -

        move to 0.8.1 to reduce the remaining work in 0.8.0.

        Jun Rao made changes -
        Fix Version/s 0.8 [ 12317244 ]
        Affects Version/s 0.8.1 [ 12322960 ]
        Priority Blocker [ 1 ] Major [ 3 ]
        Jun Rao made changes -
        Link This issue blocks KAFKA-833 [ KAFKA-833 ]
        Neha Narkhede made changes -
        Labels features kafka-0.8 p2 project features project
        Neha Narkhede made changes -
        Link This issue blocks KAFKA-1074 [ KAFKA-1074 ]
        Siddhesh Sundar Toraskar made changes -
        Link This issue is blocked by KAFKA-1177 [ KAFKA-1177 ]
        Siddhesh Sundar Toraskar made changes -
        Link This issue is blocked by KAFKA-1177 [ KAFKA-1177 ]
        Siddhesh Sundar Toraskar made changes -
        Link This issue relates to KAFKA-1177 [ KAFKA-1177 ]
        Neha Narkhede made changes -
        Assignee Swapnil Ghike [ swapnilghike ] Neha Narkhede [ nehanarkhede ]
        Neha Narkhede made changes -
        Attachment KAFKA-330.patch [ 12625630 ]
        Neha Narkhede added a comment -

        Created reviewboard https://reviews.apache.org/r/17460/
        against branch trunk

        Neha Narkhede added a comment -

        Delete topic is a pretty tricky feature and there are multiple ways to solve it. I will list the various approaches with their tradeoffs here. A few things to think about that make delete topic tricky -

        1. How do you handle resuming delete topics during controller failover?
        2. How do you handle re-creating topics if brokers that host a subset of the replicas are down?
        3. If a broker fails during delete topic, how does it know which version of the topic it has logs for, when it restarts? This is relevant if we allow re-creating topics while a broker is down

        Will address these one by one.

        #1 is pretty straightforward to handle and can be achieved in a way similar to partition reassignment (through an admin path in zookeeper indicating a topic deletion that has not finished)

        #2 is an important policy decision that can affect the complexity of the design for this feature. If you allow topics to be deleted while brokers are down, the broker needs a way to know that its version of the topic is too old. This is mainly an issue since a topic can be re-created and written to while a broker is down. We need to ensure that a broker does not join the quorum with an older version of the log. There are 2 ways to solve this problem that I could think of -
        1. Do not allow topic deletion to succeed if a broker hosting a replica is down. Here, the controller keeps track of the state of each replica during topic deletion (TopicDeletionStarted, TopicDeletionSuccessful, TopicDeletionFailed) and only marks the topic as deleted if all replicas for all partitions of that topic are successfully deleted.
        2. Allow a topic to be deleted while a broker is down and keep track of the "generation" of the topic in a fault tolerant, highly available and consistent log. This log can either be zookeeper or a Kafka topic. The main issue here is how many generations we would have to keep track of for a topic. In other words, can this "generation" information ever be garbage collected? There isn't a good bound on this since it is unclear when the failed broker will come back online and when a topic will be re-created. That would mean keeping this generation information for potentially a very long time and incurring overhead during recovery or bootstrap of generation information during controller or broker failovers. This is especially a problem for use cases or tests that keep creating and deleting a lot of short lived topics. Essentially, this solution is not scalable unless we figure out an intuitive way to garbage collect this topic metadata. It would require us to introduce a config for controlling when a topic's generation metadata can be garbage collected. Note that this config is different from the topic TTL feature, which controls when a topic that is currently not in use can be deleted. Overall, this alternative is unnecessarily complex for the benefit of deleting topics while a broker is down.

        #3 is related to the policy decision made about #2. If a topic is not marked deleted successfully while a broker is down, the controller will automatically resume topic deletion when a broker restarts.

        This patch follows the first approach of not marking a topic deletion successful until all replicas have confirmed the deletion of local state for that topic. This requires the following changes -
        1. TopicCommand issues topic deletion by creating a new admin path /admin/delete_topics/<topic>

        2. The controller listens for child changes on /admin/delete_topics and starts topic deletion for the respective topics

        3. The controller has a background thread that handles topic deletion. The purpose of having this background thread is to accommodate the TTL feature, when we have it. This thread is signaled whenever deletion for a topic needs to be started or resumed. Currently, a topic's deletion can be started only by the onPartitionDeletion callback on the controller. In the future, it can be triggered based on the configured TTL for the topic. A topic's deletion will be halted in the following scenarios -

        • broker hosting one of the replicas for that topic goes down
        • partition reassignment for partitions of that topic is in progress
        • preferred replica election for partitions of that topic is in progress (though this is not strictly required since it holds the controller lock for the entire duration from start to end)

        4. Topic deletion is resumed when -

        • broker hosting one of the replicas for that topic is started
        • preferred replica election for partitions of that topic completes
        • partition reassignment for partitions of that topic completes

        5. Every replica for a topic being deleted is in one of 3 states (a sketch of this state machine appears after this list) -

        • TopicDeletionStarted (Replica enters TopicDeletionStarted phase when the onPartitionDeletion callback is invoked. This happens when the child change watch for /admin/delete_topics fires on the controller. As part of this state change, the controller sends StopReplicaRequests to all replicas. It registers a callback for the StopReplicaResponse when deletePartition=true thereby invoking a callback when a response for delete replica is received from every replica)
        • TopicDeletionSuccessful (deleteTopicStopReplicaCallback() moves replicas from TopicDeletionStarted->TopicDeletionSuccessful depending on the error codes in StopReplicaResponse)
        • TopicDeletionFailed (deleteTopicStopReplicaCallback() moves replicas from TopicDeletionStarted->TopicDeletionFailed depending on the error codes in StopReplicaResponse. In general, if a broker dies and it hosted replicas for topics being deleted, the controller marks the respective replicas in TopicDeletionFailed state in the onBrokerFailure callback. The reason is that if a broker fails before the request is sent and after the replica is in TopicDeletionStarted state, it is possible that the replica will mistakenly remain in TopicDeletionStarted state and topic deletion will not be retried when the broker comes back up.)

        6. The delete topic thread marks a topic successfully deleted only if all replicas are in TopicDeletionSuccessful state and it starts the topic deletion teardown mode where it deletes all topic state from the controllerContext as well as from zookeeper. This is the only time the /brokers/topics/<topic> path gets deleted.
        On the other hand, if no replica is in TopicDeletionStarted state and at least one replica is in TopicDeletionFailed state, then it marks the topic for deletion retry.

        7. I've introduced callbacks for controller-broker communication. Ideally, every callback should be of the following format (RequestOrResponse) => Unit. BUT since StopReplicaResponse doesn't carry the replica id, this is handled in a somewhat hacky manner in the patch. The purpose is to fix the approach of upgrading controller-broker protocols in a reasonable way before having delete topic upgrade StopReplica request in a one-off way. Will file a JIRA for that.
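
        As referenced in item 5, a compact sketch of the per-replica deletion states and the completion/retry rule in items 5 and 6. The bookkeeping class and its methods are simplified illustrations (one topic only), not the patch's actual code.

            object ReplicaDeletionState extends Enumeration {
              val TopicDeletionStarted, TopicDeletionSuccessful, TopicDeletionFailed = Value
            }

            class TopicDeletionBookkeepingSketch {
              import ReplicaDeletionState._
              // replica id -> deletion state (simplified to a single topic)
              private val replicaStates = scala.collection.mutable.Map[Int, ReplicaDeletionState.Value]()

              def onStopReplicaSent(replicaId: Int) { replicaStates(replicaId) = TopicDeletionStarted }

              // Invoked from the StopReplicaResponse callback.
              def onStopReplicaResponse(replicaId: Int, errorCode: Short) {
                replicaStates(replicaId) =
                  if (errorCode == 0) TopicDeletionSuccessful else TopicDeletionFailed
              }

              // Invoked from onBrokerFailure for brokers hosting replicas of topics being deleted.
              def onBrokerFailure(replicaId: Int) { replicaStates(replicaId) = TopicDeletionFailed }

              // Item 6: deleted only when every replica succeeded; retried when none are in flight
              // and at least one failed.
              def isDeletionComplete: Boolean = replicaStates.values.forall(_ == TopicDeletionSuccessful)
              def shouldRetryDeletion: Boolean =
                !replicaStates.values.exists(_ == TopicDeletionStarted) &&
                  replicaStates.values.exists(_ == TopicDeletionFailed)
            }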

        Several integration tests added for delete topic -

        1. Topic deletion when all replica brokers are alive
        2. Halt and resume topic deletion after a follower replica is restarted
        3. Halt and resume topic deletion after a controller failover
        4. Request handling during topic deletion
        5. Topic deletion and partition reassignment in parallel
        6. Topic deletion and preferred replica election in parallel
        7. Topic deletion and per topic config changes in parallel

        Neha Narkhede made changes -
        Attachment KAFKA-330_2014-01-28_15:19:20.patch [ 12625765 ]
        Neha Narkhede added a comment -

        Updated reviewboard https://reviews.apache.org/r/17460/
        against branch trunk

        Neha Narkhede made changes -
        Attachment KAFKA-330_2014-01-28_22:01:16.patch [ 12625797 ]
        Neha Narkhede added a comment -

        Updated reviewboard https://reviews.apache.org/r/17460/
        against branch trunk

        Neha Narkhede made changes -
        Attachment KAFKA-330_2014-01-31_14:19:14.patch [ 12626391 ]
        Neha Narkhede added a comment -

        Updated reviewboard https://reviews.apache.org/r/17460/
        against branch trunk

        Neha Narkhede made changes -
        Attachment KAFKA-330_2014-01-31_17:45:25.patch [ 12626437 ]
        Neha Narkhede added a comment -

        Updated reviewboard https://reviews.apache.org/r/17460/
        against branch trunk

        Neha Narkhede made changes -
        Attachment KAFKA-330_2014-02-01_11:30:32.patch [ 12626488 ]
        Neha Narkhede made changes -
        Attachment KAFKA-330_2014-02-01_14:58:31.patch [ 12626499 ]
        Neha Narkhede added a comment (edited) -

        Updated reviewboard https://reviews.apache.org/r/17460/ against branch trunk.

        Asking for a more detailed review as the patch is somewhat tested and refactored to make the topic deletion logic easier to maintain and understand.

        Neha Narkhede made changes -
        Fix Version/s 0.8.1 [ 12322960 ]
        Neha Narkhede made changes -
        Comment [ Updated reviewboard against branch trunk ]
        Neha Narkhede made changes -
        Attachment KAFKA-330_2014-02-05_09:31:30.patch [ 12627158 ]
        Neha Narkhede added a comment -

        Updated reviewboard https://reviews.apache.org/r/17460/
        against branch trunk

        Neha Narkhede made changes -
        Attachment KAFKA-330_2014-02-06_07:48:40.patch [ 12627358 ]
        Neha Narkhede added a comment -

        Updated reviewboard against branch trunk

        Neha Narkhede made changes -
        Attachment KAFKA-330_2014-02-06_09:42:38.patch [ 12627378 ]
        Neha Narkhede added a comment -

        Updated reviewboard https://reviews.apache.org/r/17460/
        against branch trunk

        Neha Narkhede made changes -
        Priority Major [ 3 ] Blocker [ 1 ]
        Neha Narkhede made changes -
        Attachment KAFKA-330_2014-02-06_10:29:31.patch [ 12627388 ]
        Neha Narkhede added a comment -

        Updated reviewboard https://reviews.apache.org/r/17460/
        against branch trunk

        Neha Narkhede made changes -
        Attachment KAFKA-330_2014-02-06_11:37:48.patch [ 12627400 ]
        Neha Narkhede added a comment -

        Updated reviewboard https://reviews.apache.org/r/17460/
        against branch trunk

        Neha Narkhede added a comment -

        Thanks for the reviews. This is a big patch, so please do submit your reviews even after check-in; I will fix the issues in follow-up JIRAs.

        Neha Narkhede made changes -
        Status Patch Available [ 10002 ] Resolved [ 5 ]
        Resolution Fixed [ 1 ]
        Neha Narkhede made changes -
        Status Resolved [ 5 ] Closed [ 6 ]
        Sriram Subramanian added a comment -

        Can we have this patch (https://issues.apache.org/jira/secure/attachment/12625445/KAFKA-930_2014-01-27_13%3A28%3A51.patch) merged now that delete support is in?
        Jun Rao added a comment -

        Sriram,

        You can check in that patch now. You would probably have to add an additional check, while holding the controller lock, to see whether a partition whose leader is about to be moved to the preferred replica belongs to a topic that is queued for deletion. If so, skip leader balancing for that partition.
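
        A rough sketch of the check being suggested here, with a stand-in lock and a stand-in set of topics queued for deletion instead of the controller's real data structures (the names below are illustrative, not Kafka's actual API):

        import java.util.concurrent.locks.ReentrantLock

        // Hypothetical guard for preferred replica leader balancing.
        object PreferredReplicaElectionGuardSketch {
          // Returns true if leader balancing may proceed for this topic, i.e. the
          // topic is not currently queued for deletion. The check runs while
          // holding the (stand-in) controller lock.
          def shouldBalance(topic: String,
                            controllerLock: ReentrantLock,
                            topicsToBeDeleted: scala.collection.Set[String]): Boolean = {
            controllerLock.lock()
            try {
              !topicsToBeDeleted.contains(topic)
            } finally {
              controllerLock.unlock()
            }
          }
        }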

        Neha Narkhede added a comment (edited) -

        Sriram Subramanian It will not be enough to just drop the partitions that belong to topics being deleted from the preferred replica list. In addition to that, I think we may also have to leave them out while computing the preferred replica imbalance factor.
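
        A rough sketch of both exclusions, assuming the preferred replica assignment and current leadership are available as plain maps; the types and helper names are illustrative, not the controller's actual API:

        // Hypothetical sketch: partitions of topics queued for deletion are dropped
        // both from the election candidates and from the imbalance computation.
        object PreferredReplicaImbalanceSketch {
          case class TopicAndPartition(topic: String, partition: Int)

          // Fraction of considered partitions whose current leader is not the
          // preferred replica, ignoring partitions of topics being deleted.
          def imbalanceRatio(preferredReplica: Map[TopicAndPartition, Int],
                             currentLeader: Map[TopicAndPartition, Int],
                             topicsToBeDeleted: Set[String]): Double = {
            val considered = preferredReplica.filter { case (tp, _) => !topicsToBeDeleted.contains(tp.topic) }
            if (considered.isEmpty) 0.0
            else {
              val imbalanced = considered.count { case (tp, preferred) =>
                currentLeader.get(tp).exists(_ != preferred)
              }
              imbalanced.toDouble / considered.size
            }
          }

          // Candidates for preferred replica election, with the same topics excluded.
          def electionCandidates(preferredReplica: Map[TopicAndPartition, Int],
                                 topicsToBeDeleted: Set[String]): Set[TopicAndPartition] =
            preferredReplica.keySet.filter(tp => !topicsToBeDeleted.contains(tp.topic))
        }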

        Neha Narkhede made changes -
        Attachment KAFKA-330_2014-02-08_11:07:37.patch [ 12627828 ]
        Neha Narkhede added a comment -

        Updated reviewboard https://reviews.apache.org/r/17460/
        against branch trunk

        Jay Kreps made changes -
        Issue Type Bug [ 1 ] New Feature [ 2 ]

          People

          • Assignee: Neha Narkhede
          • Reporter: Neha Narkhede
          • Votes: 5
          • Watchers: 13
