Patch v1 attached.
How topics are deleted:
1. The DeleteTopicsCommand writes to /admin/delete_topics in zk and exits.
2. The DeleteTopicsCommand complains if a topic that is being deleted is absent in zookeeper. It refuses to run if any of the specified topics is absent, even when the others are present in zookeeper.
3. A DeleteTopicsListener is triggered in the controller. It moves the replicas and partitions through the Offline->NonExistent states, deletes the partitions from the controller's memory, and sends StopReplicaRequests with deletePartition=true.
4. Brokers on receiving the StopReplicaRequest remove the partition from their own memory and delete the logs.
5. If all the partitions were successfully deleted, the topic path is deleted from zookeeper.
6. The controller always deletes the /admin/delete_topics path at the end. It checks in removeFromTopicsBeingDeleted() whether each topic has been deleted from zookeeper, at which point it declares victory or logs a warning of shame.
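To make the flow above concrete, here is a rough Python model of steps 3-6. It is only a sketch, not the code in the patch: the Broker class, on_delete_topics() and the dict standing in for zookeeper are all made up for illustration, and the state changes are assumed to always succeed.

```python
class Broker:
    """Hypothetical stand-in for a broker: tracks which partition logs exist."""

    def __init__(self, broker_id):
        self.broker_id = broker_id
        self.logs = set()  # (topic, partition) pairs that have a log on disk

    def stop_replica(self, topic, partition, delete_partition):
        # Step 4: drop the partition from memory and, if asked, delete its log.
        if delete_partition:
            self.logs.discard((topic, partition))


def on_delete_topics(zk, assignment, brokers):
    """Steps 3-6 as seen from the controller (toy model).

    zk:         dict of zookeeper path -> value (a toy replacement for zk)
    assignment: dict of (topic, partition) -> list of replica broker ids
    brokers:    dict of broker id -> Broker
    Returns the topics that are really gone from zk.
    """
    topics = list(zk.get("/admin/delete_topics", []))
    for topic in topics:
        partitions = [tp for tp in assignment if tp[0] == topic]
        for tp in partitions:
            # Step 3: Offline -> NonExistent, then forget the partition
            # and ask every replica to delete it.
            for broker_id in assignment.pop(tp):
                brokers[broker_id].stop_replica(tp[0], tp[1], delete_partition=True)
        # Step 5: remove the topic path only once no partition is left.
        if not any(tp[0] == topic for tp in assignment):
            zk.pop("/brokers/topics/" + topic, None)
    # Step 6: the admin path is always removed, whatever happened above.
    zk.pop("/admin/delete_topics", None)
    return [t for t in topics if "/brokers/topics/" + t not in zk]
```

The returned list plays the role of the check in removeFromTopicsBeingDeleted(): any requested topic missing from it would be the one that earns the warning.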
How to validate that the topics have been deleted:
1. Rerun the DeleteTopicsCommand; it should complain that the topics are absent in zookeeper.
1. I think that we should not handle deleted topics here. We should rather modify the controller's memory in the NonExistentPartition state change. This is because the controller releases its lock between the DeleteTopicsListener and the TopicChangeListener, and we want the controller's memory to be up-to-date when the lock is released at the completion of the DeleteTopicsListener.
2. Probably there is no need to add the new topics' partition-replica assignment to controllerContext.partitionReplicaAssignment, because onNewTopicCreation() will do that. I put a TODO there. Please correct me if I am wrong.
A. What happens when controller fails:
1. Before OfflineReplica state change: New controller context will be initialized and initializeAndMaybeTriggerTopicDeletion() will delete the topics.
2. After OfflineReplica state change and before OfflinePartition state change: Initialization of controller context will re-insert replicas into ISR, and initializeAndMaybeTriggerTopicDeletion() will delete the topics.
3. After OfflinePartition state change and before NonExistentReplica state change: Ditto as 2.
4. After NonExistentReplica state change and before NonExistentPartition state change: The replicas that were deleted will be restarted on individual brokers, then the topics will be deleted.
5. After NonExistentPartition state change and before deleting topics from zk: Ditto as 3. (The NonExistentPartition state change in partition state machine currently does not delete the partitions from zk, it assumes that the controller will delete them, which is similar to what we do for some other state changes as of now).
I think the deletion should proceed smoothly even if the controller fails over in the middle of 1,2,3,4 or 5 above.
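The failover argument in A can be sketched as a toy simulation. This is only an illustration under simplifying assumptions: the step names mirror the state changes listed above, but run_controller(), delete_with_failover() and the dict used in place of zookeeper are hypothetical, and the only durable state modelled is the set of pending deletions and the set of existing topics.

```python
STEPS = ["OfflineReplica", "OfflinePartition", "NonExistentReplica",
         "NonExistentPartition", "DeleteTopicFromZk"]


class ControllerCrash(Exception):
    """Raised to simulate the controller dying in the middle of a deletion."""


def run_controller(zk, crash_before=None):
    """Start a fresh controller and process every pending deletion.

    zk: {"pending": set of topics queued for deletion,
         "topics":  set of topics that exist in zookeeper}
    crash_before: name of the step at which this controller dies, if any.
    """
    state = {}  # in-memory state machine; lost when the controller dies
    for topic in sorted(zk["pending"]):
        for step in STEPS:
            if step == crash_before:
                raise ControllerCrash(step)
            if step == "DeleteTopicFromZk":
                zk["topics"].discard(topic)  # safe to repeat
            else:
                state[topic] = step
    zk["pending"].clear()  # only reached if every topic was processed


def delete_with_failover(zk, crash_before):
    """Crash the first controller at the given step, then let a new one take over."""
    try:
        run_controller(zk, crash_before)
    except ControllerCrash:
        run_controller(zk)  # the new controller starts from scratch
    return zk
```

Because the pending set is cleared only at the very end and every step can be repeated safely, the final state in this model is the same whichever step the first controller dies at.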
B. What happens if a topic is deleted when a broker that has a replica of that topic's partition is down? =>
i. When the broker comes back up and the topic has been deleted from zk, the controller can only tell the broker which topics are currently alive. The broker should delete the dead logs when it receives the first leaderAndIsr request. This can be done just before starting the hw checkpointing thread.
ii. This will also be useful in replica reassignment for a partition. When the replica reassignment algorithm sends a StopReplica request with delete=true, the receiving broker could be down. After the broker is back up, it will realize that it needs to delete the logs for certain partitions that are no longer assigned to it.
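The cleanup proposed in B boils down to a set difference between the logs a broker finds on disk and the partitions named in its first leaderAndIsr request. A minimal sketch, assuming that first request lists every partition currently assigned to the broker (purge_stale_logs() and its arguments are hypothetical names, not part of the patch):

```python
def purge_stale_logs(local_logs, first_leader_and_isr_partitions):
    """Delete logs for partitions the controller no longer assigns to this broker.

    local_logs: set of (topic, partition) pairs found on disk at startup;
                modified in place.
    first_leader_and_isr_partitions: iterable of (topic, partition) pairs from
                the first leaderAndIsr request after the broker comes back up.
    Returns the set of partitions whose logs were removed.
    """
    alive = set(first_leader_and_isr_partitions)
    stale = {tp for tp in local_logs if tp not in alive}
    local_logs -= stale  # stand-in for deleting the log directories
    return stale
```

The same call would cover both cases above: a topic deleted while the broker was down, and a partition reassigned away from it.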
Possible corner cases:
1. What happens to hw checkpointing for deleted partitions? => checkpointHighWatermarks() reads the current allPartitions() on a broker and writes the hw. So the hw for deleted partitions will disappear.
2. What happens to Produce/Fetch requests in purgatory? =>
i. After the topics have been deleted, produce requests in purgatory will expire because there will be no fetchers. Fetch requests will expire because produce requests would fail in appendToLocalLog() and no more data will be appended.
ii. Expiration of producer requests is harmless.
iii. Expiration of fetch requests will try to send whatever data is remaining, but it will not be able to send any data because the replica would be dead. We could think of forcing the delayed fetch requests to expire before the replica is deleted and remove the expired requests from the delayed queue, but that would probably require synchronizing on the delayed queue. Thoughts?
Other unrelated changes:
1. Moved NonExistentReplica to the bottom of cases to maintain the same order as PartitionStateMachine.
2. Deleted a redundant replicaState.put(replica,OnlineReplica) statement.
3. Even if a replica is not in the ISR, it should always be moved to OfflineReplica state.
1. Bug fix in seqToJson().
1. Bring up one broker, create topics, delete topics, verify zk, verify that logs are gone.
2. Bring up two brokers, create topics, delete topics, verify zk, verify that logs are gone from both brokers.
3. Repeat the above 1 and 2 with more than one partition per topic.
4. Write to /admin/delete_topics, bring up the controller, watch the topic and logs get deleted.
5. Bring up two brokers, create two topics with replication factor of two, verify that the logs get created. Now, shut down broker 1 and delete a topic. Verify that the topic disappears from zk and from the logs of broker 0. Bring up broker 1, verify that the deleted topic disappears from the logs of broker 1, because the controller (broker 0) will send a leaderAndIsr request for the remaining topic.
6. Validate error inputs.
7. Validate that the tool prints an error when a non-existent topic is being deleted.
Is it ok if I write unit tests after this patch is checked in, in case there are modifications?