Details
Description
I am writing a stress-test that tests multi-thread safetyness of IoSession.write method, and find lose message order.
My test method is as follows
1. The client test code starts 50 threads, sharing the same IoSession object
2. Each test thread simulates a user and sends data in sequence
I believe that the IoFilter I use meets the thread safety conditions
The result I expect is that the server receives the data of each user in an orderly manner, but not
Synchronizing on the session.write makes the problem go away;
Do I really have to synchronize on the session to solve this issue?
ClientDemo.java
public class ClientDemo { public static void main(String[] args) throws Exception { NioSocketConnector connector = new NioSocketConnector(); DefaultIoFilterChainBuilder chain = connector.getFilterChain(); chain.addLast("mdc", new MdcInjectionFilter()); chain.addLast("codec", new ProtocolCodecFilter(new MessagePackCodecFactory())); TcpRPCHandler responseHandler = new TcpRPCHandler(); connector.setHandler(responseHandler); connector.setConnectTimeoutCheckInterval(30); ConnectFuture cf = connector.connect(new InetSocketAddress("127.0.0.1", 9999)); IoSession session = cf.awaitUninterruptibly().getSession(); ExecutorService executor = Executors.newFixedThreadPool(50); for (int i = 0; i < 50; ++i) { executor.execute(new SenderWorker(i, session)); } while (true) { Thread.sleep(5000); System.out.println("client alive......"); // responseHandler.printProgress(); } } } class SenderWorker implements Runnable { private int userId; private IoSession session; public SenderWorker(int userId, IoSession session) { this.userId = userId; this.session = session; } @Override public void run() { for (int i = 0; i < 100; ++i) { MessageData data = new MessageData(userId, i); /*synchronized (session)*/ { session.write(data); } if (i % 5 == 0) { try { Thread.sleep(10); } catch (Exception e) { } } } } }
See the attachment for the complete code, I use maven to manage the project