Details
Description
With prefetch_size=0, the consumer.receive() sends a pull request to the broker, and then waits on dequeue() for a new message.
With failover, if the broker is restarted after having received the pull request, then the consumer is blocked indefinitely in receive(), and the broker never sends messages to it.
Here is the stack of the blocked consumer. It never returned from the first call to receive().
#0 pthread_cond_wait@@GLIBC_2.3.2 ()
at ../nptl/sysdeps/unix/sysv/linux/x86_64/pthread_cond_wait.S:162
#1 0x00007f2528c8fdd7 in decaf::internal::util::concurrent::ConditionImpl::wait (condition=0x118a024)
at decaf/internal/util/concurrent/unix/ConditionImpl.cpp:101
#2 0x00007f2528b23075 in activemq::core::SimplePriorityMessageDispatchChannel::dequeue (this=0x1181b40, timeout=-1)
at activemq/core/SimplePriorityMessageDispatchChannel.cpp:77
#3 0x00007f2528ad6d78 in activemq::core::ActiveMQConsumer::dequeue (this=0x11892d0, timeout=-1) at activemq/core/ActiveMQConsumer.cpp:553
#4 0x00007f2528adbef1 in activemq::core::ActiveMQConsumer::receive (this=0x11892d0) at activemq/core/ActiveMQConsumer.cpp:601
It seems the failover mechanism never notifies the consumer->internal->unconsumedMessages fifo, so the consumer stays blocked on dequeue(), while the new broker doesn't know the consumer has sent a pull request.
I haven't found any use of the callback transportResumed() from the failover transport.
A solution would be to use this callback to notifyAll() on all consumers internal queues.