Uploaded image for project: 'Qpid'
  1. Qpid
  2. QPID-8347

Receive() blocks forever once timeout has been exceeded with a RabbitMQ Broker

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: qpid-cpp-1.39.0
    • Fix Version/s: None
    • Component/s: C++ Client
    • Labels:
      None
    • Environment:

      OS: Ubuntu Bionic

      QPID C++ version 1.39.0

      QPID Proton version 0.28.0
      RabbitMQ Broker version 3.6.10

      Erlang version 20.2.2

      RabbimtMQ AMQP 1.0 plugin Version 3.6.10

       

      Description

      Hello All,

      I am trying to use QPID C++ messaging API with a RabbitMQ Broker communicating over AMQP 1.0 protocol. When the receive() call times out, the RabbitMQ Broker throws an error and closes the session. The QPID C++ client then hangs indefinitely in the receive call. I believe that this is a bug in the QPID C++ API because it should always respect the client provided timeout, no mater what the Broker does. I also wrote a bug for RabbitMQ AMQP 1.0 plugin: https://github.com/rabbitmq/rabbitmq-amqp1.0/issues/90

       

      Update: After further testing, it seems like Qpid Proton handles the timeout case correctly with RabbitMQ Broker, isolating this issue to the C++ API.

      helloworld_blocking.py:

      from _future_ import print_function

      from proton import Message

      from proton.utils import BlockingConnection

      from proton.handlers import IncomingMessageHandler

       

      conn = BlockingConnection("localhost:5672")

      receiver = conn.create_receiver("examples")

      sender = conn.create_sender("examples")

      #sender.send(Message(body="Hello World!"));

      msg = receiver.receive(timeout=0)

      print(msg.body)

      receiver.accept()

      conn.close()

       

      Result:

       python helloworld_blocking.py 

      Traceback (most recent call last):

        File "helloworld_blocking.py", line 30, in <module>

          msg = receiver.receive(timeout=0)

        File "/home/user/qpid-proton-0.28.0/build/python/dist/proton/_utils.py", line 171, in receive

          timeout=timeout)

        File "/home/user/qpid-proton-0.28.0/build/python/dist/proton/_utils.py", line 314, in wait

          raise Timeout(txt)

      proton._exceptions.Timeout: Connection amqp://localhost:5672 timed out: Receiving on receiver 5aa78b24-27c6-4b76-a92a-d5410aa0a6ef-examples

       

      QPID C++ Example Code (Modified HelloWorld example):

      #include <qpid/messaging/Connection.h>

      #include <qpid/messaging/Message.h>

      #include <qpid/messaging/Receiver.h>

      #include <qpid/messaging/Sender.h>

      #include <qpid/messaging/Session.h>

      #include <iostream>

       

      using namespace qpid::messaging;

       

      int main(int argc, char** argv) {

          std::string broker = argc > 1 ? argv[1] : "localhost:5672";

          std::cout << "broker: " << broker << std::endl;

          std::string address = argc > 2 ? argv[2] : "topic.hello.world";

          std::cout << "address: " << address << std::endl;

          std::string connectionOptions = argc > 3 ? argv[3] : "";

          std::cout << "connectionOptions: " << connectionOptions << std::endl;

       

          try

      {                 Connection connection(broker, connectionOptions);         connection.open();         Session session = connection.createSession();          Receiver receiver = session.createReceiver(address);         Message message;         std::cout << "Pre Receive" << std::endl;         message = receiver.fetch(Duration::SECOND * 10);         std::cout << "Post Receive" << std::endl;         session.acknowledge();           connection.close();         return 0;     }

      catch(const std::exception& error)

      {         std::cerr << error.what() << std::endl;         return 1;     }

      }

      }

       

      Server Error (Occurs after 10 second timeout):

      =INFO REPORT==== 23-Jul-2019::17:49:54 ===

      accepting AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672)

       

      =ERROR REPORT==== 23-Jul-2019::17:49:54 ===

      closing AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672):

      {bad_version,{1,1,0,10}}

       

      =INFO REPORT==== 23-Jul-2019::17:49:54 ===

      accepting AMQP connection <0.660.0> ([::1]:40360 -> [::1]:5672)

       

      =ERROR REPORT==== 23-Jul-2019::17:50:04 ===

        • Generic server <0.675.0> terminating 
        • Last message in was {send_command,

                                 {'basic.credit_drained',

                                     <<99,116,97,103,45,0,0,0,0>>,

                                     1}}

        • When Server state == {state,1,<0.669.0>,<0.674.0>,direct,

                                    

      {[],[]},

                                     false,<0.678.0>,none,none,0,true,none,

                                     {0,nil},

                                     {0,nil},

                                     true,false}

      ** Reason for termination == 

      ** {{badmatch,{empty,{[],[]}

      }},

          [

      {amqp_channel,rpc_bottom_half,2,                    [

      {file,"src/amqp_channel.erl"},{line,623}]},

           {amqp_channel,handle_method_from_server1,3,                    [{file,"src/amqp_channel.erl"}

      ,

      {line,800}

      ]},

           {gen_server,try_dispatch,4,

      {line,616}

      ]},

           {gen_server,handle_msg,6,[

      {file,"gen_server.erl"},//{line,686}" class="external-link" rel="nofollow">\\\\\\\{file,"gen_server.erl"}

      ,{line,616}]},

          

      {gen_server,handle_msg,6,[
      {file,"gen_server.erl"}

      ,\\\\\\\{line,686}},

          

      {proc_lib,init_p_do_apply,3,[\\\\ \{file,"proc_lib.erl"}

      ,\\\\\\\{line,247}|file://{file,/]}]}

       

      =WARNING REPORT==== 23-Jul-2019::17:50:04 ===

      Connection (<0.669.0>) closing: internal error in channel (<0.675.0>): {{badmatch,

                                                                               {empty,                                                                          

      {[],                                                                            []}

      }},

                                                                              [{amqp_channel,                                                                           rpc_bottom_half,                                                                           2,                                                                           [

      {file,                                                                             "src/amqp_channel.erl"},

                                                                                 {line,                                                                             623}]},

                                                                               {amqp_channel,                                                                           handle_method_from_server1,                                                                           3,                                                                           [{file,                                                                             "src/amqp_channel.erl"}

      ,

                                                                                 {line,                                                                             800}]},

                                                                               {gen_server,                                                                           try_dispatch,                                                                           4,                                                                           [

      {file,                                                                             "gen_server.erl"},

                                                                                 {line,                                                                             616}]},

                                                                               {gen_server,                                                                           handle_msg,                                                                           6,                                                                           [{file,                                                                             "gen_server.erl"}

      ,

                                                                                 {line,                                                                             686}]},

                                                                               {proc_lib,                                                                           init_p_do_apply,                                                                           3,                                                                           [

      {file,                                                                             "proc_lib.erl"}

      ,

                                                                                 {line,                                                                             247}]}]}

       

      =ERROR REPORT==== 23-Jul-2019::17:50:04 ===

        • Generic server <0.678.0> terminating
        • Last message in was {'EXIT',<0.675.0>,

                                 {{badmatch,{empty,

      {[],[]}}},}}

      {{                             [{amqp_channel,rpc_bottom_half,2,                                  [ {file,"src/amqp_channel.erl"},\\\\\\\\{line,623}]},}}

      {{                              {amqp_channel,handle_method_from_server1,3,                                  [{file,"src/amqp_channel.erl"},{line,800}

      ]},}}

      {{                              {gen_server,try_dispatch,4,                                  [ \{file,"gen_server.erl"}

      , {line,616}]},}}

      {{                              {gen_server,handle_msg,6,                                  [ \{file,"gen_server.erl"}

      ,
      {line,686}

      ]},}}

      {{                              {proc_lib,init_p_do_apply,3,                                  [ \{file,"proc_lib.erl"}

      ,
      {line,247}

      ]}]}}}}

      {{ ** When Server state ==

      {ch,running,rabbit_framing_amqp_0_9_1,1,<0.675.0>,                          <0.675.0>,<0.669.0>,<<"[::1]:40360 -> [::1]:5672">>,                          {lstate,<0.677.0>,false}

      ,}}

      {{                          none,1,}}

      {{                         
      {[],[]},}}

      {{                          {user,<<"guest">>,                           [administrator],                           [\\\{rabbit_auth_backend_internal,none}|file://{rabbit_auth_backend_internal,none}/]},}}

      {{                          <<"/">>,<<>>,}}

      {{                          {dict,1,16,16,8,80,48,                           {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}

      {{                           [],[],[],[],[],[],[],[],[],

      {{                             [[<0.583.0>|}}

      {{                               {resource,<<"/">>,queue,                                <<"topic.hello.world">>}]],}}

      {{                             [],[],[],[],[],[]}}},}}

      {{                          {state,}}

      {{                           {dict,1,16,16,8,80,48,                            {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}

      {{                            [],[],[],[],[],[],[],[],[],

      {{                              [<0.583.0>,}}

      {{                              [],[],[],[],[],[]}}},}}

      {{                           erlang},}}

      {{                          {dict,1,16,16,8,80,48,                           {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}

      {{                           [],[],[],[],[],[],[],[],[],[],[],

      {{                             [[<<99,116,97,103,45,0,0,0,0>>|}}

      {{                               amqqueue,

      {{                                 {resource,<<"/">>,queue,                                  <<"topic.hello.world">>},}}

      {{                                 false,false,none,[],<0.583.0>,[],[],[],}}

      {{                                 undefined,[],[],live,0},}}

      {{                                {false,65535,false,                                 [{<<"x-credit">>,table,                                   [{<<"credit">>,long,0},}}

      {{                                    {<<"drain">>,boolean,false}]}]}}]],}}

      {{                             [],[],[],[]}}},}}

      {{                          {dict,1,16,16,8,80,48,                           {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}

      {{                           [],[],[],[],[],[],[],[],[],

      {{                             [[<0.583.0>|}}

      {{                               {1,{<<99,116,97,103,45,0,0,0,0>>,nil,nil}}]],}}

      {{                             [],[],[],[],[],[]}}},}}

      {{                          {set,1,16,16,8,80,48,                           {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}

      {{                           [],[],[],[],[],[],[],[],[],

      {{                             [<0.583.0>],}}

      {{                             [],[],[],[],[],[]}}},}}

      {{                          <0.672.0>,}}

      {{                          {state,fine,5000,                           #Ref<0.4039704202.3895984129.147347>},}}

      {{                          false,1,}}

      {{                          }}0,nil},{0,nil,

      {{                          [],}}

      {{                          0,nil},{0,nil,}}

      {{                          [{<<"publisher_confirms">>,bool,true},}}

      {{                           {<<"exchange_exchange_bindings">>,bool,true},}}

      {{                           {<<"basic.nack">>,bool,true},}}

      {{                           {<<"consumer_cancel_notify">>,bool,true},}}

      {{                           {<<"connection.blocked">>,bool,true},}}

      {{                           {<<"authentication_failure_close">>,bool,true}],}}

      {{                          none,65535,none,flow,[]}}}

      {{ ** Reason for termination == }}

      {{ ** {{badmatch,{empty,{[],[]}

      }},

          [{amqp_channel,rpc_bottom_half,2,                    [

      {file,"src/amqp_channel.erl"}

      ,{line,623}]},

          

      {amqp_channel,handle_method_from_server1,3,                    [ {file,"src/amqp_channel.erl"}

      ,{line,800}]},

          

      {gen_server,try_dispatch,4,[

      {file,"gen_server.erl"}

      ,{line,616}]},

          

      {gen_server,handle_msg,6,[ {file,"gen_server.erl"}

      ,{line,686}]},

          

      {proc_lib,init_p_do_apply,3,[

      {file,"proc_lib.erl"}

      ,{line,247}]}]}

       

       

      gdb backtrace:

      (gdb) r

      Starting program: /mnt/user/dev/rabbit_mq_test/hello_world 

      [Thread debugging using libthread_db enabled]

      Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".

      broker: localhost:5672

      address: topic.hello.world

      connectionOptions: 

      [New Thread 0x7ffff3d53700 (LWP 26650)]

      [New Thread 0x7ffff3340700 (LWP 26651)]

      [New Thread 0x7ffff2b3f700 (LWP 26652)]

      Pre Receive

      ^C

      Thread 1 "hello_world" received signal SIGINT, Interrupt.

      0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x555555785d04) at ../sysdeps/unix/sysv/linux/futex-internal.h:88

      88 ../sysdeps/unix/sysv/linux/futex-internal.h: No such file or directory.

      (gdb) bt

      #0  0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x555555785d04) at ../sysdeps/unix/sysv/linux/futex-internal.h:88

      #1  __pthread_cond_wait_common (abstime=0x0, mutex=0x555555785cb0, cond=0x555555785cd8) at pthread_cond_wait.c:502

      #2  __pthread_cond_wait (cond=0x555555785cd8, mutex=0x555555785cb0) at pthread_cond_wait.c:655

      #3  0x00007ffff7ad70b1 in qpid::sys::Condition::wait (this=0x555555785cd8, mutex=...) at /home/user/qpid-cpp-1.39.0/src/qpid/sys/posix/Condition.h:59

      #4  0x00007ffff7ad7333 in qpid::sys::Monitor::wait (this=0x555555785cb0) at /home/user/qpid-cpp-1.39.0/src/qpid/sys/Monitor.h:41

      #5  0x00007ffff7acd008 in qpid::messaging::amqp::ConnectionContext::wait (this=0x5555557858e0) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:706

      #6  0x00007ffff7acd11a in qpid::messaging::amqp::ConnectionContext::wait (this=0x5555557858e0, ssn=..., lnk=...) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:721

      #7  0x00007ffff7ac8a5a in qpid::messaging::amqp::ConnectionContext::fetch (this=0x5555557858e0, ssn=..., lnk=..., message=..., timeout=...)

          at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:271

      #8  0x00007ffff7ae7b61 in qpid::messaging::amqp::ReceiverHandle::fetch (this=0x55555579b200, message=..., timeout=...) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:55

      #9  0x00007ffff7ae7c30 in qpid::messaging::amqp::ReceiverHandle::fetch (this=0x55555579b200, timeout=...) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:61

      #10 0x00007ffff7b575e4 in qpid::messaging::Receiver::fetch (this=0x7fffffffe780, timeout=...) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/Receiver.cpp:52

      #11 0x000055555555589c in main ()

      (gdb) 

       

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              szegel Seth Zegelstein
            • Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: