Details
-
Bug
-
Status: Open
-
Critical
-
Resolution: Unresolved
-
4.1.0-incubating
-
None
-
None
-
linux
Description
I found the following error in the client's log :
2017-08-10 09:06:57 ERROR [DubboServerHandler-10.28.109.45:20994-thread-475] c.c.d.a.c.b.r.RocketMQMsgProducer[RocketMQMsgProducer.java:42] -> Send ons msg failed, topic=TopicSubOrderDataSync, tag=TagActivityDataSync, key=activity-order-411320385584760066, msg=411320385584760066
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 2 DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 208ms, size of queue: 17
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
at org.apache.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:531) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:345) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:327) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:290) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:688) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:458) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1049) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1008) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:204) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
and I look into store.log in RocketMQ Broker, and found that at the same time, the following line is there:
2017-08-10 09:06:57 INFO FlushRealTimeService - Flush data to disk costs 1240 ms
I look into the source code, and found that rocketmq have some index files, which rocketmq will not write immediately(because it is wrote randomlly), but when a index file write finished(about 500MB), rocketmq finally force it into disk, that means dirty pages will be about 500MB maximally, (I have executed the bin/os.sh under RocketMQ, which change the default behavior of linux pdflush).because the [vm.dirty_ratio] is 50, and the available memory is about 1600MB at my linux machine, 500MB will not exceed 50% of 1600M, so pdflush will not executed in this way. So I guess writeback will impact the RT of write.
So I write a testcase and proved this, the code is:
public class MappedFileTest { public static void main(String[] args) throws IOException, InterruptedException { //mock rocketmq's index file String indexFile = "/home/admin/rocketmq-data/index/index"; int indexFileToWriteInMB = 180; FileChannel indexFileChannel = new RandomAccessFile(new File(indexFile), "rw").getChannel(); final MappedByteBuffer indexFileBuffer = indexFileChannel.map(MapMode.READ_WRITE, 0, 1024*1024*500);//500M //put some dirty data, attention that the data size will not overflow vm.dirty_background_ratio; // because we set vm.dirty_expire_centisecs to 3000,so after 30 seconds,pdflush will writeback the dirty data into disk. byte[] bs = new byte[1024*1024*indexFileToWriteInMB];//180m Arrays.fill(bs,(byte)1); indexFileBuffer.put(bs); //mock rocketmq's commitlog file String commitLogFile = "/home/admin/rocketmq-data/commitlog/commitlog"; FileChannel commitLogChannel = new RandomAccessFile(new File(commitLogFile), "rw").getChannel(); final MappedByteBuffer commitLogBuffer = commitLogChannel.map(MapMode.READ_WRITE, 0, 1024*1024*1024);//1G final Object lockObj = new Object(); //mock FlushCommitLogService to writeback dirty data of commitLog into disk. FlushCommitLogService commitLogService = new FlushCommitLogService(lockObj, commitLogBuffer); commitLogService.start(); //mock messageReceived to write data into commitLogFile. mockMessageReceived(lockObj, commitLogBuffer); //wait for about 30 seconds(let linux pdflush to writeback dirty data of indexFile), then you will see some output like(this will block the thread that handle messages, then client will fail to send message,until pdflush is done): //---write cost ms: 213 //flushToDisk cost ms:502 } private static void mockMessageReceived(Object lockObj, MappedByteBuffer commitLogBuffer){ byte[] bs = new byte[1024*10];//10kb Arrays.fill(bs,(byte)1); for(int i=0;i<1000;i++){ commitLogBuffer.position(bs.length * i); long start = System.currentTimeMillis(); commitLogBuffer.put(bs); start = System.currentTimeMillis() - start; if(start > 1) { System.out.println("---write cost ms:" + start); } if(i != 0 && i % 2 == 0) { synchronized (lockObj) { lockObj.notify(); } } try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } } } public static class FlushCommitLogService extends Thread{ private Object lockObj; private MappedByteBuffer commitLogBuffer; public FlushCommitLogService(Object lockObj,MappedByteBuffer commitLogBuffer) { this.lockObj = lockObj; this.commitLogBuffer = commitLogBuffer; } public void run(){ while(true) { synchronized (lockObj) { try { lockObj.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("flushToDisk started"); long start = System.currentTimeMillis(); commitLogBuffer.force(); start = System.currentTimeMillis() - start; System.out.println("flushToDisk cost ms:" + start); } } } }
before running this testcase, please make sure you are running this code in linux platform, and set the following linux configs:
vm.dirty_background_ratio = 50
vm.dirty_ratio = 50
vm.dirty_expire_centisecs = 3000
vm.dirty_writeback_centisecs = 500
The code will do the following things:
1.open a index file, and write some dirty data into it (do not flush manully,let linux pdflush to writeback it into disk)
2.open a commitLog file, and write some dirty data into it, every time the dirty data reach 20KB,it will notify another thread to force data into disk. look into the outputs,it's RT is healthy for now.
3.wait about 30 seconds,until the pdflush wakeup and found dirty data in index file exists for 30 seconds,then pdflush will writeback it into disk(you can execute [cat /proc/meminfo|grep Dirty] to show the size of all dirty data). At the same time,look into the outputs,and you will see some output like this:
---write cost ms: 213
flushToDisk cost ms:502
the RT of write call is unacceptable, this will block the thread that handle messages, then client will fail to send message,until pdflush is done. And if you set the dirty data in index file bigger, the write RT will grow bigger too.
the bin/os.sh set vm.dirty_writeback_centisecs to 360000(an hour), and I look into store.log, found that the log [Flush data to disk costs * ms] appear hourly, this proved my guess much more realistic.
I attention that the index file have three sections, with the first is head, the second is index wrote randomly, and the third is another type of index wrote sequential. so maybe the second section can be move into another file and this will only use 20MB of memory,which will make writeback faster,at the same time,the third section can be wrote sequential and writeback into disk as soon as possible ,just like what commitlog does. but when recover from system crash, consistency of this two index files is also a problem.