Thanks for volunteering to fix this, EeveeB, and for identifying three potential approaches!
Before I discuss your approaches, I do want to first confirm something. If the Connect worker creates the config topic, it always does so with a single partition. That means we're only concerned with scenarios where the topic was manually created (or modified) before the worker was started. Is that correct?
Okay, now to your approaches. Just to clarify, the call path is basically:
- `DistributedHerder.run()` calls `startServices()`
- `DistributedHerder.startServices()` calls `configBackingStore.start()`
- `KafkaConfigBackingStore.start()` calls `configLog.start()`
- `KafkaBasedLog.start()` calls `initializer.run()`
We can see that the `KafkaBasedLog.start()` method already has code that throws a `ConnectException`, and we know that this stops the herder from running (which is the desired behavior). So, as long as the `KafkaConfigBackingStore.start()` method (or anything called within it, including `initializer.run()`) throws a `ConnectException` with an appropriate error message, the herder will stop.
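To make that concrete, here is a minimal sketch of the kind of guard we'd ultimately want reached from that path. The method name and where it lives are placeholders, since that's exactly what the options below differ on:

```java
import org.apache.kafka.connect.errors.ConnectException;

// Hypothetical guard; whichever option we pick, the end result is a check like this
// being reached from KafkaConfigBackingStore.start(), directly or indirectly.
static void verifySinglePartition(String topic, int partitionCount) {
    if (partitionCount > 1) {
        // Throwing here propagates up through DistributedHerder.startServices(),
        // which stops the herder from running, which is the desired behavior.
        throw new ConnectException("Config topic '" + topic + "' must have exactly one partition, "
                + "but found " + partitionCount);
    }
}
```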
Option 1
This option would work, but it seems to be a fair amount of work compared to the others.
Option 2
IIUC, your second option is to modify the initializer function defined in `KafkaConfigBackingStore` to also get and check the number of partitions, and to throw a `ConnectException` if the topic already exists and has more than one partition.
This would require modifying `TopicAdmin` to fetch the metadata for the existing topic and return it. While that's probably doable, it's more complicated than your next option.
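For illustration, Option 2 could look roughly like this inside `KafkaConfigBackingStore`. Note that `describeTopic(...)` is a hypothetical new method we'd have to add to `TopicAdmin` (that's the extra work I'm referring to), and `admin`, `topic`, and `topicDescription` come from the surrounding class:

```java
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.connect.errors.ConnectException;

// Hedged sketch of Option 2: the initializer Runnable passed to KafkaBasedLog would,
// after the existing create-if-missing call, look up the topic's metadata and fail fast.
// TopicAdmin.describeTopic(...) does not exist today and would need to be added.
Runnable initializer = () -> {
    admin.createTopics(topicDescription);                    // existing behavior: create if missing
    TopicDescription existing = admin.describeTopic(topic);  // hypothetical new TopicAdmin method
    if (existing != null && existing.partitions().size() > 1) {
        throw new ConnectException("Config topic '" + topic + "' must have exactly one partition, "
                + "but found " + existing.partitions().size());
    }
};
```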
Option 3
This is a good idea, too, especially because the `KafkaBasedLog.start()` method already gets the partition information from the consumer in the form of a `List<PartitionInfo>` for the one topic (or, a bit later, a `List<TopicPartition>` for the topic). If it stored that in a field and returned an immutable version of that list via a new method, the `KafkaConfigBackingStore.start()` method could call that method and fail if there is more than one partition.
The great thing about this approach is that we don't have to modify the `TopicAdmin` utility or the initializer. The changes to `KafkaBasedLog` are minimal – we just need the getter method to return an immutable list of immutable `TopicPartition` objects. (Note that we could return `PartitionInfo`, but it's not immutable and we don't know how our new getter method might be used. Returning an immutable `List<TopicPartition>` is much safer.)
We do have to modify the `KafkaConfigBackingStore.start()` method to use this new method, but that would be super simple logic.
Personally, I think this is a great approach: it's simple and localizes the changes pretty well.
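Concretely, Option 3 might look something like the following. The field and method names are placeholders, I'm glossing over exactly where in `start()` the partitions get captured, and `topic` is assumed to be the config topic name held by `KafkaConfigBackingStore`:

```java
import java.util.Collections;
import java.util.List;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;

// In KafkaBasedLog (sketch): keep the partitions discovered during start() in a field
// and expose them through a read-only getter.
private List<TopicPartition> partitions;   // populated in start() from the consumer's PartitionInfo list

public List<TopicPartition> partitions() {
    return Collections.unmodifiableList(partitions);
}

// In KafkaConfigBackingStore.start() (sketch), once configLog.start() has returned:
int partitionCount = configLog.partitions().size();
if (partitionCount > 1) {
    throw new ConnectException("Config topic '" + topic + "' must have exactly one partition, "
            + "but found " + partitionCount);
}
```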
Option 4
A slight variation of Option 3 is to not introduce a new field and getter in `KafkaBasedLog` that returns the partition information, but to instead pass a "partition validation" function into the `KafkaBasedLog` constructor and then to use this in the `start()` method. The benefit is that we don't have to expose any new methods on `KafkaBasedLog`, but we have to change the constructor.
This really has all the same benefits as Option 3, but it makes the logic a little harder to follow, so I don't like it quite as much as Option 3.
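A rough sketch of Option 4, just for comparison. The extra constructor parameter, the field, and where `start()` invokes it are all hypothetical:

```java
import java.util.List;
import java.util.function.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;

// In KafkaBasedLog (sketch): accept a hypothetical validator via the constructor and call it
// from start() once the topic's partitions are known, e.g. partitionValidator.accept(partitions);
private final Consumer<List<TopicPartition>> partitionValidator;

// KafkaConfigBackingStore (sketch) would then pass something like this when building its log:
Consumer<List<TopicPartition>> singlePartitionCheck = parts -> {
    if (parts.size() > 1)
        throw new ConnectException("Config topic must have exactly one partition, but found " + parts.size());
};
```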
I'd be happy to do this.
Looking through the code, the worker tries to recreate the log each time a worker starts:
The above only does prep on the worker side; it isn't until `start()` is called that it actually tries to "create" the topic:
And the worker just fires and forgets:
This means the number of partitions doesn't seem to be exposed anywhere, except in the utility class `TopicAdmin`, which is used by both the offset backing store and the config backing store.
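If I'm reading the create path right, it boils down to roughly the following (paraphrased from memory, so the exact builder calls may differ, and `admin` and `topic` come from the surrounding code). The point is that the worker never looks at what an already existing topic actually looks like:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.connect.util.TopicAdmin;

// Paraphrased sketch of the "fire and forget" create: the worker describes the topic it wants
// (single partition, compacted) and asks TopicAdmin to create it, but createTopics() quietly
// skips topics that already exist, so a multi-partition topic goes unnoticed.
short replicationFactor = 3;  // really comes from the worker config
NewTopic topicDescription = TopicAdmin.defineTopic(topic)
        .compacted()
        .partitions(1)
        .replicationFactor(replicationFactor)
        .build();

admin.createTopics(topicDescription);  // no-op for topics that already exist
```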
I see three options:
For the second and third options we'd pass the partition information to `KafkaConfigBackingStore` and run the check there.
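For what it's worth, the check on the `KafkaConfigBackingStore` side would be tiny regardless of how the partition information gets there; something along these lines, with the helper name made up:

```java
import java.util.List;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.connect.errors.ConnectException;

// Hypothetical helper in KafkaConfigBackingStore: given whatever partition metadata we are
// handed (from KafkaBasedLog or TopicAdmin), fail fast if the config topic has extra partitions.
static void checkConfigTopicPartitions(String topic, List<PartitionInfo> partitions) {
    if (partitions.size() > 1) {
        throw new ConnectException("Config topic '" + topic + "' must have exactly one partition, "
                + "but found " + partitions.size());
    }
}
```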