  Apache NiFi / NIFI-7034

Connection leak with JMSConsumer and JMSPublisher

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.10.0
    • Fix Version/s: 1.11.0
    • Component/s: Extensions
    • Labels: None
    • Environment: Discovered against ActiveMQ.
    • Flags: Important

    Description

      JMS connections are not closed in case of a failure. Discovered against ActiveMQ, but it applies to other JMS servers.

      The problem occurs when an exception is raised and the worker is marked as invalid: the current code discards the worker without closing it first. Details follow.

      Details

      Any exception raised in a JMSConsumer or JMSPublisher marks the worker as invalid. After that, the worker is discarded, but the resources it holds are never released. The relevant snippet:

      AbstractJMSProcessor
              } finally {
                  //in case of exception during worker's connection (consumer or publisher),
                  //an appropriate service is responsible to invalidate the worker.
                  //if worker is not valid anymore, don't put it back into a pool, try to rebuild it first, or discard.
                  //this will be helpful in a situation, when JNDI has changed, or JMS server is not available
                  //and reconnection is required.
                  if (worker == null || !worker.isValid()){
                      getLogger().debug("Worker is invalid. Will try re-create... ");
                      final JMSConnectionFactoryProviderDefinition cfProvider = context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class);
                      try {
                          // Safe to cast. Method #buildTargetResource(ProcessContext context) sets only CachingConnectionFactory
                          CachingConnectionFactory currentCF = (CachingConnectionFactory)worker.jmsTemplate.getConnectionFactory();
                          cfProvider.resetConnectionFactory(currentCF.getTargetConnectionFactory());
                          worker = buildTargetResource(context);
                      }catch(Exception e) {
                          getLogger().error("Failed to rebuild:  " + cfProvider);
                          worker = null;
                      }
                  }
      

      Before the worker is discarded, all resources associated with it should be released. The proper fix is to call worker.shutdown() and then discard it.
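
      The shutdown-then-discard order can be sketched in isolation as follows. This is only an illustration under stated assumptions, not the actual patch: Worker is a hypothetical stand-in for the JMS worker, recycle() is a made-up helper, and the open-connection counter exists only to make the leak observable.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class WorkerCleanupSketch {

    /** Hypothetical stand-in for a JMS worker; not the NiFi class. */
    static class Worker {
        // Counts connections that are currently open, to make leaks visible.
        static final AtomicInteger OPEN_CONNECTIONS = new AtomicInteger();
        private boolean valid = true;
        private boolean closed = false;

        Worker() {
            OPEN_CONNECTIONS.incrementAndGet();
        }

        boolean isValid() {
            return valid;
        }

        void invalidate() {
            valid = false;
        }

        /** Releases the connection; calling it twice has no further effect. */
        void shutdown() {
            if (!closed) {
                closed = true;
                OPEN_CONNECTIONS.decrementAndGet();
            }
        }
    }

    /**
     * Returns a worker that may go back into the pool, or null if a
     * replacement could not be built. An invalid worker is always shut
     * down before its reference is dropped.
     */
    static Worker recycle(Worker worker, Supplier<Worker> rebuild) {
        if (worker != null && worker.isValid()) {
            return worker;
        }
        if (worker != null) {
            worker.shutdown(); // release resources before discarding
        }
        try {
            return rebuild.get();
        } catch (RuntimeException e) {
            return null; // rebuild failed; the old worker is already closed
        }
    }

    public static void main(String[] args) {
        Worker worker = new Worker();
        worker.invalidate(); // simulate a failure reported by the worker
        Worker replacement = recycle(worker, Worker::new);
        // Only the replacement's connection remains open.
        System.out.println("open connections: " + Worker.OPEN_CONNECTIONS.get());
    }
}
```

      Without the shutdown() call in recycle(), the counter would stay at 2 after the rebuild, which mirrors the leak described above.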

            People

              Assignee: Gardella Juan Pablo (gardellajuanpablo)
              Reporter: Gardella Juan Pablo (gardellajuanpablo)
              Votes: 0
              Watchers: 4

                Time Tracking

                  Estimated: 2h
                  Remaining: 0.5h
                  Logged: 1.5h