Uploaded image for project: 'Qpid Proton'
  1. Qpid Proton
  2. PROTON-1917

[proton-c] the c++ proton consumer with retry should continue to retry if virtual host not active

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • proton-c-0.22.0
    • None
    • cpp-binding
    • None

    Description

      I have a running broker, and a configured queue containing messages.

      I also have a consumer, where I configured the max number of attempts to 0 (infinite retry).

      I kill the broker (ctrl-c) and start it on the same port.
      Upon reconnection, I get the following error message randomly:

      receive_with_retry on_connection_open
      receive_with_retry: on_error: amqp:not-found: Virtual host 'localhost' is not active
      main: end

      In the case where the consumer is able to connect, the consumer continues to consume the messages normally.

      However, in the broker web interface, I see upon each re-connection, an extra connection (same ip and port) to the queue. As if, the old connection is not killed. I wasn't expecting this behavior. This might be a separate issue.

      I was able to reproduce with the following code, on windows 7 (msvc 12 2013)

      class receive_with_retry : public proton::messaging_handler {
      private:
         std::string url;
         std::string queueName;
      
      public:
         receive_with_retry(const std::string &u, const std::string& q) : url(u), queueName(q) {}
      
         void on_container_start(proton::container &c) override {
            std::cout << "receive_with_retry on_container_start" << std::endl;
      
            c.connect(
               url,
               proton::connection_options()
                  .idle_timeout(proton::duration(2000))
                  .reconnect(proton::reconnect_options()
                              .max_attempts(0)
                              .delay(proton::duration(3000))
                              .delay_multiplier(1)
                              .max_delay(proton::duration(3001))));
         }
      
         void on_connection_open(proton::connection& c) override {
            std::cout << "receive_with_retry on_connection_open " << std::endl;
            c.open_receiver(queueName, proton::receiver_options().auto_accept(false));
         }
      
         void on_session_open(proton::session& session) override {
            std::cout << "receive_with_retry on_session_open " << std::endl;
         }
      
         void on_receiver_open(proton::receiver& receiver) override {
            std::cout << "receive_with_retry on_receiver_open " << std::endl;
            receiver.open();
         }
      
         void on_message(proton::delivery& delivery, proton::message &message) override {
            std::cout << "receive_with_retry on_message " << message.body() << std::endl;
      
            // Can be used for throttling
            // std::this_thread::sleep_for(std::chrono::milliseconds(100));
      
            // commented out in order not to exit immediately, but continue on consuming the messages.
            // delivery.receiver().close();
            // delivery.receiver().connection().close();
         }
      
         void on_transport_error(proton::transport& error) override {
            std::cout << "receive_with_retry: on_transport_error: " << error.error().what() << std::endl;
      
            error.connection().close();
         }
      
         void on_error(const proton::error_condition& error) override {
            std::cout << "receive_with_retry: on_error: " << error.what() << std::endl;
         }
      };
      
      void receiveWithRetry(const std::string& url, const std::string& queueName){
         try {
            std::cout << "main: start" << std::endl;
            receive_with_retry receiveWithRetry(url, queueName);
            proton::container(receiveWithRetry).run();
            std::cout << "main: end" << std::endl;
         }
      
         catch (const std::exception& cause) {
            std::cout << "main: caught exception: " << cause.what() << std::endl;
         }
      }
      
      int main() {
         try {
            receiveWithRetry("amqp://localhost:5673", "test_queue");
            return 0;
         }
         catch (const std::exception& e) {
            std::cerr << e.what() << std::endl;
         }
         return 1;
      }
      

       

      Attachments

        Activity

          People

            Unassigned Unassigned
            jeremy.aouad Jeremy
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: