Camel
  1. Camel
  2. CAMEL-4556

NettyProducer creating new connection on every message

    Details

    • Type: Bug Bug
    • Status: Resolved
    • Priority: Minor Minor
    • Resolution: Fixed
    • Affects Version/s: 2.8.1
    • Fix Version/s: 2.9.2, 2.10.0
    • Component/s: camel-netty
    • Labels:
      None
    • Estimated Complexity:
      Unknown

      Description

      Using a NettyProducer without the disconnect=true configuration is causing the route to block after 10 messages on the to("netty://tcp....") call.

      It appears that a new socket connection is created for every message, and then after 10 connections no new connection is allowed (must be a default thread pool limit?).

      Using the disconnect=true option fixes the problem as a socket is connected, message sent, then disconnected. But this does not seem viable for implementations where that overhead is undesirable or where more than one response is expected on a channel.

      This is a small Unit Test that shows the problem (http://camel.465427.n5.nabble.com/Camel-Netty-Producer-creating-new-connection-on-every-message-td4844805.html#none)

      package netty;

      import java.util.Arrays;
      import java.util.Collection;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.atomic.AtomicBoolean;
      import java.util.concurrent.atomic.AtomicInteger;

      import junit.framework.TestCase;

      import org.apache.camel.CamelContext;
      import org.apache.camel.Exchange;
      import org.apache.camel.ExchangePattern;
      import org.apache.camel.Processor;
      import org.apache.camel.builder.RouteBuilder;
      import org.apache.camel.impl.DefaultCamelContext;
      import org.junit.Before;
      import org.junit.BeforeClass;
      import org.junit.Test;
      import org.junit.runner.RunWith;
      import org.junit.runners.Parameterized;
      import org.junit.runners.Parameterized.Parameters;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;

      @RunWith(Parameterized.class)
      public class NettyTest extends TestCase
      {
      private final static Logger logger = LoggerFactory.getLogger(NettyTest.class);
      private final static CamelContext serverContext = new DefaultCamelContext();

      private final CamelContext clientContext = new DefaultCamelContext();
      private final AtomicInteger responseCounter = new AtomicInteger(0);
      private final AtomicBoolean passedTen = new AtomicBoolean(false);

      private Boolean disconnectClient;

      public NettyTest(Boolean disconnectClient)

      { this.disconnectClient = disconnectClient; }

      @Parameters
      public static Collection<Object[]> configs()
      {
      return Arrays.asList(new Object[][] {

      { true }

      ,

      { false }

      });
      }

      @BeforeClass
      public static void createServer() throws Exception
      {
      serverContext.addRoutes(new RouteBuilder()
      {
      @Override
      public void configure() throws Exception
      {
      from("netty:tcp://localhost:9000?sync=true&disconnectOnNoReply=false&allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false")
      .setExchangePattern(ExchangePattern.InOut)
      .process(new Processor() {

      @Override
      public void process(Exchange exchange) throws Exception
      {
      Object body = exchange.getIn().getBody();
      logger.info("Request received : Value = {}", body);
      }

      })
      .transform(constant(3)).stop();
      }
      });

      serverContext.start();
      }

      @Before
      public void createClient() throws Exception
      {
      clientContext.addRoutes(new RouteBuilder()
      {
      @Override
      public void configure() throws Exception
      {
      // Generate an Echo message and ensure a Response is sent
      from("timer://echoTimer?delay=1s&fixedRate=true&period=1s")
      .setExchangePattern(ExchangePattern.InOut)
      .transform()
      .constant(2)
      .to(ExchangePattern.InOut, "netty:tcp://localhost:9000?allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false&sync=true&disconnect=" + disconnectClient.toString())
      .process(new Processor()
      {
      @Override
      public void process(Exchange exchange) throws Exception
      {
      Object body = exchange.getIn().getBody();
      logger.info("Response number {} : Value = {}",
      responseCounter.incrementAndGet(), body);

      if (responseCounter.get() > 10)

      { passedTen.set(true); }


      }

      }).stop();
      }
      });
      }

      @Test
      public void test() throws Exception
      {
      clientContext.getShutdownStrategy().setTimeout(1);

      clientContext.start();

      logger.info("Disconnect = {}", this.disconnectClient);

      Thread.sleep(TimeUnit.SECONDS.toMillis(15));

      clientContext.stop();

      assertTrue("More than 10 responses have been received", passedTen.get());
      }
      }

        Activity

        Hide
        Claus Ibsen added a comment -

        This is now working on trunk where the connection will be re-used.

        There is a slight API change, but I think it may be worthwhile to backport that to 2.9 branch. It only affects people who develop custom pipeline factories which should only be a few use-cases.

        Show
        Claus Ibsen added a comment - This is now working on trunk where the connection will be re-used. There is a slight API change, but I think it may be worthwhile to backport that to 2.9 branch. It only affects people who develop custom pipeline factories which should only be a few use-cases.

          People

          • Assignee:
            Claus Ibsen
            Reporter:
            Matthew McMahon
          • Votes:
            0 Vote for this issue
            Watchers:
            0 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development