  1. Qpid Proton
  2. PROTON-1923

Closing the connection yields random behavior for a receiver with reconnect options, due to the specified idle timeout value

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: proton-c-0.22.0
    • Fix Version/s: None
    • Component/s: cpp-binding
    • Labels:
      None

      Description

      I have a test case in which I launch a mocked broker that sends a message once a consumer has connected and opened a session. In on_tracker_settle, the broker closes the connection and stops the listener.

      The consumer sets both reconnect options and an idle timeout. Whether the test completes depends on the idle timeout value, not on the reconnect options: with small values (around 100 ms), the join on the mocked broker's thread hangs, while with values of around 200 ms and above the test passes. Without the reconnect options, a small idle timeout triggers the idle timeout error instead.

      The code is the following:

      #include <proton/connection.hpp>
      #include <proton/container.hpp>
      #include <proton/delivery.hpp>
      #include <proton/listener.hpp>
      #include <proton/message.hpp>
      #include <proton/messaging_handler.hpp>
      #include <proton/connection_options.hpp>
      #include <proton/reconnect_options.hpp>
      #include <proton/receiver_options.hpp>
      #include <proton/transport.hpp>
      #include <proton/error.hpp>
      #include <proton/error_condition.hpp>
      
      #include <proton/log.h>
      
      #include <future>
      #include <iostream>
      #include <stdexcept>
      #include <string>
      #include <chrono>
      #include <thread>
      
      class server_sender : public proton::messaging_handler {
      private:
         std::string url;
         bool fail;
         std::promise<void> promise;
         proton::listener listener;
      
      public:
         // Initializers are listed in member declaration order.
         server_sender(const std::string& s, std::promise<void> p, bool f = false) : url(s), fail(f), promise(std::move(p)), sent(false) {}
      
         void on_container_start(proton::container &c) override {
            std::cout << "server_sender on_container_start" << std::endl;
      
            listener = c.listen(url);
            promise.set_value();
         }
      
         void on_connection_open(proton::connection& c) override {
            std::cout << "server_sender on_connection_open" << std::endl;
            c.open();
         }
      
         void on_session_open(proton::session& s) override {
            std::cout << "server_sender on_session_open" << std::endl;
            s.open();
         }
      
         void on_sender_open(proton::sender& s) override {
            std::cout << "server_sender on_sender_open" << std::endl;
            s.open();
         }
      
         void on_sendable(proton::sender& s) override {
            if (fail) {
               throw std::runtime_error("Simulating server failure");
            }
      
            if (!sent) {
               std::cout << "server_sender on_sendable" << std::endl;
               s.send(proton::message("text message"));
               sent = true;
            }
         }
      
         void on_tracker_settle(proton::tracker& t) override {
            std::cout << "server_sender on_tracker_settle" << std::endl;
      
            t.sender().close();
            t.connection().close();
            listener.stop();
         }
      
         void on_error(const proton::error_condition& error) override {
            std::cout << "server_sender on_error: " << error.what() << std::endl;
      
            listener.stop();
         }
      
      private:
         bool sent;
      };
      
      class receive_with_retry : public proton::messaging_handler {
      private:
         std::string url;
         std::string queueName;
      
      public:
         receive_with_retry(const std::string &u, const std::string& q) : url(u), queueName(q) {}
      
         void on_container_start(proton::container &c) override {
            std::cout << "receive_with_retry on_container_start" << std::endl;
      
            c.connect(
               url,
               proton::connection_options()
                  .idle_timeout(proton::duration(1))
                  .reconnect(proton::reconnect_options()
                              .max_attempts(0)
                              .delay(proton::duration(1))
                              .delay_multiplier(1)
                              .max_delay(proton::duration(1))));
         }
      
         void on_connection_open(proton::connection& c) override {
            std::cout << "receive_with_retry on_connection_open " << std::endl;
            c.open_receiver(queueName, proton::receiver_options().auto_accept(true));
         }
      
         void on_session_open(proton::session& session) override {
            std::cout << "receive_with_retry on_session_open " << std::endl;
         }
      
         void on_receiver_open(proton::receiver& receiver) override {
            std::cout << "receive_with_retry on_receiver_open " << std::endl;
            receiver.open();
         }
      
         void on_message(proton::delivery& delivery, proton::message &message) override {
            std::cout << "receive_with_retry on_message " << message.body() << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
      
            delivery.receiver().close();
            delivery.receiver().connection().close();
            std::cout << "-- receive_with_retry on_message " << message.body() << std::endl;
         }
      
         void on_transport_error(proton::transport& error) override {
            std::cout << "receive_with_retry: on_transport_error: " << error.error().what() << std::endl;
      
            error.connection().close();
         }
      
         void on_error(const proton::error_condition& error) override {
            std::cout << "receive_with_retry: on_error: " << error.what() << std::endl;
         }
      };
      
      void receiverWithRetryTest(const std::string& url, const std::string& queue)
      {
         std::cout << "Server1 start" << std::endl;
         std::promise<void> promise1;
         auto future1 = promise1.get_future();
         server_sender serverSender1(url, std::move(promise1), false);
      
         std::thread serverSender1Thread([&serverSender1]{
            try {
               proton::container(serverSender1).run();
            }
            catch (const std::exception& cause) {
               std::cout << "serverSender1 threw: " << cause.what() << std::endl;
            }
         });
         future1.wait();
         std::cout << "-- Server1 started" << std::endl;
      
         auto receiveTask = std::async(std::launch::async, [=](){
            try {
               receive_with_retry receiveWithRetry(url, queue);
               proton::container(receiveWithRetry).run();
            }
            catch (const std::exception& cause) {
               std::cout << "Receive task threw the following exception: " << cause.what() << std::endl;
            }
         });
         receiveTask.wait();
      
         std::cout << "Server1 join thread" << std::endl;
         if (serverSender1Thread.joinable()) {
            serverSender1Thread.join();
         }
      
      }
      
      int main() {
         try {
            pn_log_enable(true);
            
            receiverWithRetryTest("127.0.0.1:5672", "test_queue");
      
            return 0;
         }
         catch (const std::exception& e) {
            std::cerr << e.what() << std::endl;
         }
      
         return 1;
      }
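
      Because the failing case is a hang on the thread join, the test above never returns when the problem occurs. The following is a minimal sketch, using only the standard library, of a helper that reports the hang instead of blocking forever. The name finished_within is hypothetical and not part of Proton, and the time limit is something the caller has to choose.

```cpp
#include <chrono>
#include <functional>
#include <future>
#include <memory>
#include <thread>

// Hypothetical helper (not part of Proton): runs `work` on a detached thread
// and reports whether it completed within `limit`.
// The thread is detached so that a hung `work` cannot block the caller.
// A hung thread is leaked until the process exits, so `work` should own
// everything it touches rather than refer to the caller's stack.
inline bool finished_within(std::function<void()> work,
                            std::chrono::milliseconds limit) {
    auto done = std::make_shared<std::promise<void>>();
    std::future<void> finished = done->get_future();

    std::thread([done, work]() {
        try {
            work();
        } catch (...) {
            // Exceptions are swallowed here; only completion is reported.
        }
        done->set_value();
    }).detach();

    return finished.wait_for(limit) == std::future_status::ready;
}
```

      In a test like the one above, the body of the server thread could be passed as work, with the handler owned by the callable, so that a hang shows up as a false return value rather than a blocked join. This is a suggestion only and not something the original test does.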
      

      For a bigger idle timeout, where the test works, the logged output is the following:

      Server1 start
      server_sender on_container_start
      -[0000000000477510]:(PN_LISTENER_OPEN)-
      Server1 started
      Server1 join thread
      receive_with_retry on_container_start
      [0000000000477510]:(PN_LISTENER_ACCEPT)
      server_sender on_connection_open
      receive_with_retry on_connection_open
      server_sender on_session_open
      server_sender on_sender_open
      server_sender on_sendable
      receive_with_retry on_session_open
      receive_with_retry on_receiver_open
      receive_with_retry on_message text message
      -- receive_with_retry on_message text message
      server_sender on_tracker_settle
      [0000000000477510]:(PN_LISTENER_CLOSE)
      [0000000000475650]:(PN_PROACTOR_INACTIVE)[000000000048E8A0]:(PN_PROACTOR_INACTIVE)
      
      [000000000048E8A0]:(PN_PROACTOR_INTERRUPT)
      [0000000000475650]:(PN_PROACTOR_INTERRUPT)
      Press any key to continue . . .
      

      For the low idle timeout, the logged output is the following:

      Server1 start
      server_sender on_container_start
      -[0000000000507510]:(PN_LISTENER_OPEN)-
      Server1 started
      Server1 join thread
      receive_with_retry on_container_start
      [0000000000507510]:(PN_LISTENER_ACCEPT)
      server_sender on_connection_open
      receive_with_retry on_connection_open
      server_sender on_session_open
      server_sender on_sender_open
      server_sender on_sendable
      receive_with_retry on_session_open
      receive_with_retry on_receiver_open
      receive_with_retry on_message text message
      -- receive_with_retry on_message text message
      s[000000000051E8A0]:(PN_PROACTOR_TIMEOUT)e
      rver_sender on_tracker_settle
      [0000000000507510]:(PN_LISTENER_CLOSE)
      [0000000000505650]:(PN_PROACTOR_INACTIVE)
      [0000000000505650]:(PN_PROACTOR_INTERRUPT)
      [000000000051E8A0]:(PN_PROACTOR_TIMEOUT)
      [000000000051E8A0]:(PN_PROACTOR_TIMEOUT)
      [000000000051E8A0]:(PN_PROACTOR_TIMEOUT)
      [000000000051E8A0]:(PN_PROACTOR_TIMEOUT)
      [000000000051E8A0]:(PN_PROACTOR_TIMEOUT)
      ^CPress any key to continue . . .
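
      Comparing the two traces: in the first one, both 0000000000475650 and 000000000048E8A0 log PN_PROACTOR_INACTIVE, whereas in the second one 000000000051E8A0 only logs repeated PN_PROACTOR_TIMEOUT entries and never PN_PROACTOR_INACTIVE. The sketch below tallies such entries per address so that longer traces can be compared the same way. It assumes the "[address]:(EVENT)" layout seen above; tally_events and event_tally are hypothetical names, not Proton APIs.

```cpp
#include <map>
#include <regex>
#include <string>

// address -> event name -> number of occurrences
using event_tally = std::map<std::string, std::map<std::string, int>>;

// Hypothetical helper: counts how often each event name appears per
// bracketed address in a captured trace.
inline event_tally tally_events(const std::string& log) {
    // Matches "[<hex address>]:(PN_SOME_EVENT)" anywhere in the text, so
    // entries interleaved with application output are still found.
    static const std::regex entry(R"re(\[([0-9A-Fa-f]+)\]:\((PN_[A-Z_]+)\))re");

    event_tally tally;
    for (std::sregex_iterator it(log.begin(), log.end(), entry), end;
         it != end; ++it) {
        ++tally[(*it)[1].str()][(*it)[2].str()];
    }
    return tally;
}
```

      An address that has PN_PROACTOR_TIMEOUT counts but no PN_PROACTOR_INACTIVE entry in the result corresponds to the side that never shut down in the captured run.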

       


            People

            • Assignee: Unassigned
            • Reporter: jeremy.aouad Jeremy