This problem is similar to the one I described in the previous comment.
The sequence that causes the problem is:
1) A subscriber calls #subscribe, which triggers FIFODeliveryManager#startServingSubscriber.
2) #startServingSubscriber invokes the callback that sends the subscribe response back to the client.
3) The delivery manager thread is switched out so other threads can execute.
4) The client receives the #subscribe response.
5) The client calls #closeSubscription.
6) #closeSubscription calls FIFODeliveryManager#stopServingSubscriber, which removes the subscriber from the subscriberStates map. The removal is executed in the netty worker thread, not in the delivery manager thread, and since the delivery manager thread hasn't had a chance to resume, subscriberStates is still empty. So #stopServingSubscriber actually does nothing.
7) The delivery manager thread resumes and puts the now-stale active subscriber into the subscriberStates map.
8) When the next subscribe request comes in, it replaces that stale active subscriber with a new subscriber object and sends a SUBSCRIPTION_FORCE_CLOSED event to its channel. Since the channel is multiplexed, the event meant for the old subscriber kills the new one, which causes ClientNotSubscribedException. (A sketch of this interleaving follows the list.)
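To make the interleaving concrete, here is a minimal, self-contained sketch; the class name, map keys, and latches are illustrative only, not the actual Hedwig types. It forces the ordering above and shows that the subscriber state outlives the close:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

public class SubscribeRaceSketch {
    static final ConcurrentHashMap<String, String> subscriberStates =
            new ConcurrentHashMap<>();

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch responseSent = new CountDownLatch(1);
        CountDownLatch closeDone = new CountDownLatch(1);

        Thread deliveryManager = new Thread(() -> {
            // step 2: the subscribe response is sent BEFORE the map is updated
            responseSent.countDown();
            // steps 3-6: the thread is switched out; here we wait until the
            // client's closeSubscription has already run on the netty thread
            try { closeDone.await(); } catch (InterruptedException ignored) { }
            // step 7: the stale subscriber is inserted AFTER the removal ran
            subscriberStates.put("topic/subscriber", "ACTIVE");
        });

        Thread nettyWorker = new Thread(() -> {
            try { responseSent.await(); } catch (InterruptedException ignored) { }
            // step 6: stopServingSubscriber removes from an empty map -> no-op
            subscriberStates.remove("topic/subscriber");
            closeDone.countDown();
        });

        deliveryManager.start();
        nettyWorker.start();
        deliveryManager.join();
        nettyWorker.join();

        // the map still holds a subscriber the client believes is closed
        System.out.println("leftover state: " + subscriberStates);
    }
}
```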
Several places need fixing here:
1) Send the subscribe response only after the active subscriber has been put into the subscriberStates map.
2) Execute the removal as a DeliveryRequest, running in the delivery manager thread.
3) When replacing an active subscriber, check whether the previous subscriber and the new subscriber are on the same channel. If so, don't send the FORCE_CLOSED event, because in that case it might be a retried subscribe request for a previously timed-out subscribe request on a multiplexed channel. (A sketch of these fixes follows the list.)
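Below is a hedged sketch of how the three fixes could fit together; the class, method, and type names (Channel, Callback, the request queue) are simplified stand-ins for the actual FIFODeliveryManager/Netty code, not the real API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class FixedDeliveryManagerSketch implements Runnable {
    interface Channel {}                              // stand-in for the netty channel
    interface Callback { void operationFinished(); }  // stand-in for the response callback

    static class Subscriber {
        final Channel channel;
        Subscriber(Channel channel) { this.channel = channel; }
    }

    private final Map<String, Subscriber> subscriberStates = new HashMap<>();
    private final BlockingQueue<Runnable> requestQueue = new LinkedBlockingQueue<>();

    // fix 1: respond only after the subscriber is in the map;
    // fix 3: skip FORCE_CLOSED when old and new subscriber share a channel
    public void startServingSubscriber(String id, Subscriber subscriber,
                                       Callback sendResponse) {
        requestQueue.add(() -> {
            Subscriber previous = subscriberStates.put(id, subscriber);
            if (previous != null && previous.channel != subscriber.channel) {
                sendForceClosed(previous);        // safe: different channel
            }
            sendResponse.operationFinished();     // response strictly after the put
        });
    }

    // fix 2: the removal runs as a request in the delivery manager thread,
    // never directly in the netty worker thread
    public void stopServingSubscriber(String id) {
        requestQueue.add(() -> subscriberStates.remove(id));
    }

    private void sendForceClosed(Subscriber previous) {
        // write SUBSCRIPTION_FORCE_CLOSED to previous.channel (elided)
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                requestQueue.take().run();        // single-threaded map mutation
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because every mutation of subscriberStates is funneled through the single delivery manager thread via the request queue, the put in startServingSubscriber always precedes any removal enqueued by a later closeSubscription, and the response callback fires only after the put, which closes the race in steps 6 and 7 above.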