Camel / CAMEL-4556

NettyProducer creating new connection on every message

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.8.1
    • Fix Version/s: 2.9.2, 2.10.0
    • Component/s: camel-netty
    • Labels:
      None
    • Estimated Complexity:
      Unknown

      Description

      Using a NettyProducer without the disconnect=true configuration is causing the route to block after 10 messages on the to("netty://tcp....") call.

      It appears that a new socket connection is created for every message, and then after 10 connections no new connection is allowed (must be a default thread pool limit?).

      Using the disconnect=true option fixes the problem as a socket is connected, message sent, then disconnected. But this does not seem viable for implementations where that overhead is undesirable or where more than one response is expected on a channel.
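
      The suspected mechanism can be modelled in plain Java. This is only a sketch of the reporter's hypothesis (a fixed cap on concurrently held connections), not Camel's or Netty's actual code: the class name, the sendAll helper, and the cap of 10 are all assumptions made for illustration. Each message takes one permit; with disconnect=false the permit is never returned, so the 11th message finds none left.

```java
import java.util.concurrent.Semaphore;

public class ConnectionLimitSketch {
    // Hypothetical cap, chosen only to match the 10 messages seen in the report.
    static final int MAX_CONNECTIONS = 10;

    /**
     * Simulates sending `messages` messages, taking one permit ("connection")
     * per message. Returns how many messages obtained a connection.
     */
    static int sendAll(int messages, boolean disconnect) {
        Semaphore pool = new Semaphore(MAX_CONNECTIONS);
        int sent = 0;
        for (int i = 0; i < messages; i++) {
            // tryAcquire() returns false instead of blocking when no permit is left.
            if (!pool.tryAcquire()) {
                break; // in the reported scenario the route blocks at this point
            }
            sent++;
            if (disconnect) {
                pool.release(); // connection closed, permit returned
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println("disconnect=false: " + sendAll(15, false)); // prints 10
        System.out.println("disconnect=true:  " + sendAll(15, true));  // prints 15
    }
}
```

      Under this model, the alternative to disconnect=true is for the producer to reuse one open connection across messages, which is the behaviour the report asks for.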

      This is a small Unit Test that shows the problem (http://camel.465427.n5.nabble.com/Camel-Netty-Producer-creating-new-connection-on-every-message-td4844805.html#none)

      package netty;

      import java.util.Arrays;
      import java.util.Collection;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.atomic.AtomicBoolean;
      import java.util.concurrent.atomic.AtomicInteger;

      import junit.framework.TestCase;

      import org.apache.camel.CamelContext;
      import org.apache.camel.Exchange;
      import org.apache.camel.ExchangePattern;
      import org.apache.camel.Processor;
      import org.apache.camel.builder.RouteBuilder;
      import org.apache.camel.impl.DefaultCamelContext;
      import org.junit.Before;
      import org.junit.BeforeClass;
      import org.junit.Test;
      import org.junit.runner.RunWith;
      import org.junit.runners.Parameterized;
      import org.junit.runners.Parameterized.Parameters;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;

      @RunWith(Parameterized.class)
      public class NettyTest extends TestCase
      {
          private final static Logger logger = LoggerFactory.getLogger(NettyTest.class);
          private final static CamelContext serverContext = new DefaultCamelContext();

          private final CamelContext clientContext = new DefaultCamelContext();
          private final AtomicInteger responseCounter = new AtomicInteger(0);
          private final AtomicBoolean passedTen = new AtomicBoolean(false);

          private Boolean disconnectClient;

          public NettyTest(Boolean disconnectClient)
          {
              this.disconnectClient = disconnectClient;
          }

          @Parameters
          public static Collection<Object[]> configs()
          {
              return Arrays.asList(new Object[][] { { true }, { false } });
          }

          @BeforeClass
          public static void createServer() throws Exception
          {
              serverContext.addRoutes(new RouteBuilder()
              {
                  @Override
                  public void configure() throws Exception
                  {
                      from("netty:tcp://localhost:9000?sync=true&disconnectOnNoReply=false&allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false")
                              .setExchangePattern(ExchangePattern.InOut)
                              .process(new Processor() {

                                  @Override
                                  public void process(Exchange exchange) throws Exception
                                  {
                                      Object body = exchange.getIn().getBody();
                                      logger.info("Request received : Value = {}", body);
                                  }

                              })
                              .transform(constant(3)).stop();
                  }
              });

              serverContext.start();
          }

          @Before
          public void createClient() throws Exception
          {
              clientContext.addRoutes(new RouteBuilder()
              {
                  @Override
                  public void configure() throws Exception
                  {
                      // Generate an Echo message and ensure a Response is sent
                      from("timer://echoTimer?delay=1s&fixedRate=true&period=1s")
                              .setExchangePattern(ExchangePattern.InOut)
                              .transform()
                              .constant(2)
                              .to(ExchangePattern.InOut, "netty:tcp://localhost:9000?allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false&sync=true&disconnect=" + disconnectClient.toString())
                              .process(new Processor()
                              {
                                  @Override
                                  public void process(Exchange exchange) throws Exception
                                  {
                                      Object body = exchange.getIn().getBody();
                                      logger.info("Response number {} : Value = {}",
                                              responseCounter.incrementAndGet(), body);

                                      if (responseCounter.get() > 10) {
                                          passedTen.set(true);
                                      }
                                  }

                              }).stop();
                  }
              });
          }

          @Test
          public void test() throws Exception
          {
              clientContext.getShutdownStrategy().setTimeout(1);

              clientContext.start();

              logger.info("Disconnect = {}", this.disconnectClient);

              Thread.sleep(TimeUnit.SECONDS.toMillis(15));

              clientContext.stop();

              assertTrue("More than 10 responses have been received", passedTen.get());
          }
      }

        Activity

        Matthew McMahon created issue -
        Matthew McMahon made changes -
        Field: Description. Appended the unit test that shows the problem.
        Matthew McMahon made changes -
        Field: Description. Added the Nabble thread link to the sentence introducing the unit test.
        Matthew McMahon made changes -
        Field: Description. Added "or where more than one response is expected on a channel" to the paragraph on disconnect=true.
        Claus Ibsen made changes -
        Assignee: Claus Ibsen [ davsclaus ]
        Claus Ibsen made changes -
        Status: Open [ 1 ] -> Resolved [ 5 ]
        Fix Version/s: 2.10.0 [ 12317612 ]
        Fix Version/s: 2.9.2 [ 12320143 ]
        Resolution: Fixed [ 1 ]

          People

          • Assignee:
            Claus Ibsen
            Reporter:
            Matthew McMahon
          • Votes:
            0
            Watchers:
            0
