We mistakenly request topic level information according to partitions config in the assignment json file. For example https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala#L550:
If reassign 1000 partitions (in 10 topics), we need to request zookeeper 1000 times here. But actually we only need to request just 10 (topics) times. We test a large-scale assignment, about 10K partitions. It takes tens of minutes. After optimization, it will reduce to less than 1minute.