Thanks for patch v2. Looks good overall. Some comments.
20. AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(): The info logging should be different, depending on whether the topic is created or updated. Also, fix the indentation in the else clause.
21.1 remove unused imports
21.2 Is there any value in having the "replica" option? It seems that it should always be the existing replication factor.
21.3 For the "replica-assignment-list" option, could we make it clear in the description that this is for newly added partitions?
21.4 getManualReplicaAssignment(): We need to make sure the replication factor is the same as the existing one.
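For 21.4, a minimal sketch of the kind of check I have in mind. It assumes the option value looks like "0:1,1:2" (partitions separated by commas, broker ids separated by colons). The object and method names are made up for illustration and are not from the patch.

```scala
// Hypothetical helper, not the patch's code: parse a manual assignment string
// and reject any partition whose replica count differs from the existing
// replication factor of the topic.
object ReplicaAssignmentCheck {
  def parseAssignment(assignment: String, existingReplicationFactor: Int): Seq[Seq[Int]] = {
    // "0:1,1:2" -> Seq(Seq(0, 1), Seq(1, 2))  (assumed input format)
    val parsed = assignment.split(",").toSeq.map(_.split(":").toSeq.map(_.trim.toInt))
    parsed.zipWithIndex.foreach { case (replicas, i) =>
      require(replicas.size == existingReplicationFactor,
        s"New partition #$i has ${replicas.size} replicas, expected $existingReplicationFactor")
    }
    parsed
  }
}
```

With something like this in place, an assignment such as "0:1,2" against an existing replication factor of 2 would fail fast with an IllegalArgumentException instead of being written to ZK.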
22. KafkaController.onNewTopicCreation(): Could you explain why the onNewPartition statement is moved to before the watcher registration? Normally, in order not to miss any changes, one needs to register the watcher before reading the associated nodes in ZK.
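To illustrate the ordering concern in 22, here is a toy sketch. ToyNode is an in-memory stand-in that I made up; it is not the ZkClient API and not the controller code. It only shows that a change landing between the two steps is missed when the read comes first.

```scala
import scala.collection.mutable

// Toy stand-in for a ZK path with child watchers (hypothetical, not ZkClient).
class ToyNode {
  private var children = Set.empty[String]
  private val listeners = mutable.ListBuffer.empty[Set[String] => Unit]
  def subscribe(listener: Set[String] => Unit): Unit = listeners += listener
  def read: Set[String] = children
  // Adding a child notifies every listener registered so far.
  def add(child: String): Unit = {
    children += child
    listeners.foreach(l => l(children))
  }
}

object WatchOrdering {
  // Register first, then read: a change in between is still observed.
  def registerThenRead(node: ToyNode, concurrentAdd: String): Set[String] = {
    var known = Set.empty[String]
    node.subscribe(cs => known ++= cs) // 1. register the watcher
    node.add(concurrentAdd)            // change lands between the two steps
    known ++= node.read                // 2. read the current state
    known
  }

  // Read first, then register: the same change is silently missed.
  def readThenRegister(node: ToyNode, concurrentAdd: String): Set[String] = {
    var known = node.read              // 1. read the current state
    node.add(concurrentAdd)            // change lands between the two steps
    node.subscribe(cs => known ++= cs) // 2. register the watcher, too late
    known
  }
}
```

In the second ordering the caller never learns about the added child unless some later change fires the watcher.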
23.1 In the following statement, we are actually returning all replica assignments for the topic, not just those of the added partitions.
val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
23.2 Instead of using controllerContext.partitionLeadershipInfo to filter out existing partitions, it's probably better to use controllerContext.partitionReplicaAssignment, since leaders may not always exist.
23.3 In the error logging, could we add the affected data path?
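For 23.1 and 23.2, a rough sketch of the filtering I am suggesting. TopicAndPartition is redefined locally so the snippet compiles on its own, and the object and parameter names are placeholders rather than the controller's actual fields.

```scala
// Local stand-in for kafka.common.TopicAndPartition so the sketch is self-contained.
case class TopicAndPartition(topic: String, partition: Int)

object NewPartitionFilter {
  // allAssignments: everything read back from ZK for the topic.
  // knownAssignments: what the controller already caches (the role played by
  // controllerContext.partitionReplicaAssignment in the comment above).
  def newPartitions(allAssignments: Map[TopicAndPartition, Seq[Int]],
                    knownAssignments: Map[TopicAndPartition, Seq[Int]]): Map[TopicAndPartition, Seq[Int]] =
    // Keep only partitions with no cached assignment. This does not depend on a
    // leader having been elected for the existing partitions.
    allAssignments.filter { case (tp, _) => !knownAssignments.contains(tp) }
}
```

Keying on the assignment map means a partition that currently has no leader is still treated as existing.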
24. AddPartitionsTest: remove unused imports
25. Did we run any tests to make sure that existing consumers can pick up the new partitions?
26. The patch needs to be rebased.