Camel
  1. Camel
  2. CAMEL-2624

support for async responses on single tcp connection

    Details

    • Type: Improvement Improvement
    • Status: Open
    • Priority: Major Major
    • Resolution: Unresolved
    • Affects Version/s: 2.2.0
    • Fix Version/s: Future
    • Component/s: camel-mina2
    • Labels:
      None
    • Environment:

      any

    • Estimated Complexity:
      Advanced

      Description

      Mina Component does not support async InOut connections.
      Many applications require two way or out only async tcp connections this is not supported.
      Single TCP connection to external server. Ability to send multiple requests one after the other without waiting for a response. responses need to be processed asynchronously.

      Use Cases to Support
      1. UC-1 - Consumer sends messages after connect
        1. TCP producer (P1) connects to TCP consumer/server (C1), session is created
        2. C1 sends many messages to P1
        3. P1 receives messages and processes them
        4. C1 or P1 close the connection
      2. UC-2 - Full async session
        1. P1 connects to C1
        2. P1 and C1 send and receive messages ansynchronously. There is no blocking write-read loop.
      1. Mina2TcpAsyncOutOnly.java
        9 kB
        Chad Beaulac
      2. Mina2ClientServerTest.java
        6 kB
        Chad Beaulac

        Issue Links

          Activity

          Hide
          Chad Beaulac added a comment - - edited

          I'm un-assigning myself. Using a filter like Claus suggests in a comment above might be a good approach. If I go that route and it meets the requirements, I'll share the progress.

          Show
          Chad Beaulac added a comment - - edited I'm un-assigning myself. Using a filter like Claus suggests in a comment above might be a good approach. If I go that route and it meets the requirements, I'll share the progress.
          Hide
          Chad Beaulac added a comment - - edited

          I'm not ready to implement the Camel AsycProcessor for camel-mina2 at this time. Not sure it's a good fit for these use cases. How does camel-netty handle the use cases in the description of this ticket? I read the API docs and looked at the code for a few minutes but didn't see how to do it. Docs talk about handling the request-reply pattern asynchronously. Use cases in this ticket handle a conversation pattern when any number of messages can be exchanged by endpoints in either direction. Using a mina codec (or probably filter) could be a good approach. I'll have to think about that. For now I'm going to use the java.nio beans and mina2 code I've been using. I guess the crux of the issue is, the uses cases in this ticket don't fit the "normal use-case is 1 Exchange per complete message received" Claus mentions above. I'm still wondering if I'm trying to fit a square peg into a round hole here or if I'm not being creative enough.

          Show
          Chad Beaulac added a comment - - edited I'm not ready to implement the Camel AsycProcessor for camel-mina2 at this time. Not sure it's a good fit for these use cases. How does camel-netty handle the use cases in the description of this ticket? I read the API docs and looked at the code for a few minutes but didn't see how to do it. Docs talk about handling the request-reply pattern asynchronously. Use cases in this ticket handle a conversation pattern when any number of messages can be exchanged by endpoints in either direction. Using a mina codec (or probably filter) could be a good approach. I'll have to think about that. For now I'm going to use the java.nio beans and mina2 code I've been using. I guess the crux of the issue is, the uses cases in this ticket don't fit the "normal use-case is 1 Exchange per complete message received" Claus mentions above. I'm still wondering if I'm trying to fit a square peg into a round hole here or if I'm not being creative enough.
          Hide
          Claus Ibsen added a comment -

          If you want fine grained events for session open / partly packages received / and whatnot then you should use a custom mina codec (or what they call it).

          Its not a good model to fire empty Camel Exchange in routes for these kind of events. The normal use-case is 1 Exchange per complete message received. If people want something different, then use a custom codec to hook into these events. And/or use the Mina API directly.

          Show
          Claus Ibsen added a comment - If you want fine grained events for session open / partly packages received / and whatnot then you should use a custom mina codec (or what they call it). Its not a good model to fire empty Camel Exchange in routes for these kind of events. The normal use-case is 1 Exchange per complete message received. If people want something different, then use a custom codec to hook into these events. And/or use the Mina API directly.
          Hide
          Claus Ibsen added a comment -

          camel-mina2 code should first be aligned to be similar to what we do in camel-netty to be asynchronous, and leverage the Camel async routing engine.

          Show
          Claus Ibsen added a comment - camel-mina2 code should first be aligned to be similar to what we do in camel-netty to be asynchronous, and leverage the Camel async routing engine.
          Hide
          Chad Beaulac added a comment -

          Anybody like the idea of adding the sendSessionMsgs to avoid sending session created/destroyed events through the route for non-full-async endpoints? I'll add it if we agree that's a good approach.

          Show
          Chad Beaulac added a comment - Anybody like the idea of adding the sendSessionMsgs to avoid sending session created/destroyed events through the route for non-full-async endpoints? I'll add it if we agree that's a good approach.
          Hide
          Chad Beaulac added a comment - - edited
          New URI Attribute

          What if I add a new attribute called sendSessionMsgs and default it to false? If you set the attr to true you will receive all of the session created/closed messages also and allow endpoints to implement full-async producers and consumers.
          Would the patch be acceptable after this change?

          Thanks, Chad

          Show
          Chad Beaulac added a comment - - edited New URI Attribute What if I add a new attribute called sendSessionMsgs and default it to false? If you set the attr to true you will receive all of the session created/closed messages also and allow endpoints to implement full-async producers and consumers. Would the patch be acceptable after this change? Thanks, Chad
          Hide
          Chad Beaulac added a comment - - edited

          ...too fine grained. We should only invoke the processor when the message is received...

          This solution exposes the TCP/UDP Session create/destroy events to the Route. This is necessary to meet the requirement.

          Doing true request/reply over mina2 with async is harder as you would need to do something like camel-jms doing. Where we use a correlation id to match up incoming messages with its corresponding request message. And when we have a match we continue routing.

          We don't need to do what camel-jms is doing and correlate sessions. The Mina2 IoSession is exposed to the Route. Producers and Consumers on a given camel-mina2 Route can implement their own session management. Session management is intentionally left as an exercise to the developer using camel-mina2.

           from(String.format("mina2:tcp://localhost:8070?&sync=false").beanRef("myTCPSessionManager");
          

          ...need to deal with timeouts...

          The InOut and InOnly patterns are supported just how they are in camel-mina and currently in camel-mina2, except that this patch further exposes the IoSession in order to handle the other use cases. Handling timeouts in a fully asynchronous connection is left to the develop (of the myTCPSessionManager above).
          Developers writing APIs for simple interfaces that send a few messages back and forth don't have to get too serious with throughput. Developers writing endpoints that handle streams dealing with rates greater than 900Mbits/sec might implement their session handling differently. I'm dealing with the latter.

          We need to expose the session created/closed events in order to support full asynchronous comms. The Processors, Beans or IoHandlers in the Camel Route chain and/or attached to the camel-mina2 Producer/Consumer are definitely interested in session create/closed events flowing through the Route.
          The Processor that comes after the Consumer in the Mina2TCPAsyncOutOnlyTest gets the IoSession from the exchange and sends messages back to the Producer asynchronously. Code below.

          // The IoSession has been created. Send 300 messages back to the Producer.
          IoSession session = (IoSession) e.getIn().getHeader(Mina2Constants.MINA2_IOSESSION);
          for (int i = 0; i < 300; i++) {
              String msg = "message " + i;
              session.write(msg);
          }
          

          Please take another look and/or provide alternatives. This patch handles the existing InOut and InOnly pattern requirements as well as allowing fully asynchronous Producers and Consumers.

          Show
          Chad Beaulac added a comment - - edited ...too fine grained. We should only invoke the processor when the message is received... This solution exposes the TCP/UDP Session create/destroy events to the Route. This is necessary to meet the requirement. Doing true request/reply over mina2 with async is harder as you would need to do something like camel-jms doing. Where we use a correlation id to match up incoming messages with its corresponding request message. And when we have a match we continue routing. We don't need to do what camel-jms is doing and correlate sessions. The Mina2 IoSession is exposed to the Route. Producers and Consumers on a given camel-mina2 Route can implement their own session management. Session management is intentionally left as an exercise to the developer using camel-mina2. from( String .format( "mina2:tcp: //localhost:8070?&sync= false " ).beanRef( "myTCPSessionManager" ); ...need to deal with timeouts... The InOut and InOnly patterns are supported just how they are in camel-mina and currently in camel-mina2, except that this patch further exposes the IoSession in order to handle the other use cases. Handling timeouts in a fully asynchronous connection is left to the develop (of the myTCPSessionManager above). Developers writing APIs for simple interfaces that send a few messages back and forth don't have to get too serious with throughput. Developers writing endpoints that handle streams dealing with rates greater than 900Mbits/sec might implement their session handling differently. I'm dealing with the latter. We need to expose the session created/closed events in order to support full asynchronous comms. The Processors, Beans or IoHandlers in the Camel Route chain and/or attached to the camel-mina2 Producer/Consumer are definitely interested in session create/closed events flowing through the Route. The Processor that comes after the Consumer in the Mina2TCPAsyncOutOnlyTest gets the IoSession from the exchange and sends messages back to the Producer asynchronously. Code below. // The IoSession has been created. Send 300 messages back to the Producer. IoSession session = (IoSession) e.getIn().getHeader(Mina2Constants.MINA2_IOSESSION); for ( int i = 0; i < 300; i++) { String msg = "message " + i; session.write(msg); } Please take another look and/or provide alternatives. This patch handles the existing InOut and InOnly pattern requirements as well as allowing fully asynchronous Producers and Consumers.
          Hide
          Claus Ibsen added a comment -

          Also you would need to deal with timeouts, in case the other party does not respond in a timely manner, then you would need to be able to trigger a timeout. And if the reply comes back later you would need to deal with this and detect it has previously been timed out and discard it (eg its no longer in the pending correlation ids map)

          Show
          Claus Ibsen added a comment - Also you would need to deal with timeouts, in case the other party does not respond in a timely manner, then you would need to be able to trigger a timeout. And if the reply comes back later you would need to deal with this and detect it has previously been timed out and discard it (eg its no longer in the pending correlation ids map)
          Hide
          Claus Ibsen added a comment -

          This is too fine grained. We should only invoke the processor when the message is received. And not also when session is created / closed etc. These are just mina events which a Camel route is not interested in routing.

          Doing true request/reply over mina2 with async is harder as you would need to do something like camel-jms doing. Where we use a correlation id to match up incoming messages with its corresponding request message. And when we have a match we continue routing.

          Show
          Claus Ibsen added a comment - This is too fine grained. We should only invoke the processor when the message is received. And not also when session is created / closed etc. These are just mina events which a Camel route is not interested in routing. Doing true request/reply over mina2 with async is harder as you would need to do something like camel-jms doing. Where we use a correlation id to match up incoming messages with its corresponding request message. And when we have a match we continue routing.
          Hide
          Chad Beaulac added a comment -

          I created a Github pull request with a patch for this ticket. Please review and apply it if it's acceptable. The send-receive patterns have the same behavior. Full async behavior requires that sync=false be set on the endpoints. Current users of camel-mina2 must handle null message bodies since that's how session creation and closed events are handled.

          Show
          Chad Beaulac added a comment - I created a Github pull request with a patch for this ticket. Please review and apply it if it's acceptable. The send-receive patterns have the same behavior. Full async behavior requires that sync=false be set on the endpoints. Current users of camel-mina2 must handle null message bodies since that's how session creation and closed events are handled.
          Hide
          Chad Beaulac added a comment -

          All 88 unit tests pass with new async functionality. Now adding ~10 more unit tests to show users how to use the full-async functionality before submitting patch.

          Show
          Chad Beaulac added a comment - All 88 unit tests pass with new async functionality. Now adding ~10 more unit tests to show users how to use the full-async functionality before submitting patch.
          Hide
          Chad Beaulac added a comment -

          I anticipate submitting a patch to resolve this ticket sometime this week.

          -Chad

          Show
          Chad Beaulac added a comment - I anticipate submitting a patch to resolve this ticket sometime this week. -Chad
          Hide
          Chad Beaulac added a comment -

          There are some issues that I'd like to discuss with this ticket. Making the mina2 component fully async has significant ramifications how it's used and how it's used in conjunction with the template producer. So far I've enable the consumer to be async by setting the state of the connection as a property in the exchange. This allows developers to implement the consumer side of the connection in a Processor.

          Consumer Side implemented as Processor
                             public void process(Exchange e) {
                                  Boolean prop = (Boolean) e.getProperty(
                                      Mina2Constants.MINA2_SESSION_CREATED);
                                  if (prop != null) {
                                      sessionCreated = prop;
                                      receivedExchange = e;
                                      latch.countDown();
                                  }
                                  prop = (Boolean) e.getProperty(
                                      Mina2Constants.MINA2_SESSION_OPENED);
                                  // Received session open. Countdown the latch
                                  if (prop != null) {
                                      latch.countDown();
                                  }
                                  prop = (Boolean) e.getProperty(
                                      Mina2Constants.MINA2_SESSION_CLOSED);
                                  // Received session closed. Countdown the latch
                                  if (prop != null) {
                                      latch.countDown();
                                  }
                              }
          

          The producer side is a challenge. So far I've implemented async behavior by attaching a Mina IoHandlerAdapter to the Producer. This can be done in the URI like:

          template.sendBody(String.format("mina2:tcp://localhost:%1$s?textline=true&ioHandler=#closeIoHandler", getPort()), "Chad");
          

          The concern with this is that it makes the Producer client code and little more complicated and makes the use of the template.requestBody() unsupported.

          Producer Using ioHandler URI param to register IoHandler for async comms
             @Test
              public void testSendOneCloseToServer() throws InterruptedException {
                  latch = new CountDownLatch(1);
                  closeIoHandler.setLatch(latch);
                  template.sendBody(String.format("mina2:tcp://localhost:%1$s?textline=true&ioHandler=#closeIoHandler", getPort()), "Chad");
                  latch.await(2, TimeUnit.SECONDS);
                  assertEquals("Hello Chad", closeIoHandler.getMessage());
              }
          

          In the snippet above the client sends a message to the Consumer and receives a response in the IoHandler that's been put into the JNDIRegistry.

          Currently, I'm working to finish porting unit tests to use this behavior.
          It could be possible to add a switch option that would allow for the use of template.requestBody as a synchronous call. If that becomes a necessary function, I thought I'd add that after this initial port to full async functionality.
          Dialogue, questions and comments are appreciated.

          Thanks, Chad

          Show
          Chad Beaulac added a comment - There are some issues that I'd like to discuss with this ticket. Making the mina2 component fully async has significant ramifications how it's used and how it's used in conjunction with the template producer. So far I've enable the consumer to be async by setting the state of the connection as a property in the exchange. This allows developers to implement the consumer side of the connection in a Processor. Consumer Side implemented as Processor public void process(Exchange e) { Boolean prop = ( Boolean ) e.getProperty( Mina2Constants.MINA2_SESSION_CREATED); if (prop != null ) { sessionCreated = prop; receivedExchange = e; latch.countDown(); } prop = ( Boolean ) e.getProperty( Mina2Constants.MINA2_SESSION_OPENED); // Received session open. Countdown the latch if (prop != null ) { latch.countDown(); } prop = ( Boolean ) e.getProperty( Mina2Constants.MINA2_SESSION_CLOSED); // Received session closed. Countdown the latch if (prop != null ) { latch.countDown(); } } The producer side is a challenge. So far I've implemented async behavior by attaching a Mina IoHandlerAdapter to the Producer. This can be done in the URI like: template.sendBody( String .format( "mina2:tcp: //localhost:%1$s?textline= true &ioHandler=#closeIoHandler" , getPort()), "Chad" ); The concern with this is that it makes the Producer client code and little more complicated and makes the use of the template.requestBody() unsupported. Producer Using ioHandler URI param to register IoHandler for async comms @Test public void testSendOneCloseToServer() throws InterruptedException { latch = new CountDownLatch(1); closeIoHandler.setLatch(latch); template.sendBody( String .format( "mina2:tcp: //localhost:%1$s?textline= true &ioHandler=#closeIoHandler" , getPort()), "Chad" ); latch.await(2, TimeUnit.SECONDS); assertEquals( "Hello Chad" , closeIoHandler.getMessage()); } In the snippet above the client sends a message to the Consumer and receives a response in the IoHandler that's been put into the JNDIRegistry. Currently, I'm working to finish porting unit tests to use this behavior. It could be possible to add a switch option that would allow for the use of template.requestBody as a synchronous call. If that becomes a necessary function, I thought I'd add that after this initial port to full async functionality. Dialogue, questions and comments are appreciated. Thanks, Chad
          Hide
          Chad Beaulac added a comment -

          Attached Mina2ClientServerTest and Mina2TcpSyncOutOnly.
          The full-asych changes done so far affect how the template producer is used when the client wants a response. The client uses the template.sendBody method and processes the response asynchronously in the URI supplied ioHandler. The Mina2TcpSyncOutOnly tests that the processor on the mina2 consumer side can recognize session creation and can asynchronously send messages to a producer client. Please have a look at these tests and provide any comments you might have.

          Show
          Chad Beaulac added a comment - Attached Mina2ClientServerTest and Mina2TcpSyncOutOnly. The full-asych changes done so far affect how the template producer is used when the client wants a response. The client uses the template.sendBody method and processes the response asynchronously in the URI supplied ioHandler. The Mina2TcpSyncOutOnly tests that the processor on the mina2 consumer side can recognize session creation and can asynchronously send messages to a producer client. Please have a look at these tests and provide any comments you might have.
          Hide
          Chad Beaulac added a comment -

          Updated description with text from CAMEL-1075. Assigned to me. Changed component to camel-mina2. Set fix version to 2.10

          Show
          Chad Beaulac added a comment - Updated description with text from CAMEL-1075 . Assigned to me. Changed component to camel-mina2. Set fix version to 2.10
          Hide
          Chad Beaulac added a comment -

          CAMEL-3471 should resolve this ticket. It seems like the concept of ExchangePattern InOut and InOnly goes away. The disconnect and timeout component options should be sufficient for all use cases when all comms are asynchronous.

          Show
          Chad Beaulac added a comment - CAMEL-3471 should resolve this ticket. It seems like the concept of ExchangePattern InOut and InOnly goes away. The disconnect and timeout component options should be sufficient for all use cases when all comms are asynchronous.
          Hide
          Claus Ibsen added a comment -

          We need to upgrade to Mina 2.0.x before taking a stab at this

          Show
          Claus Ibsen added a comment - We need to upgrade to Mina 2.0.x before taking a stab at this
          Hide
          Ashwin Karpe added a comment - - edited

          Hi,

          Is this issue still open given that we have camel-netty in place with the required capability...

          Show
          Ashwin Karpe added a comment - - edited Hi, Is this issue still open given that we have camel-netty in place with the required capability...
          Hide
          Anand added a comment -

          Great!!. I am looking forward for it.

          Show
          Anand added a comment - Great!!. I am looking forward for it.
          Hide
          Claus Ibsen added a comment -

          The camel-netty component in Camel 2.4 onwards support async routing engine. That means it will let the responses be routed asynchronously, and thus the NettyProducer does not block.

          Show
          Claus Ibsen added a comment - The camel-netty component in Camel 2.4 onwards support async routing engine. That means it will let the responses be routed asynchronously, and thus the NettyProducer does not block.

            People

            • Assignee:
              Unassigned
              Reporter:
              Anand
            • Votes:
              3 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

              • Created:
                Updated:

                Development