Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
proton-c-0.22.0
-
None
-
None
Description
I have a running broker, and a configured queue containing messages.
I also have a consumer, where I configured the max number of attempts to 0 (infinite retry).
I kill the broker (ctrl-c) and start it on the same port.
Upon reconnection, I get the following error message randomly:
receive_with_retry on_connection_open
receive_with_retry: on_error: amqp:not-found: Virtual host 'localhost' is not active
main: end
In the case where the consumer is able to connect, the consumer continues to consume the messages normally.
However, in the broker web interface, I see upon each re-connection, an extra connection (same ip and port) to the queue. As if, the old connection is not killed. I wasn't expecting this behavior. This might be a separate issue.
I was able to reproduce with the following code, on windows 7 (msvc 12 2013)
class receive_with_retry : public proton::messaging_handler { private: std::string url; std::string queueName; public: receive_with_retry(const std::string &u, const std::string& q) : url(u), queueName(q) {} void on_container_start(proton::container &c) override { std::cout << "receive_with_retry on_container_start" << std::endl; c.connect( url, proton::connection_options() .idle_timeout(proton::duration(2000)) .reconnect(proton::reconnect_options() .max_attempts(0) .delay(proton::duration(3000)) .delay_multiplier(1) .max_delay(proton::duration(3001)))); } void on_connection_open(proton::connection& c) override { std::cout << "receive_with_retry on_connection_open " << std::endl; c.open_receiver(queueName, proton::receiver_options().auto_accept(false)); } void on_session_open(proton::session& session) override { std::cout << "receive_with_retry on_session_open " << std::endl; } void on_receiver_open(proton::receiver& receiver) override { std::cout << "receive_with_retry on_receiver_open " << std::endl; receiver.open(); } void on_message(proton::delivery& delivery, proton::message &message) override { std::cout << "receive_with_retry on_message " << message.body() << std::endl; // Can be used for throttling // std::this_thread::sleep_for(std::chrono::milliseconds(100)); // commented out in order not to exit immediately, but continue on consuming the messages. // delivery.receiver().close(); // delivery.receiver().connection().close(); } void on_transport_error(proton::transport& error) override { std::cout << "receive_with_retry: on_transport_error: " << error.error().what() << std::endl; error.connection().close(); } void on_error(const proton::error_condition& error) override { std::cout << "receive_with_retry: on_error: " << error.what() << std::endl; } }; void receiveWithRetry(const std::string& url, const std::string& queueName){ try { std::cout << "main: start" << std::endl; receive_with_retry receiveWithRetry(url, queueName); proton::container(receiveWithRetry).run(); std::cout << "main: end" << std::endl; } catch (const std::exception& cause) { std::cout << "main: caught exception: " << cause.what() << std::endl; } } int main() { try { receiveWithRetry("amqp://localhost:5673", "test_queue"); return 0; } catch (const std::exception& e) { std::cerr << e.what() << std::endl; } return 1; }