Description
While working on CI, I found a race condition that shows up on slower machines and can cause consumers to lose messages.
The race condition is a result of treating the creation of consumer resources and starting them as an atomic operation:
    public async Task Init()
    {
        await Session.Connection.CreateResource(Info);
        await Session.Connection.StartResource(Info);
    }
On slower machines, a pending message may be delivered to the provider before the code above returns:
    public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
    {
        CheckClosed();

        NmsMessageConsumer messageConsumer = new NmsMessageConsumer(consumerIdGenerator.GenerateId(), this, destination, selector, noLocal);
        messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();

        // here may be a race condition
        consumers.TryAdd(messageConsumer.Info.Id, messageConsumer);

        return messageConsumer;
    }
If a message arrives before `Init` returns (that is, before the newly created consumer is added to the list of available consumers), the lookup in `OnInboundMessage` fails and the message is silently dropped:
    public void OnInboundMessage(InboundMessageDispatch envelope)
    {
        if (consumers.TryGetValue(envelope.ConsumerInfo.Id, out NmsMessageConsumer messageConsumer))
        {
            messageConsumer.OnInboundMessage(envelope);
        }
    }
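One possible way to close the window is to register the consumer between the create and start steps, so that it is already in the lookup table when the first message can arrive. The following is only a minimal sketch of that ordering, not the actual fix: `FakeSession`, `FakeConsumer` and `CreateConsumerAsync` are hypothetical stand-ins, and `StartResource` here simulates a broker that delivers a pending message as soon as the resource is started.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for NmsMessageConsumer; it only counts deliveries.
class FakeConsumer
{
    public string Id { get; }
    public int Received { get; private set; }

    public FakeConsumer(string id) { Id = id; }

    public void OnInboundMessage() { Received++; }
}

// Hypothetical stand-in for the session; not the real NMS types.
class FakeSession
{
    private readonly ConcurrentDictionary<string, FakeConsumer> consumers =
        new ConcurrentDictionary<string, FakeConsumer>();

    // Assumption: creating the resource does not yet allow message delivery.
    private Task CreateResource(FakeConsumer consumer) => Task.CompletedTask;

    // Assumption: starting the resource may trigger delivery immediately,
    // which is the worst case described in this issue.
    private Task StartResource(FakeConsumer consumer)
    {
        OnInboundMessage(consumer.Id);
        return Task.CompletedTask;
    }

    // Same shape as the dispatch method above: unknown consumers are ignored.
    public void OnInboundMessage(string consumerId)
    {
        if (consumers.TryGetValue(consumerId, out FakeConsumer consumer))
        {
            consumer.OnInboundMessage();
        }
    }

    public async Task<FakeConsumer> CreateConsumerAsync(string id)
    {
        var consumer = new FakeConsumer(id);

        await CreateResource(consumer);

        // Register before starting, so an early message finds the consumer.
        consumers.TryAdd(consumer.Id, consumer);

        await StartResource(consumer);
        return consumer;
    }
}

static class Program
{
    static async Task Main()
    {
        var session = new FakeSession();
        FakeConsumer consumer = await session.CreateConsumerAsync("consumer-1");
        Console.WriteLine(consumer.Received); // prints 1: the early message was not lost
    }
}
```

If the `TryAdd` call in this sketch is moved after `StartResource`, the program prints 0, which mirrors the lost message described above. A real fix would presumably also need to remove the consumer from the dictionary if starting the resource fails.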