Qpid Proton
  1. Qpid Proton
  2. PROTON-222

pn_messenger_send returns before message data has been written to the wire

    Details

    • Type: Bug Bug
    • Status: Resolved
    • Priority: Major Major
    • Resolution: Fixed
    • Affects Version/s: 0.3
    • Fix Version/s: 0.4
    • Component/s: proton-c, proton-j
    • Labels:
      None

      Description

      Currently, pn_messender_send will block until the engine reports there are no queued messages being held. The problem arises because the queued message count only reports message data that is being held by the engine due to insufficient credit to send the messages. Messages may also be sitting in the transport's encoded frame buffer waiting to be written to the wire, and messages may also be held by the driver itself. This latter possibly is problematic given the current transport interface because there is no way for an application using the engine (in this case messenger) to know whether data is being held by the driver without introducing an undesirable coupling between the application and the driver implementation.

      1. transport.patch
        4 kB
        Rafael H. Schloming

        Issue Links

          Activity

          Hide
          Rafael H. Schloming added a comment -

          proposed interface changes for the transport

          Show
          Rafael H. Schloming added a comment - proposed interface changes for the transport
          Hide
          michael goulish added a comment -

          Here is some sample code that demonstrates this bug, and possibly clarifies how it can show up.

          Running the receiver and sender in separate windows, I can see that the receiver does not get anything after ten calls (on the sender) to send().

          It is only when the sender finally calls stop() that the receiver gets all ten messages at once.

          ===================================== sender ===========================================

          #include "proton/message.h"
          #include "proton/messenger.h"

          #include <stdio.h>
          #include <stdlib.h>
          #include <string.h>

          int
          main(int argc, char** argv)
          {
          int c;
          opterr = 0;
          char addr [ 1000 ];
          char content [ 1000 ];
          char subject [ 1000 ];

          sprintf ( addr, "amqp://0.0.0.0:%s", argv[1] );

          pn_message_t * message;
          pn_messenger_t * messenger;

          message = pn_message();
          messenger = pn_messenger(NULL);

          pn_messenger_start(messenger);

          int n_messages = 10;
          int sent_count;

          /*------------------------------------------
          Put and send a message every 1 second.
          ------------------------------------------*/
          for ( sent_count = 0 ; sent_count < n_messages; ++ sent_count )

          { sleep ( 1 ); sprintf ( subject, "This is message %d.", sent_count + 1 ); pn_message_set_address ( message, addr ); pn_message_set_subject ( message, subject ); pn_data_t *body = pn_message_body(message); sprintf ( content, "Hello, Proton!" ); pn_data_put_string(body, pn_bytes(strlen(content), content)); pn_messenger_put(messenger, message); pn_messenger_send(messenger); fprintf ( stderr, "sent %d messages.\n", sent_count + 1 ); }

          // Countdown to stop, to give me time to see it ....
          fprintf ( stderr, "Calling stop in ...\n" );
          for ( int i = 5; i > 0; – i )

          { fprintf ( stderr, "%d\n", i ); sleep ( 1 ); }

          pn_messenger_stop(messenger);
          pn_messenger_free(messenger);
          pn_message_free(message);

          return 0;
          }

          ====================================== receiver ===============================================

          #include "proton/message.h"
          #include "proton/messenger.h"

          #include <stdio.h>
          #include <stdlib.h>
          #include <ctype.h>

          #define BUFSIZE 1024

          int
          main(int argc, char** argv)
          {
          char addr [ 1000 ];

          sprintf ( addr, "amqp://~0.0.0.0:%s", argv[1] );
          pn_message_t * message;
          pn_messenger_t * messenger;

          message = pn_message();
          messenger = pn_messenger ( NULL );

          pn_messenger_start(messenger);
          pn_messenger_subscribe ( messenger, addr );

          int messages_wanted = 10;
          int total_received = 0;
          int received_this_time;

          pn_messenger_set_timeout ( messenger, 700 );

          int tries = 0;
          while ( total_received < messages_wanted )

          { ++ tries; pn_messenger_recv ( messenger, BUFSIZE ); received_this_time = pn_messenger_incoming ( messenger ); fprintf ( stderr, "try: %d received: %d total: %d\n", tries, received_this_time, total_received ); total_received += received_this_time; }

          int total_consumed = 0;

          for ( ; total_consumed < total_received; ++ total_consumed )

          { pn_messenger_get ( messenger, message ); size_t bufsize = BUFSIZE; char buffer [ bufsize ]; pn_data_t * body = pn_message_body ( message ); pn_data_format ( body, buffer, & bufsize ); printf ( "\n\nMessage %d ----------------------------\n", total_consumed + 1 ); printf ( "Address: %s\n", pn_message_get_address ( message ) ); char const * subject = pn_message_get_subject(message); printf ( "Subject: %s\n", subject ? subject : "(no subject)" ); printf("Content: %s\n", buffer); }

          printf ( "\n\n" );

          pn_messenger_stop(messenger);
          pn_messenger_free(messenger);

          return 0;
          }

          Show
          michael goulish added a comment - Here is some sample code that demonstrates this bug, and possibly clarifies how it can show up. Running the receiver and sender in separate windows, I can see that the receiver does not get anything after ten calls (on the sender) to send(). It is only when the sender finally calls stop() that the receiver gets all ten messages at once. ===================================== sender =========================================== #include "proton/message.h" #include "proton/messenger.h" #include <stdio.h> #include <stdlib.h> #include <string.h> int main(int argc, char** argv) { int c; opterr = 0; char addr [ 1000 ]; char content [ 1000 ]; char subject [ 1000 ]; sprintf ( addr, "amqp://0.0.0.0:%s", argv [1] ); pn_message_t * message; pn_messenger_t * messenger; message = pn_message(); messenger = pn_messenger(NULL); pn_messenger_start(messenger); int n_messages = 10; int sent_count; /*------------------------------------------ Put and send a message every 1 second. ------------------------------------------*/ for ( sent_count = 0 ; sent_count < n_messages; ++ sent_count ) { sleep ( 1 ); sprintf ( subject, "This is message %d.", sent_count + 1 ); pn_message_set_address ( message, addr ); pn_message_set_subject ( message, subject ); pn_data_t *body = pn_message_body(message); sprintf ( content, "Hello, Proton!" ); pn_data_put_string(body, pn_bytes(strlen(content), content)); pn_messenger_put(messenger, message); pn_messenger_send(messenger); fprintf ( stderr, "sent %d messages.\n", sent_count + 1 ); } // Countdown to stop, to give me time to see it .... fprintf ( stderr, "Calling stop in ...\n" ); for ( int i = 5; i > 0; – i ) { fprintf ( stderr, "%d\n", i ); sleep ( 1 ); } pn_messenger_stop(messenger); pn_messenger_free(messenger); pn_message_free(message); return 0; } ====================================== receiver =============================================== #include "proton/message.h" #include "proton/messenger.h" #include <stdio.h> #include <stdlib.h> #include <ctype.h> #define BUFSIZE 1024 int main(int argc, char** argv) { char addr [ 1000 ]; sprintf ( addr, "amqp://~0.0.0.0:%s", argv [1] ); pn_message_t * message; pn_messenger_t * messenger; message = pn_message(); messenger = pn_messenger ( NULL ); pn_messenger_start(messenger); pn_messenger_subscribe ( messenger, addr ); int messages_wanted = 10; int total_received = 0; int received_this_time; pn_messenger_set_timeout ( messenger, 700 ); int tries = 0; while ( total_received < messages_wanted ) { ++ tries; pn_messenger_recv ( messenger, BUFSIZE ); received_this_time = pn_messenger_incoming ( messenger ); fprintf ( stderr, "try: %d received: %d total: %d\n", tries, received_this_time, total_received ); total_received += received_this_time; } int total_consumed = 0; for ( ; total_consumed < total_received; ++ total_consumed ) { pn_messenger_get ( messenger, message ); size_t bufsize = BUFSIZE; char buffer [ bufsize ]; pn_data_t * body = pn_message_body ( message ); pn_data_format ( body, buffer, & bufsize ); printf ( "\n\nMessage %d ----------------------------\n", total_consumed + 1 ); printf ( "Address: %s\n", pn_message_get_address ( message ) ); char const * subject = pn_message_get_subject(message); printf ( "Subject: %s\n", subject ? subject : "(no subject)" ); printf("Content: %s\n", buffer); } printf ( "\n\n" ); pn_messenger_stop(messenger); pn_messenger_free(messenger); return 0; }
          Hide
          michael goulish added a comment -

          I am able to get my example working the way I want to by using a tracker, with window size 1, on the sender, and calling pn_messenger_status() after every message sent.

          new code:

          ==================================== sender =============================================

          #include "proton/message.h"
          #include "proton/messenger.h"

          #include <stdio.h>
          #include <stdlib.h>
          #include <string.h>

          int
          main(int argc, char** argv)
          {
          int c;
          opterr = 0;
          char addr [ 1000 ];
          char content [ 1000 ];
          char subject [ 1000 ];

          sprintf ( addr, "amqp://0.0.0.0:%s", argv[1] );

          pn_message_t * message;
          pn_messenger_t * messenger;

          message = pn_message();
          messenger = pn_messenger(NULL);
          pn_messenger_set_outgoing_window ( messenger, 1 );

          pn_messenger_start(messenger);

          int n_messages = 10;
          int sent_count;

          /*------------------------------------------
          Put and send a message every 1 second.
          ------------------------------------------*/
          for ( sent_count = 0 ; sent_count < n_messages; ++ sent_count )

          { sleep ( 1 ); sprintf ( subject, "This is message %d.", sent_count + 1 ); pn_message_set_address ( message, addr ); pn_message_set_subject ( message, subject ); pn_data_t *body = pn_message_body(message); sprintf ( content, "Hello, Proton!" ); pn_data_put_string(body, pn_bytes(strlen(content), content)); pn_messenger_put(messenger, message); pn_tracker_t tracker; tracker = pn_messenger_outgoing_tracker ( messenger ); pn_messenger_send(messenger); pn_messenger_status ( messenger, tracker ); fprintf ( stderr, "sent %d messages.\n", sent_count + 1 ); }

          // Countdown to stop, to give me time to see it ....
          fprintf ( stderr, "Calling stop in ...\n" );
          for ( int i = 5; i > 0; – i )

          { fprintf ( stderr, "%d\n", i ); sleep ( 1 ); }

          fprintf ( stderr, "stop.\n");

          pn_messenger_stop(messenger);
          pn_messenger_free(messenger);
          pn_message_free(message);

          return 0;
          }

          ===================================== receiver =========================================
          #include "proton/message.h"
          #include "proton/messenger.h"

          #include <stdio.h>
          #include <stdlib.h>
          #include <ctype.h>

          #define BUFSIZE 1024

          void
          consume_messages ( pn_messenger_t * messenger, int n, pn_message_t * message )
          {
          for ( int consume_count = 0; consume_count < n; ++ consume_count )

          { pn_messenger_get ( messenger, message ); size_t bufsize = BUFSIZE; char buffer [ bufsize ]; pn_data_t * body = pn_message_body ( message ); pn_data_format ( body, buffer, & bufsize ); printf ( "\n\nMessage ----------------------------\n"); printf ( "Address: %s\n", pn_message_get_address ( message ) ); char const * subject = pn_message_get_subject(message); printf ( "Subject: %s\n", subject ? subject : "(no subject)" ); printf("Content: %s\n\n", buffer); }

          }

          int
          main(int argc, char** argv)
          {
          char addr [ 1000 ];

          sprintf ( addr, "amqp://~0.0.0.0:%s", argv[1] );
          pn_message_t * message;
          pn_messenger_t * messenger;

          message = pn_message();
          messenger = pn_messenger ( NULL );

          pn_messenger_start(messenger);
          pn_messenger_subscribe ( messenger, addr );

          int messages_wanted = 10;
          int total_received = 0;
          int received_this_time;

          pn_messenger_set_timeout ( messenger, 700 );

          int tries = 0;
          while ( total_received < messages_wanted )

          { ++ tries; pn_messenger_recv ( messenger, BUFSIZE ); received_this_time = pn_messenger_incoming ( messenger ); fprintf ( stderr, "try: %d received: %d total: %d\n", tries, received_this_time, total_received ); consume_messages ( messenger, received_this_time, message ); total_received += received_this_time; }

          pn_messenger_stop(messenger);
          pn_messenger_free(messenger);

          return 0;
          }

          Show
          michael goulish added a comment - I am able to get my example working the way I want to by using a tracker, with window size 1, on the sender, and calling pn_messenger_status() after every message sent. new code: ==================================== sender ============================================= #include "proton/message.h" #include "proton/messenger.h" #include <stdio.h> #include <stdlib.h> #include <string.h> int main(int argc, char** argv) { int c; opterr = 0; char addr [ 1000 ]; char content [ 1000 ]; char subject [ 1000 ]; sprintf ( addr, "amqp://0.0.0.0:%s", argv [1] ); pn_message_t * message; pn_messenger_t * messenger; message = pn_message(); messenger = pn_messenger(NULL); pn_messenger_set_outgoing_window ( messenger, 1 ); pn_messenger_start(messenger); int n_messages = 10; int sent_count; /*------------------------------------------ Put and send a message every 1 second. ------------------------------------------*/ for ( sent_count = 0 ; sent_count < n_messages; ++ sent_count ) { sleep ( 1 ); sprintf ( subject, "This is message %d.", sent_count + 1 ); pn_message_set_address ( message, addr ); pn_message_set_subject ( message, subject ); pn_data_t *body = pn_message_body(message); sprintf ( content, "Hello, Proton!" ); pn_data_put_string(body, pn_bytes(strlen(content), content)); pn_messenger_put(messenger, message); pn_tracker_t tracker; tracker = pn_messenger_outgoing_tracker ( messenger ); pn_messenger_send(messenger); pn_messenger_status ( messenger, tracker ); fprintf ( stderr, "sent %d messages.\n", sent_count + 1 ); } // Countdown to stop, to give me time to see it .... fprintf ( stderr, "Calling stop in ...\n" ); for ( int i = 5; i > 0; – i ) { fprintf ( stderr, "%d\n", i ); sleep ( 1 ); } fprintf ( stderr, "stop.\n"); pn_messenger_stop(messenger); pn_messenger_free(messenger); pn_message_free(message); return 0; } ===================================== receiver ========================================= #include "proton/message.h" #include "proton/messenger.h" #include <stdio.h> #include <stdlib.h> #include <ctype.h> #define BUFSIZE 1024 void consume_messages ( pn_messenger_t * messenger, int n, pn_message_t * message ) { for ( int consume_count = 0; consume_count < n; ++ consume_count ) { pn_messenger_get ( messenger, message ); size_t bufsize = BUFSIZE; char buffer [ bufsize ]; pn_data_t * body = pn_message_body ( message ); pn_data_format ( body, buffer, & bufsize ); printf ( "\n\nMessage ----------------------------\n"); printf ( "Address: %s\n", pn_message_get_address ( message ) ); char const * subject = pn_message_get_subject(message); printf ( "Subject: %s\n", subject ? subject : "(no subject)" ); printf("Content: %s\n\n", buffer); } } int main(int argc, char** argv) { char addr [ 1000 ]; sprintf ( addr, "amqp://~0.0.0.0:%s", argv [1] ); pn_message_t * message; pn_messenger_t * messenger; message = pn_message(); messenger = pn_messenger ( NULL ); pn_messenger_start(messenger); pn_messenger_subscribe ( messenger, addr ); int messages_wanted = 10; int total_received = 0; int received_this_time; pn_messenger_set_timeout ( messenger, 700 ); int tries = 0; while ( total_received < messages_wanted ) { ++ tries; pn_messenger_recv ( messenger, BUFSIZE ); received_this_time = pn_messenger_incoming ( messenger ); fprintf ( stderr, "try: %d received: %d total: %d\n", tries, received_this_time, total_received ); consume_messages ( messenger, received_this_time, message ); total_received += received_this_time; } pn_messenger_stop(messenger); pn_messenger_free(messenger); return 0; }
          Hide
          Rafael H. Schloming added a comment -

          You shouldn't need to bother with the tracker or status. I don't believe those calls will interact with the network at all. I think setting the outgoing window to 1 should be all that is necessary as that will cause send to block until the remote disposition has been seen.

          Show
          Rafael H. Schloming added a comment - You shouldn't need to bother with the tracker or status. I don't believe those calls will interact with the network at all. I think setting the outgoing window to 1 should be all that is necessary as that will cause send to block until the remote disposition has been seen.
          Hide
          michael goulish added a comment -

          Yes, your'e right!
          It works by just adding
          pn_messenger_set_outgoing_window ( messenger, 1 );
          and nothing else.

          Show
          michael goulish added a comment - Yes, your'e right! It works by just adding pn_messenger_set_outgoing_window ( messenger, 1 ); and nothing else.
          Hide
          Ken Giusti added a comment -

          Review Board post with proposed fix:

          https://reviews.apache.org/r/9450/

          Show
          Ken Giusti added a comment - Review Board post with proposed fix: https://reviews.apache.org/r/9450/
          Show
          Ken Giusti added a comment - http://svn.apache.org/viewvc?view=revision&revision=1446697

            People

            • Assignee:
              Ken Giusti
              Reporter:
              Rafael H. Schloming
            • Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development