Kafka / KAFKA-9216

Enforce connect internal topic configuration at startup

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.11.0.0
    • Fix Version/s: 2.3.2, 2.6.0, 2.4.2, 2.5.1
    • Component/s: connect
    • Labels: None

    Description

      Users sometimes create Connect's internal configuration topic with more than one partition. Exactly one partition is expected, however, and using more than one leads to subtle misbehavior that is not always easy to spot.

      Here's one example of a log message:

      "textPayload": "[2019-11-20 11:12:14,049] INFO [Worker clientId=connect-1, groupId=td-connect-server] Current config state offset 284 does not match group assignment 274. Forcing rebalance. (org.apache.kafka.connect.runtime.distributed.DistributedHerder:942)\n"
      

      Would it be possible to add a check in KafkaConfigBackingStore that prevents the worker from starting if the config topic's partition count != 1?
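
      For illustration only (this is not code from the Connect code base; the method name, variables, and error text are assumptions), such a check could be expressed with the Java AdminClient roughly as follows:

      import java.util.Collections;
      import java.util.Map;
      import org.apache.kafka.clients.admin.AdminClient;
      import org.apache.kafka.clients.admin.TopicDescription;
      import org.apache.kafka.common.config.ConfigException;

      // Hedged sketch: look up the partition count of the config topic and fail fast
      // if it is not exactly 1.
      static void ensureSinglePartition(Map<String, Object> adminProps, String configTopic) throws Exception {
          try (AdminClient admin = AdminClient.create(adminProps)) {
              TopicDescription description = admin.describeTopics(Collections.singleton(configTopic))
                      .all().get().get(configTopic);
              int partitions = description.partitions().size();
              if (partitions != 1) {
                  throw new ConfigException("Connect config topic '" + configTopic
                          + "' must have exactly one partition, but has " + partitions);
              }
          }
      }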


          Activity

            EeveeB Evelyn Bayes added a comment -

            I'd be happy to do this.

             

            Looking through the code, the worker tries to recreate the log each time it starts:

             

            public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer) {
                this.lock = new Object();
                this.started = false;
                this.converter = converter;
                this.offset = -1;
                this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
                if (this.topic == null || this.topic.trim().length() == 0)
                    throw new ConfigException("Must specify topic for connector configuration.");
                configLog = setupAndCreateKafkaBasedLog(this.topic, config);  // <- here
                this.configTransformer = configTransformer;
            }
            

            The above only does preparation on the worker side; it isn't until start() is called that it actually tries to "create" the topic:

             

            public KafkaBasedLog(String topic,
                                 Map<String, Object> producerConfigs,
                                 Map<String, Object> consumerConfigs,
                                 Callback<ConsumerRecord<K, V>> consumedCallback,
                                 Time time,
                                 Runnable initializer) {
                this.topic = topic;
                this.producerConfigs = producerConfigs;
                this.consumerConfigs = consumerConfigs;
                this.consumedCallback = consumedCallback;
                this.stopRequested = false;
                this.readLogEndOffsetCallbacks = new ArrayDeque<>();
                this.time = time;
                this.initializer = initializer != null ? initializer : new Runnable() {
                    @Override
                    public void run() {
                    }
                };
            }

            public void start() {                                            // <- here
                log.info("Starting KafkaBasedLog with topic " + topic);

                initializer.run();                                           // <- here
                producer = createProducer();
                consumer = createConsumer();
                // ...
            
            

            And the worker just fires and forgets:

                private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
                                                                          Map<String, Object> consumerProps,
                                                                          Callback<ConsumerRecord<String, byte[]>> consumedCallback,
                                                                          final NewTopic topicDescription, final Map<String, Object> adminProps) {
                    Runnable createTopics = new Runnable() {
                        @Override
                        public void run() {
                            log.debug("Creating admin client to manage Connect internal config topic");
                            try (TopicAdmin admin = new TopicAdmin(adminProps)) {
                                admin.createTopics(topicDescription);
                            }
                        }
                    };
                    return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics);
                }
            

            This means the partition count doesn't seem to be exposed anywhere, except in the utility class TopicAdmin, which is used by both the offset backing store and the config backing store.

             

            I see three options:

            • Create a new temporary client and fetch the topic metadata to run the check as part of the startup process in KafkaConfigBackingStore;
            • Modify TopicAdmin, the utility client used to create the topic for KafkaConfigBackingStore and KafkaOffsetBackingStore; it doesn't fetch that metadata at the moment, but we could have it return details about the topic; or
            • Modify KafkaBasedLog with a new method to expose the partition count and a private variable to retain that information.

            For the second and third options we'd pass the partition information to KafkaConfigBackingStore and run the check there.
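
            As a rough illustration of the third option (a sketch only, not the eventual patch; the field and method names here are assumptions), KafkaBasedLog could remember the partition count it discovers in start() and expose it through a getter:

            private int partitionCount = -1;   // assumed new field; -1 until start() runs

            public void start() {
                // ... existing start() logic: initializer.run(), create producer and consumer ...
                // start() already calls consumer.partitionsFor(topic); we would simply keep the size:
                List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
                this.partitionCount = partitionInfos == null ? 0 : partitionInfos.size();
                // ... existing logic that assigns the partitions and reads to the log end ...
            }

            // Assumed new getter: partition count of the backing topic, valid only after start().
            public int partitionCount() {
                return partitionCount;
            }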

             

             

             

             

            rhauch Randall Hauch added a comment -

            Thanks for volunteering to fix this, EeveeB, and for identifying three potential approaches to fixing this!

            Before I discuss your approaches, I do want to first confirm something. If the Connect worker creates the config topic, it does so always with a single partition. That means that we're only concerned with scenarios where the topic was manually created (or modified) before the worker was started. Is that correct?

            Okay, now to your approaches. Just to clarify, the call path is basically:

            1. `DistributedHerder.run()` calls `startServices()`
            2. `DistributedHerder.startServices()` calls `configBackingStore.start()`
            3. `KafkaConfigBackingStore.start()` calls `configLog.start()`
            4. `KafkaBasedLog.start()` calls `initializer.run()`

            We can see that the `KafkaBasedLog.start()` method already has code that throws a ConnectException, so we know that already stops the herder from running (which is the desired behavior). So, as long as the `KafkaConfigBackingStore.start()` method (or anything called within it, including `initializer.run()`) throws a ConnectException with the appropriate error, the herder will stop.

            Option 1

            This option would work, but it seems to be a fair amount of work compared to the others.

            Option 2

            IIUC, your second option is to modify the initializer function defined in `KafkaConfigBackingStore` to also get/check the number of partitions and to throw a ConnectException if the topic already exists and has more than one partition.

            This would require modifying the TopicAdmin to get the metadata for the existing topic and return it. While that's probably doable, it's more complicated than your next option.

            Option 3

            This is a good idea, too, especially because the `KafkaBasedLog.start()` method is already getting the partition information from the consumer in the form of a `List<PartitionInfo>` for the one topic (or, a bit later, the `List<TopicPartition>` for the topic). If it stored that in an instance variable and returned an immutable view of that list via a method, the `KafkaConfigBackingStore.start()` method could use this method and fail if there is more than 1 partition.

            The great thing about this approach is that we don't have to modify the `TopicAdmin` utility or the initializer. The changes to `KafkaBasedLog` are minimal – we just need the getter method to return an immutable list of immutable `TopicPartition` objects. (Note that we could return `PartitionInfo`, but it's not immutable and we don't know how our new getter method might be used. Returning an immutable `List<TopicPartition>` is much safer.)

            We do have to modify the `KafkaConfigBackingStore.start()` method to use this new method, but that would be super simple logic.

            Personally, I think this is a great approach: it's simple and localizes the changes pretty well.
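
            A minimal sketch of what that might look like (illustrative only; partitions() is the assumed new getter on `KafkaBasedLog`, and the exception message is a placeholder):

            // In KafkaBasedLog: 'partitions' would be an assumed new field, populated in start()
            // from the List<TopicPartition> that start() already builds.
            public List<TopicPartition> partitions() {
                return Collections.unmodifiableList(partitions);
            }

            // In KafkaConfigBackingStore.start(), after configLog.start(); throwing here
            // halts the herder startup, as noted above.
            int partitionCount = configLog.partitions().size();
            if (partitionCount > 1) {
                throw new ConfigException("Topic '" + topic + "' supplied via the '"
                        + DistributedConfig.CONFIG_TOPIC_CONFIG + "' property must have a single partition "
                        + "so that connector configurations have a total order, but it has "
                        + partitionCount + " partitions.");
            }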

            Option 4

            A slight variation of Option 3 is to not introduce a new field and getter in `KafkaBasedLog` that returns the partition information, but to instead pass a "partition validation" function into the `KafkaBasedLog` constructor and then to use this in the `start()` method. The benefit is that we don't have to expose any new methods on `KafkaBasedLog`, but we have to change the constructor.

            This really has all the same benefits as option 3, but it's a little harder to follow the logic. So I don't like this quite as much as option 3.

             

             

            rhauch Randall Hauch added a comment - - edited

            For the record, this does change the behavior but I don't think this will require a KIP. Anyone in this situation will almost certainly run into problems if they're running a multi-node distributed Connect cluster. They may not run into this if they're just using a single worker, but even in this situation there still is the possibility of a problem, since the configs no longer have a total order.

            And because this is fixing a potential problem, we could consider backporting it to a few recent branches.

            This does not affect standalone mode at all.

             

            EeveeB, one thing we probably also want to do is make sure the exception message contains concrete information about what the user should do to correct the problem. This may be the hardest part.

            EeveeB Evelyn Bayes added a comment -

            rhauch I think your interpretation is a little off.

            When Kafka Connect starts it sends a create topic command (with 1 partition) in a fire and forget fashion.

            It then operates under the assumption the topic is there.

            As for your choices, the third option was my favourite too.

            githubbot ASF GitHub Bot added a comment -

            Evelyn-Bayes commented on pull request #8270: KAFKA-9216: Enforce connect internal topic configuration at startup
            URL: https://github.com/apache/kafka/pull/8270

            Currently, Kafka Connect creates its config backing topic with a fire-and-forget approach.
            This is fine unless someone has manually created that topic already with the wrong partition count.

            In such a case Kafka Connect "may" run for some time, especially in standalone mode, but once switched to distributed mode it will almost certainly fail.

            To counter this I've added a check when the KafkaConfigBackingStore is starting.
            This check will throw a ConfigException if there is more than one partition in the backing store.

            This exception is then caught upstream and logged by either:

            • class: DistributedHerder, method: run
            • class: ConnectStandalone, method: main

            After a review I don't believe it impacts any other upstream code.

            Finally, to support this new functionality I've added a public method to KafkaBasedLog which returns the partition count, plus a variable to store it.

            And, I've created a unit test in KafkaConfigBackingStoreTest to verify the behaviour.


            gharris1727 Greg Harris added a comment -

            I believe that KAFKA-5087 has an intent similar to this issue, but was never implemented, and erroneously closed as a duplicate.

            It does call out the need to enforce that the topics are log-compacted rather than time- or size-deleted in order to prevent loss of state, which I think would be a valuable addition to the validation suggested here.

            One additional aspect that isn't covered yet is that the retention period of the topics should also be infinite, since loss of state due to long-running connectors is undesirable.

            I think that any one of these issues (config partition count != 1, cleanup policy != compact, or retention != infinite) should generate an exception during startup, to prevent the worker from operating on top of a lossy state-store.
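
            Expressed as code, that combined validation might look roughly like the following sketch (illustrative only; the helper name and message text are assumptions, and the partition-count condition applies only to the config topic):

            import org.apache.kafka.clients.admin.Config;
            import org.apache.kafka.common.config.ConfigException;
            import org.apache.kafka.common.config.TopicConfig;

            // Sketch of the three checks: partition count, cleanup.policy, and retention.
            // The Config object is assumed to come from AdminClient#describeConfigs, which also
            // returns broker defaults, so the entries below are normally present.
            static void validateConfigTopic(String topic, int partitionCount, Config topicConfig) {
                if (partitionCount != 1)
                    throw new ConfigException("Internal topic '" + topic
                            + "' must have exactly 1 partition, but has " + partitionCount);

                String policy = topicConfig.get(TopicConfig.CLEANUP_POLICY_CONFIG).value();
                if (!TopicConfig.CLEANUP_POLICY_COMPACT.equals(policy))
                    throw new ConfigException("Internal topic '" + topic
                            + "' must use cleanup.policy=compact, but uses: " + policy);

                String retentionMs = topicConfig.get(TopicConfig.RETENTION_MS_CONFIG).value();
                if (!"-1".equals(retentionMs))
                    throw new ConfigException("Internal topic '" + topic
                            + "' must have unlimited retention (retention.ms=-1), but has: " + retentionMs);
            }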

            rhauch Randall Hauch added a comment -

            The previous PR only checked the number of partitions, so I'm going to reopen this to add another PR that checks the internal topic cleanup policy, which should be `compact` (only), and should not be `delete,compact` or `delete`. Using any other topic cleanup policy for the internal topics can lead to lost configurations, source offsets, or statuses.

            rhauch Randall Hauch added a comment - - edited

            kkonstantine, thanks for reviewing and merging EeveeB's PR to check the partition count of the connector configs topic. I've created a followup PR (https://github.com/apache/kafka/pull/8828) that verifies that each internal topic has the `cleanup.policy=compact`; the worker fails to start if the cleanup policy is any other value (e.g., `delete`, `delete,compact` or `compact,delete`). The logic will avoid the check:

            • if the worker just created the topic, since the topic is always created with `cleanup.policy=compact`, or
            • if the Kafka principal used by Connect to read/write the internal topics does not have ACLs to describe topic configs (cluster or topic authorization exception), or
            • if the Kafka broker is older than 0.11.0.0 (unsupported version exception)

            Note that I chose to avoid checking retention, since that only applies if the `delete` cleanup policy is used. Therefore, the check described in the preceding paragraph should be sufficient.
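
            For readers following along, here is a hedged sketch of how those skip conditions might be arranged around an AdminClient describeConfigs call (this is not the merged PR; the method name and messages are assumptions):

            import java.util.Collections;
            import java.util.concurrent.ExecutionException;
            import org.apache.kafka.clients.admin.AdminClient;
            import org.apache.kafka.clients.admin.Config;
            import org.apache.kafka.common.config.ConfigException;
            import org.apache.kafka.common.config.ConfigResource;
            import org.apache.kafka.common.config.TopicConfig;
            import org.apache.kafka.common.errors.ClusterAuthorizationException;
            import org.apache.kafka.common.errors.TopicAuthorizationException;
            import org.apache.kafka.common.errors.UnsupportedVersionException;
            import org.apache.kafka.connect.errors.ConnectException;

            // Illustrative only: require cleanup.policy=compact, but skip the check when the
            // principal cannot describe topic configs or the broker predates DescribeConfigs (0.11.0.0).
            static void verifyCompactCleanupPolicy(AdminClient admin, String topic) {
                ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
                try {
                    Config config = admin.describeConfigs(Collections.singleton(resource)).all().get().get(resource);
                    String policy = config.get(TopicConfig.CLEANUP_POLICY_CONFIG).value();
                    if (!TopicConfig.CLEANUP_POLICY_COMPACT.equals(policy)) {
                        throw new ConfigException("Topic '" + topic + "' must have cleanup.policy=compact "
                                + "to avoid losing Connect state, but has: " + policy);
                    }
                } catch (ExecutionException e) {
                    Throwable cause = e.getCause();
                    if (cause instanceof ClusterAuthorizationException
                            || cause instanceof TopicAuthorizationException
                            || cause instanceof UnsupportedVersionException) {
                        // No permission to describe configs, or broker too old:
                        // skip the check (a real implementation would log a warning here).
                        return;
                    }
                    throw new ConnectException("Failed to describe configs for topic '" + topic + "'", cause);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new ConnectException("Interrupted while describing topic '" + topic + "'", e);
                }
            }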

            ChrisEgerton Chris Egerton added a comment - - edited

            Based on this change it looks like we've altered workers to fail on startup if the config topic has multiple partitions. How should users respond in this case? As far as I know there's no out-of-the-box tool to reduce the number of partitions for a topic.

            Do we expect them to just delete the topic and then recreate it? If so, could we say that explicitly (or even just recommend that they delete the topic and then allow the framework to create it automatically, ACLs permitting)? If not, can we outline a recommended flow?

            I love the idea here and it should save users plenty of headaches in the future, but I think we might want to consider the impact on users who may see this during an upgrade and try to outline clear steps for them to take, either directly in the error message or on this ticket (which can be linked to in the error message).

            rhauch Randall Hauch added a comment -

            Thanks, ChrisEgerton. I think we came to consensus on the PR by improving the error message with better instructions.

            rhauch Randall Hauch added a comment -

            Merged to `trunk` the second PR that enforces the `cleanup.policy` topic setting on Connect's three internal topics, and cherry-picked it to the `2.6` (for upcoming 2.6.0). However, merging to earlier branches requires too many changes in integration tests.


            People

              EeveeB Evelyn Bayes
              rhauch Randall Hauch
              Votes: 0
              Watchers: 5
