Kafka / KAFKA-14565

Interceptor Resource Leak

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 3.5.0, 3.4.1, 3.3.3
    • Component/s: clients
    • Labels: None

    Description

      The Consumer and Producer interceptor interfaces and their corresponding Kafka Consumer and Producer constructors do not adequately support cleanup of underlying interceptor resources. 

      Currently, the Kafka Consumer and Kafka Producer constructors delegate to AbstractConfig.getConfiguredInstances() the responsibility for both creating and configuring each interceptor listed in the interceptor.classes property. The method returns a configured List<ConsumerInterceptor<K,V>> interceptors.

      This dual responsibility is problematic when multiple interceptors are listed, at least one interceptor's configure method creates (or depends on objects that create) threads, connections, or other resources requiring cleanup, and a subsequent interceptor's configure method raises a runtime exception. The exception leaks the first interceptor's resources: the interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, so the close method of the first interceptor, and really of any interceptor, is never called.
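The failure mode described above can be reproduced in isolation. The sketch below does not use the Kafka client library: `LeakSketch`, its nested `Interceptor` interface, and `createAndConfigure` are hypothetical stand-ins that only mimic the "create and configure in one step" behavior, so that the leak is visible as a counter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, self-contained model of the leak; none of these types are Kafka classes.
final class LeakSketch {
    // Number of resources acquired but not yet released.
    static final AtomicInteger OPEN_RESOURCES = new AtomicInteger();

    // Stand-in for the configure/close part of an interceptor interface.
    interface Interceptor extends AutoCloseable {
        void configure();
        @Override
        void close();
    }

    // Acquires a resource in configure() and releases it in close().
    static final class ResourceHoldingInterceptor implements Interceptor {
        @Override public void configure() { OPEN_RESOURCES.incrementAndGet(); }
        @Override public void close() { OPEN_RESOURCES.decrementAndGet(); }
    }

    // Always fails during configuration.
    static final class FailingInterceptor implements Interceptor {
        @Override public void configure() { throw new IllegalStateException("bad config"); }
        @Override public void close() { }
    }

    // Configures instances in order; an exception escapes before the caller ever sees the list.
    static List<Interceptor> createAndConfigure(List<Interceptor> candidates) {
        List<Interceptor> configured = new ArrayList<>();
        for (Interceptor candidate : candidates) {
            candidate.configure();
            configured.add(candidate);
        }
        return configured;
    }

    // Models a client constructor: returns how many resources remain open after the failure.
    static int simulateConstructor() {
        OPEN_RESOURCES.set(0);
        try {
            List<Interceptor> interceptors = createAndConfigure(
                    Arrays.asList(new ResourceHoldingInterceptor(), new FailingInterceptor()));
            interceptors.forEach(Interceptor::close); // never reached in this scenario
        } catch (IllegalStateException e) {
            // No reference to the first interceptor is available here, so it cannot be closed.
        }
        return OPEN_RESOURCES.get();
    }

    public static void main(String[] args) {
        System.out.println("resources still open: " + simulateConstructor()); // prints 1
    }
}
```

The counter stays at 1 because the only reference to the already-configured interceptor is lost when the exception propagates.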

      To help ensure the interceptor containers can invoke each interceptor's close method for proper resource cleanup, I propose two approaches:

      PROPOSAL 1
      Define a default open(), configureWithResources(), or acquireResources() method with an empty implementation and a checked exception on the respective Consumer/Producer interceptor interfaces. As part of interceptor life cycle management, this method will be responsible for creating threads and/or objects that use threads, connections, or other resources requiring cleanup. Because the default implementation is empty, implementing it is optional: it does nothing when not overridden, which mitigates the backward compatibility impact on existing interceptors. Finally, the Kafka Consumer/Producer interceptor containers will implement a corresponding maybeOpen, maybeConfigureWithResources, or maybeAcquireResources method that also throws a checked exception.

      The following code excerpt shows the change in the Consumer/Producer constructor:

      List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
              ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
              ConsumerInterceptor.class,
              Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
      
      this.interceptors = new ConsumerInterceptors<>(interceptorList);
      this.interceptors.maybeConfigureWithResources();
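
To make the life cycle in Proposal 1 concrete, here is a minimal sketch under stated assumptions. It is not the Kafka API: `OpenSketch`, its nested `Interceptor` interface, the `open()` hook, and the `Interceptors` container with `maybeOpen()` are hypothetical names picked from the options listed above. The point it illustrates is that the container exists before any resource is acquired, so a failure in one `open()` can be followed by `close()` on every interceptor.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of Proposal 1; none of these types are Kafka classes.
final class OpenSketch {
    static final AtomicInteger OPEN_RESOURCES = new AtomicInteger();

    interface Interceptor extends AutoCloseable {
        void configure();                         // reads settings only, acquires nothing
        default void open() throws Exception { }  // assumed new hook; no-op unless overridden
        @Override
        void close();
    }

    // Existing-style interceptor that never overrides open(); it keeps working unchanged.
    static final class LegacyInterceptor implements Interceptor {
        @Override public void configure() { }
        @Override public void close() { }
    }

    // Acquires its resource in open() and releases it in close().
    static final class ResourceHoldingInterceptor implements Interceptor {
        private boolean opened;
        @Override public void configure() { }
        @Override public void open() { opened = true; OPEN_RESOURCES.incrementAndGet(); }
        @Override public void close() {
            if (opened) { opened = false; OPEN_RESOURCES.decrementAndGet(); }
        }
    }

    // Fails while acquiring resources.
    static final class FailingInterceptor implements Interceptor {
        @Override public void configure() { }
        @Override public void open() throws Exception { throw new Exception("cannot connect"); }
        @Override public void close() { }
    }

    // Stand-in for the interceptor container.
    static final class Interceptors implements AutoCloseable {
        private final List<Interceptor> interceptors;
        Interceptors(List<Interceptor> interceptors) { this.interceptors = interceptors; }

        void maybeOpen() throws Exception {
            for (Interceptor interceptor : interceptors) interceptor.open();
        }

        @Override public void close() {
            for (Interceptor interceptor : interceptors) {
                try { interceptor.close(); } catch (RuntimeException e) { /* keep closing the rest */ }
            }
        }
    }

    // Models a client constructor: returns how many resources remain open after the failure.
    static int simulateConstructor() {
        OPEN_RESOURCES.set(0);
        List<Interceptor> list = Arrays.asList(
                new LegacyInterceptor(), new ResourceHoldingInterceptor(), new FailingInterceptor());
        list.forEach(Interceptor::configure);
        Interceptors container = new Interceptors(list); // created before resources are acquired
        try {
            container.maybeOpen();
        } catch (Exception e) {
            container.close(); // every interceptor gets a chance to release what it holds
        }
        return OPEN_RESOURCES.get();
    }
}
```

In this model `OpenSketch.simulateConstructor()` returns 0, because the resource acquired by the second interceptor is released when the container is closed.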
       

      PROPOSAL 2

      To avoid changing any public interfaces and the subsequent KIP process, we can:

      • Create a class that inherits from or wraps AbstractConfig and contains a new method returning a ConfiguredInstanceResult. This ConfiguredInstanceResult class will contain an optional list of successfully created interceptors and/or the exception that occurred while calling each Interceptor::configure. It will also contain a helper method to rethrow the exception and a method that returns the underlying exception. The caller is expected to handle the exception and perform cleanup, e.g. call Interceptor::close on each interceptor in the list provided by the ConfiguredInstanceResult class.
      • Automatically invoke close on any Closeable or AutoCloseable instances if/when a failure occurs.
      • Add a new overloaded getConfiguredInstance / getConfiguredInstances variant that allows users to specify whether already-instantiated classes should be closed when a failure occurs.
      • Add a new exception type to the public API that includes a list of all instances successfully instantiated (and/or successfully configured) before the error was encountered, so that callers can choose how to handle the failure (and possibly so that instantiation/configuration can be attempted on every class before the exception is thrown).
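
The first option in the list above could look roughly like the following sketch. Only the name `ConfiguredInstanceResult` comes from the description; `ResultSketch`, `configureAll`, `maybeRethrow`, and the nested `Interceptor` interface are hypothetical, and the sketch assumes the result lists only the instances whose `configure` call succeeded.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the ConfiguredInstanceResult idea; none of these types are Kafka classes.
final class ResultSketch {
    static final AtomicInteger OPEN_RESOURCES = new AtomicInteger();

    interface Interceptor extends AutoCloseable {
        void configure();
        @Override
        void close();
    }

    static final class ResourceHoldingInterceptor implements Interceptor {
        @Override public void configure() { OPEN_RESOURCES.incrementAndGet(); }
        @Override public void close() { OPEN_RESOURCES.decrementAndGet(); }
    }

    static final class FailingInterceptor implements Interceptor {
        @Override public void configure() { throw new IllegalStateException("bad config"); }
        @Override public void close() { }
    }

    // Carries the instances configured so far plus the failure, if any.
    static final class ConfiguredInstanceResult<T> {
        private final List<T> instances;
        private final RuntimeException exception; // null when every configure() succeeded

        ConfiguredInstanceResult(List<T> instances, RuntimeException exception) {
            this.instances = Collections.unmodifiableList(instances);
            this.exception = exception;
        }

        List<T> instances() { return instances; }
        Optional<RuntimeException> exception() { return Optional.ofNullable(exception); }
        void maybeRethrow() { if (exception != null) throw exception; }
    }

    // Captures the first failure instead of letting it escape and lose the list.
    static ConfiguredInstanceResult<Interceptor> configureAll(List<Interceptor> candidates) {
        List<Interceptor> configured = new ArrayList<>();
        for (Interceptor candidate : candidates) {
            try {
                candidate.configure();
                configured.add(candidate);
            } catch (RuntimeException e) {
                return new ConfiguredInstanceResult<>(configured, e);
            }
        }
        return new ConfiguredInstanceResult<>(configured, null);
    }

    // Models a client constructor: returns how many resources remain open after cleanup.
    static int simulateConstructor() {
        OPEN_RESOURCES.set(0);
        ConfiguredInstanceResult<Interceptor> result = configureAll(
                Arrays.asList(new ResourceHoldingInterceptor(), new FailingInterceptor()));
        try {
            result.maybeRethrow();
        } catch (RuntimeException e) {
            result.instances().forEach(Interceptor::close); // caller-side cleanup
        }
        return OPEN_RESOURCES.get();
    }
}
```

Here the caller decides what to do with the partially configured list. The second option in the list would move the same `close` loop inside the configuring method, which is simpler for callers but gives them no say in how the failure is handled.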

       

       

       

            People

              Assignee: beardt Terry Beard
              Reporter: beardt Terry Beard