Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 4.1.0-incubating
    • Fix Version/s: 4.1.0-incubating
    • Component/s: rocketmq-client
    • Labels:
      None

      Description

      Tests show that Kafka's million-level TPS is mainly due to batching. When the batch size is set to 1, TPS drops by an order of magnitude. So I am trying to add this feature to RocketMQ.

      With minimal effort in mind, it works as follows:
      • Only add synchronous send functions to the MQProducer interface, such as send(final Collection<Message> msgs).
      • Use MessageBatch, which extends Message and implements Iterable<Message>.
      • Use a byte buffer instead of a list of objects to avoid excessive GC in the Broker.
      • Split the decode and encode logic out of lockForPutMessage to avoid too many race conditions.
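The batch type described in the list above could be sketched roughly as follows. This is only an illustration: `SketchMessage` and `SketchMessageBatch` are hypothetical stand-ins (not the classes from the pull request), and the one-topic-per-batch check is an assumption based on the constraint reviewers mention in this thread.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for a message; only the fields this sketch needs.
class SketchMessage {
    private final String topic;
    private final byte[] body;

    SketchMessage(String topic, byte[] body) {
        this.topic = topic;
        this.body = body;
    }

    String getTopic() { return topic; }
    byte[] getBody() { return body; }
}

// A batch is itself a message, so it can travel through the single-message
// send path, and it is iterable so callers can reach the wrapped messages.
class SketchMessageBatch extends SketchMessage implements Iterable<SketchMessage> {
    private final List<SketchMessage> messages;

    private SketchMessageBatch(String topic, List<SketchMessage> messages) {
        super(topic, new byte[0]); // a real encoder would fill in the body before sending
        this.messages = messages;
    }

    // Assumed constraint: every message in a batch shares one topic.
    static SketchMessageBatch of(Collection<SketchMessage> msgs) {
        if (msgs == null || msgs.isEmpty()) {
            throw new IllegalArgumentException("batch must contain at least one message");
        }
        String topic = null;
        List<SketchMessage> copy = new ArrayList<>(msgs.size());
        for (SketchMessage m : msgs) {
            String t = Objects.requireNonNull(m.getTopic(), "topic");
            if (topic == null) {
                topic = t;
            } else if (!topic.equals(t)) {
                throw new IllegalArgumentException("all messages in a batch must share one topic");
            }
            copy.add(m);
        }
        return new SketchMessageBatch(topic, Collections.unmodifiableList(copy));
    }

    int size() { return messages.size(); }

    @Override
    public Iterator<SketchMessage> iterator() { return messages.iterator(); }
}
```

A caller would build the batch once with `SketchMessageBatch.of(list)` and hand it to a synchronous send method; mixing topics fails fast on the client instead of at the broker.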

      Tests:
      On Linux with 24 cores, 48 GB RAM, and an SSD, using 50 threads to send messages with 50-byte bodies in batches of 50, we get about 1.5 million TPS (150w) until the disk is full.

      Potential problems:
      Although messages can be accumulated in the Broker very quickly, it takes time to dispatch them to the consume queue, which is much slower than accepting messages. So the messages may not be consumable immediately.

      We may need to refactor the ReputMessageService to solve this problem.

      If anyone has ideas, please let me know or share them in this issue.

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user dongeforever opened a pull request:

          https://github.com/apache/incubator-rocketmq/pull/53

          ROCKETMQ-80 Add batch feature

          Tests show that Kafka's million-level TPS is mainly due to batching. When the batch size is set to 1, TPS drops by an order of magnitude. So I am trying to add this feature to RocketMQ.

          With minimal effort in mind, it works as follows:

          • Only add synchronous send functions to the MQProducer interface, such as *send(final Collection<Message> msgs)*
          • Use *MessageBatch*, which extends *Message* and implements *Iterable\<Message\>*
          • Use a byte buffer instead of a list of objects to avoid excessive GC in the Broker.
          • Split the decode and encode logic out of *lockForPutMessage* to avoid too many race conditions.

          Tests:
          On Linux with 24 cores, 48 GB RAM, and an SSD, single broker, using 50 threads to send messages with 50-byte bodies in batches of 50, we get about 1.5 million TPS (150w) until the disk is full.

          Potential problems:
          Although messages can be accumulated in the Broker very quickly, it takes time to dispatch them to the consume queue, which is much slower than accepting messages. So the messages may not be consumable immediately.

          We may need to refactor the *ReputMessageService* to solve this problem.

          If anyone has ideas, please let me know or share them in this issue.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/dongeforever/incubator-rocketmq ROCKETMQ-80

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/incubator-rocketmq/pull/53.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #53


          commit e03b6e6a496526848df603fd406b77aa6afc87d2
          Author: dongeforever <zhendongliu92@yeah.net>
          Date: 2017-02-07T06:12:16Z

          ROCKETMQ-80 Add batch feature


          githubbot ASF GitHub Bot added a comment -

          Github user dongeforever commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          @vongosling @zhouxinyu @WillemJiang please have a review.

          githubbot ASF GitHub Bot added a comment -

          Github user vongosling commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          Cool, thanks @dongeforever for providing this feature. We will have a look at this implementation. Please hold your horses.

          githubbot ASF GitHub Bot added a comment -

          Github user coveralls commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          [![Coverage Status](https://coveralls.io/builds/10029487/badge)](https://coveralls.io/builds/10029487)

          Coverage increased (+0.4%) to 31.909% when pulling *e03b6e6a496526848df603fd406b77aa6afc87d2 on dongeforever:ROCKETMQ-80* into *9a2de7b555b390c1c55f5a275d6fe7e251ac3f62 on apache:master*.

          githubbot ASF GitHub Bot added a comment -

          Github user zhouxinyu commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          I wonder whether there are compatibility issues between the new client version and the old server version?

          githubbot ASF GitHub Bot added a comment -

          Github user dongeforever commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          @zhouxinyu Yeah, this is a problem. When a new client sends batched messages to an old server, it gets no error, because the batched messages are treated as a normal message internally.

          Maybe I need a new request code, so that the old server cannot recognize it and throws an error.
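To illustrate why a dedicated request code helps, here is a minimal sketch of a broker that dispatches by request code. Everything in it is hypothetical: `SketchBroker`, the numeric code values, and the "not supported" response are placeholders for illustration, not taken from the RocketMQ source.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class SketchBroker {
    // Placeholder numeric codes, chosen only for this illustration.
    static final int SEND_MESSAGE = 10;
    static final int SEND_BATCH_MESSAGE = 320;
    static final int SUCCESS = 0;
    static final int REQUEST_CODE_NOT_SUPPORTED = 3;

    // Maps a request code to the processor that handles it.
    private final Map<Integer, Function<byte[], Integer>> processors = new HashMap<>();

    // An "old" broker only knows the single-message code.
    static SketchBroker oldBroker() {
        SketchBroker broker = new SketchBroker();
        broker.processors.put(SEND_MESSAGE, body -> SUCCESS);
        return broker;
    }

    // A "new" broker additionally registers the batch code.
    static SketchBroker newBroker() {
        SketchBroker broker = oldBroker();
        broker.processors.put(SEND_BATCH_MESSAGE, body -> SUCCESS);
        return broker;
    }

    // Unknown codes are rejected explicitly instead of being misread as normal messages.
    int handle(int requestCode, byte[] body) {
        Function<byte[], Integer> processor = processors.get(requestCode);
        return processor == null ? REQUEST_CODE_NOT_SUPPORTED : processor.apply(body);
    }
}
```

With this shape, a new client that sends a batch to an old broker gets an explicit error it can surface, rather than a silent success with a mis-stored payload.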

          githubbot ASF GitHub Bot added a comment -

          Github user shroman commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          Sounds good to check for the protocol version in the request header, and reject with an error when not matched.
          Probably, `RemotingCommand.REMOTING_VERSION_KEY` will work, but I would create a version policy based not on the release (that is used now) but on the protocol changes, so that we don't have to introduce a new checking condition in the code with each release. Just an idea.

          githubbot ASF GitHub Bot added a comment -

          Github user dongeforever commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          @shroman thank you.
          Great idea to create a version policy based on protocol changes.
          And we would need to control every api separately.
          So maybe we can write all these checking codes in one place for all apis.

          BTW, version control is a global problem, not only for batch, maybe we need more discussion in the mailing list.

          githubbot ASF GitHub Bot added a comment -

          Github user shroman commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          @dongeforever yes, I think we have to discuss it in the dev mailing list :+1:
          I think having a class/methods for wire protocol version checking and applying them (and, as a result, accepting or rejecting) when a client wants to establish a connection to a broker, etc. should work. I guess that's what you meant by having "checking codes in one place"
          The version information can be placed into headers.

          Such protocol changes shouldn't happen that often (RocketMQ v.4.0.0, v.4.1.0 may use protocol v.1, but RocketMQ v.5.0.0 may use protocol v.2), but if they happen, in this way we can guarantee that the communication between system components is done correctly.
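One possible shape for such a check, as a rough sketch only: the header key `protocolVersion`, the class `SketchProtocolCheck`, and the single supported version are all hypothetical, and this is not how RocketMQ currently negotiates versions.

```java
import java.util.Map;

class SketchProtocolCheck {
    // Hypothetical header name carrying the wire protocol version.
    static final String PROTOCOL_VERSION_KEY = "protocolVersion";
    // Hypothetical protocol version this component speaks.
    static final int SUPPORTED_PROTOCOL = 1;

    // Accepts a peer only if it declares exactly the supported protocol version.
    static boolean accept(Map<String, String> headers) {
        String raw = headers.get(PROTOCOL_VERSION_KEY);
        if (raw == null) {
            return false; // no version declared: reject rather than guess
        }
        try {
            return Integer.parseInt(raw) == SUPPORTED_PROTOCOL;
        } catch (NumberFormatException e) {
            return false; // malformed version header
        }
    }
}
```

Because the check keys on the protocol version rather than the release version, several releases can share one protocol number and no new condition is needed per release.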

          Let's discuss it on the mailing list. My understanding of RocketMQ is still incomplete; maybe there are better ideas that do not over-complicate it with versioning.

          githubbot ASF GitHub Bot added a comment -

          Github user dongeforever commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          @zhouxinyu Note that batch is a new API, so I added a new request code named "SEND_BATCH_MESSAGE". As the old broker cannot recognize it, the compatibility problem is gone.

          Also, I use a ThreadLocal to avoid race conditions for MessageExtBatchEncoder, which was previously cached in a blocking queue.
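The ThreadLocal approach can be sketched as below. This is an assumption-laden illustration, not the code from the pull request: `SketchBatchEncoder`, `SketchEncoderHolder`, the 64 KiB capacity, and the length-prefixed layout are all hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical encoder that reuses one buffer; not safe to share between threads.
class SketchBatchEncoder {
    private final ByteBuffer buffer;

    SketchBatchEncoder(int capacity) {
        this.buffer = ByteBuffer.allocate(capacity);
    }

    // Writes each body as [4-byte length][payload] into the reused buffer.
    // Throws BufferOverflowException if the batch exceeds the buffer capacity.
    ByteBuffer encode(List<byte[]> bodies) {
        buffer.clear();
        for (byte[] body : bodies) {
            buffer.putInt(body.length);
            buffer.put(body);
        }
        buffer.flip(); // switch from writing to reading
        return buffer;
    }
}

class SketchEncoderHolder {
    // One encoder per thread, so no locking or pooling is needed around the buffer.
    private static final ThreadLocal<SketchBatchEncoder> ENCODER =
        ThreadLocal.withInitial(() -> new SketchBatchEncoder(64 * 1024));

    static SketchBatchEncoder current() {
        return ENCODER.get();
    }
}
```

Each thread encodes into its own buffer, which also fits the goal of packing a batch into a byte buffer rather than keeping a list of message objects alive. The returned buffer is only valid until the same thread encodes again.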

          githubbot ASF GitHub Bot added a comment -

          Github user coveralls commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          [![Coverage Status](https://coveralls.io/builds/10167156/badge)](https://coveralls.io/builds/10167156)

          Coverage increased (+0.5%) to 32.046% when pulling *6579e7a9299f9e6f6ac49a2dfdf31a6e278c25ff on dongeforever:ROCKETMQ-80* into *573b22c37806a21543b90707bcce6022243a62da on apache:master*.

          githubbot ASF GitHub Bot added a comment -

          Github user Jaskey commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          @dongeforever

          I wish for batch send too, but what drives me is that users will probably need a batch id for one batch of messages, and these messages should all succeed on one single queue, which is necessary when sending ordered messages. Say msgA, msgB and msgC should be consumed in order: they should be sent to the same queue, but if we send them in a for loop, A may succeed and B may fail to reach the same queue, since the number of queues may change at exactly that time.

          Batch send could solve this problem. But we may also need to generate a unique batch id for this in the client, which will help us optimize the performance of consumeOrderlyService in the future. Currently, a message in a single queue can be consumed only if the previous one was consumed successfully, which is actually too strict. We only need the messages in one batch to be consumed in order; a batch id will help us do this.

          *So in general, I suggest adding a batch id to every message's properties when sending a batch.*
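A minimal sketch of that suggestion, assuming each message carries a string property map: the property key `BATCH_ID` and the class `SketchBatchId` are hypothetical, and a random UUID is just one way to get a unique id on the client.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;

class SketchBatchId {
    // Hypothetical property key; not an existing RocketMQ message property.
    static final String PROPERTY_BATCH_ID = "BATCH_ID";

    // Stamps one freshly generated id onto the properties of every message in the batch.
    static String tag(List<Map<String, String>> messageProperties) {
        String batchId = UUID.randomUUID().toString();
        for (Map<String, String> props : messageProperties) {
            props.put(PROPERTY_BATCH_ID, batchId);
        }
        return batchId;
    }
}
```

A consumer could then group messages by this property and enforce ordering only within a group, which is the relaxation described above.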

          PS: There seems to be too much repeated code; are there any ways or plans to clean it up?

          githubbot ASF GitHub Bot added a comment -

          Github user dongeforever commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          @Jaskey Now there is no batch id, but the messages in one batch are sent to the same queue, and they can only be sent all successfully or all unsuccessfully.
          You could check the code or test it.

          githubbot ASF GitHub Bot added a comment -

          Github user Jaskey commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          @dongeforever I know your implementation. What I suggest is that you add a batch id so that we can identify them, which is actually very little effort for you.

          And as I said, in the future we can optimize consumeOrderlyService with the batch id.

          githubbot ASF GitHub Bot added a comment -

          Github user dongeforever commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          @Jaskey Sorry. You are right. And it is worth a new PR, maybe we can add batch id along with a new optimized consumeOrderlyService.

          githubbot ASF GitHub Bot added a comment -

          Github user lizhanhui commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          This feature is definitely nice to have, and it expands the scenarios that RocketMQ fits best. For example, RocketMQ may achieve performance close to, if not better than, Kafka's in log collection.

          That said, this is a pretty important feature and it matters so much that we need to get it right at the beginning. We'd better have a design document first, then discuss various impacts it brings about in the mailing list.

          As for this PR, it's generally good, yet it still needs more work: code duplication, message validation logic discrepancy, excessive constraints on usage (no delay messages, messages of a batch must have the same topic, etc.) and the previously mentioned compatibility issue with older brokers.

          githubbot ASF GitHub Bot added a comment -

          Github user vongosling commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          Yep, agreed @lizhanhui. I will comment in as much detail as possible.

          githubbot ASF GitHub Bot added a comment -

          Github user vongosling commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          @dongeforever We can continue to polish this PR, IMO. If you have any problems, please let me know. BTW, can you post your performance test results for us?

          githubbot ASF GitHub Bot added a comment -

          Github user vongosling commented on a diff in the pull request:

          https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101994094

          — Diff: common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java —
          @@ -0,0 +1,77 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.rocketmq.common.message;
          +
          +import java.util.ArrayList;
          +import java.util.Collection;
          +import java.util.Iterator;
          +import java.util.List;
          +
          +public class MessageBatch extends Message implements Iterable<Message> {
          — End diff –

          Maybe we can consider implementing equals for the collection scenario.

          githubbot ASF GitHub Bot added a comment -

          Github user vongosling commented on a diff in the pull request:

          https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101981097

          — Diff: broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java —
          @@ -442,6 +449,202 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
          return response;
          }

          + private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, //
          + final RemotingCommand request, //
          + final SendMessageContext sendMessageContext, //
          + final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
          +
          + final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
          + final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
          +
          +
          + response.setOpaque(request.getOpaque());
          +
          + response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
          + response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
          +
          + if (log.isDebugEnabled()) {
          +     log.debug("receive SendMessage request command, " + request);
          + }
          +
          + final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
          + if (this.brokerController.getMessageStore().now() < startTimstamp) {
          +     response.setCode(ResponseCode.SYSTEM_ERROR);
          +     response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
          +     return response;
          + }
          +
          + response.setCode(-1);
          + super.msgCheck(ctx, requestHeader, response);
          — End diff –

          Why not call msgCheck, which is a precondition check, on the first line of the outer method?

          githubbot ASF GitHub Bot added a comment -

          Github user vongosling commented on a diff in the pull request:

          https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101991874

          — Diff: client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java —
          @@ -278,14 +280,14 @@ public void createTopic(final String addr, final String defaultTopic, final Topi
          }

          public SendResult sendMessage(//

          - final String addr, // 1
          - final String brokerName, // 2
          - final Message msg, // 3
          - final SendMessageRequestHeader requestHeader, // 4
          - final long timeoutMillis, // 5
          - final CommunicationMode communicationMode, // 6
          - final SendMessageContext context, // 7
          - final DefaultMQProducerImpl producer // 8
          + final String addr, // 1
          — End diff –

          This reformatting is unnecessary under our code style: http://rocketmq.incubator.apache.org/docs/code-guidelines/

          githubbot ASF GitHub Bot added a comment -

          Github user vongosling commented on a diff in the pull request:

          https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101992848

          — Diff: client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java —
          @@ -595,8 +596,11 @@ private SendResult sendKernelImpl(final Message msg, //

          byte[] prevBody = msg.getBody();
          try {
          - MessageClientIDSetter.setUniqID(msg);
          + if (msg instanceof MessageBatch) {
          — End diff –

          Why not:

          if (!(msg instanceof MessageBatch)) {
              MessageClientIDSetter.setUniqID(msg);
          }
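A minimal sketch of the negated guard suggested above. The `Message`, `MessageBatch` and `setUniqID` definitions here are hypothetical stand-ins, not the RocketMQ classes, and the sketch assumes the intent is simply to skip ID assignment for a batch wrapper.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the types named in the diff.
class Message {
    String uniqId; // null until an ID has been assigned
}

class MessageBatch extends Message {
    final List<Message> messages = new ArrayList<>();
}

class UniqIdGuardSketch {
    private static int counter = 0;

    // Stand-in for MessageClientIDSetter.setUniqID(msg).
    static void setUniqID(Message msg) {
        if (msg.uniqId == null) {
            msg.uniqId = "ID-" + (++counter);
        }
    }

    // Single negated guard: no empty "if" branch for the batch case.
    static void prepare(Message msg) {
        if (!(msg instanceof MessageBatch)) {
            setUniqID(msg);
        }
    }

    public static void main(String[] args) {
        Message single = new Message();
        MessageBatch batch = new MessageBatch();
        prepare(single);
        prepare(batch);
        System.out.println("single has id: " + (single.uniqId != null));
        System.out.println("batch has id: " + (batch.uniqId != null));
    }
}
```

The negated form keeps the existing call on one path and avoids an `if` whose only purpose is to do nothing for batches.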

          githubbot ASF GitHub Bot added a comment -

          Github user vongosling commented on a diff in the pull request:

          https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101995717

          — Diff: store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java —
          @@ -119,6 +129,7 @@ public String toString() {
          ", storeTimestamp=" + storeTimestamp +
          ", logicsOffset=" + logicsOffset +
          ", pagecacheRT=" + pagecacheRT +
          — End diff –

          Consider using Apache Commons Lang's ToStringBuilder to build this string from the fields reflectively.
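To illustrate the idea without pulling in a dependency, here is a rough reflection-based `toString` using only the JDK. It is not the Commons Lang implementation (there, `ToStringBuilder.reflectionToString(this)` does the equivalent job); the class `AppendResultExample` and its field values are made up for the example.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.StringJoiner;

class ReflectiveToStringSketch {
    // Builds "TypeName{field=value, ...}" from the declared instance fields.
    static String reflectToString(Object obj) {
        Class<?> type = obj.getClass();
        StringJoiner joiner = new StringJoiner(", ", type.getSimpleName() + "{", "}");
        for (Field field : type.getDeclaredFields()) {
            // Skip static and compiler-generated fields.
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            field.setAccessible(true);
            try {
                joiner.add(field.getName() + "=" + field.get(obj));
            } catch (IllegalAccessException e) {
                joiner.add(field.getName() + "=<inaccessible>");
            }
        }
        return joiner.toString();
    }
}

// Hypothetical class; field names borrowed from the diff, values invented.
class AppendResultExample {
    private long storeTimestamp = 1L;
    private long logicsOffset = 2L;
    private long pagecacheRT = 3L;

    @Override
    public String toString() {
        return ReflectiveToStringSketch.reflectToString(this);
    }

    public static void main(String[] args) {
        System.out.println(new AppendResultExample());
    }
}
```

A newly added field then shows up in the output without anyone editing `toString`, at the cost of reflection overhead on each call. The field order follows whatever `getDeclaredFields` returns, which the JDK does not guarantee.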

          githubbot ASF GitHub Bot added a comment -

          Github user vongosling commented on a diff in the pull request:

          https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101989915

          — Diff: broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java —
          @@ -442,6 +449,202 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
          return response;
          }

          + private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, //
          + final RemotingCommand request, //
          + final SendMessageContext sendMessageContext, //
          + final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
          +
          + final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
          + final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
          +
          +
          + response.setOpaque(request.getOpaque());
          +
          + response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
          + response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
          +
          + if (log.isDebugEnabled()) {
          + log.debug("receive SendMessage request command, " + request);
          + }
          +
          + final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
          + if (this.brokerController.getMessageStore().now() < startTimstamp) {
          + response.setCode(ResponseCode.SYSTEM_ERROR);
          + response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
          + return response;
          + }
          +
          + response.setCode(-1);
          + super.msgCheck(ctx, requestHeader, response);
          + if (response.getCode() != -1) {
          + return response;
          + }
          +
          +
          + int queueIdInt = requestHeader.getQueueId();
          + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
          +
          + if (queueIdInt < 0) {
          + queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
          + }
          +
          + int sysFlag = requestHeader.getSysFlag();
          + if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
          + sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
          + }
          +
          + String newTopic = requestHeader.getTopic();
          + if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
          + String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
          + SubscriptionGroupConfig subscriptionGroupConfig =
          + this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
          + if (null == subscriptionGroupConfig) {
          + response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
          + response.setRemark(
          + "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
          + return response;
          + }
          +
          +
          + int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
          + if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
          + maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
          + }
          + int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
          + if (reconsumeTimes >= maxReconsumeTimes) {
          + newTopic = MixAll.getDLQTopic(groupName);
          + queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
          + topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
          + DLQ_NUMS_PER_GROUP, //
          + PermName.PERM_WRITE, 0
          + );
          + if (null == topicConfig) {
          + response.setCode(ResponseCode.SYSTEM_ERROR);
          + response.setRemark("topic[" + newTopic + "] not exist");
          + return response;
          + }
          + }
          + }
          + if (newTopic.length() > Byte.MAX_VALUE) {
          + response.setCode(ResponseCode.MESSAGE_ILLEGAL);
          + response.setRemark("message topic length too long " + newTopic.length());
          + return response;
          + }
          +
          +
          + MessageExtBatch messageExtBatch = new MessageExtBatch();
          + messageExtBatch.setTopic(newTopic);
          + messageExtBatch.setBody(request.getBody());
          + messageExtBatch.setQueueId(queueIdInt);
          + messageExtBatch.setSysFlag(sysFlag);
          + messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());
          + messageExtBatch.setBornHost(ctx.channel().remoteAddress());
          + messageExtBatch.setStoreHost(this.getStoreHost());
          + messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
          +
          — End diff –

          A lot of this code is duplicated.

          githubbot ASF GitHub Bot added a comment -

          Github user vongosling commented on a diff in the pull request:

          https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101993057

          — Diff: client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java —
          @@ -737,6 +742,10 @@ public MQClientInstance getmQClientFactory() {
          }

          private boolean tryToCompressMessage(final Message msg) {
          + if (msg instanceof MessageBatch) {
          — End diff –

          Why not support compression for MessageBatch now?

          githubbot ASF GitHub Bot added a comment -

          Github user vongosling commented on a diff in the pull request:

          https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101980234

          — Diff: broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java —
          @@ -442,6 +449,202 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
          return response;
          }

          + private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, //
          + final RemotingCommand request, //
          + final SendMessageContext sendMessageContext, //
          + final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
          +
          + final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
          + final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
          +
          +
          + response.setOpaque(request.getOpaque());
          +
          + response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
          + response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
          +
          + if (log.isDebugEnabled()) {
          — End diff –

          IMO, the isDebugEnabled check is redundant here and can be removed. Also, please capitalize the first letter of the log output.
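One caveat worth sketching: with plain string concatenation, dropping the guard means the message (and `request.toString()`) is built even when debug logging is off. The example below shows a lazy form that needs no explicit guard. It uses `java.util.logging` with a `Supplier` purely so it runs on the JDK alone; that is a substitute for, not a description of, the logger used in the PR, and the `Request` class is a made-up stand-in.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

class LazyDebugLogSketch {
    static final Logger LOG = Logger.getLogger("LazyDebugLogSketch");
    static final AtomicInteger RENDER_COUNT = new AtomicInteger();

    // Stand-in for the request; counts how often it is rendered to text.
    static final class Request {
        @Override
        public String toString() {
            RENDER_COUNT.incrementAndGet();
            return "Request[opaque=1]";
        }
    }

    static void logReceive(Request request) {
        // The supplier runs only if FINE is loggable, so no explicit guard is needed.
        LOG.fine(() -> "Receive SendMessage request command, " + request);
    }

    public static void main(String[] args) {
        LOG.setLevel(Level.INFO); // debug-level output disabled
        logReceive(new Request());
        System.out.println("toString calls: " + RENDER_COUNT.get());
    }
}
```

Logging facades with placeholder-style messages give the same effect, since the argument is only rendered when the level is enabled.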

          githubbot ASF GitHub Bot added a comment -

          Github user vongosling commented on a diff in the pull request:

          https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101994977

          — Diff: common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java —
          @@ -48,6 +48,8 @@
          private Integer reconsumeTimes;
          @CFNullable
          private boolean unitMode = false;
          + @CFNullable
          — End diff –

          Please check that this annotation works as expected.

          githubbot ASF GitHub Bot added a comment -

          Github user vongosling commented on a diff in the pull request:

          https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101965587

          — Diff: broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java —
          @@ -72,7 +73,13 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand

          mqtraceContext = buildMsgContext(ctx, requestHeader);
          this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

          - final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
          +
          + RemotingCommand response = null;
          — End diff –

          Please remove the redundant initializer.
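A small sketch of why the `= null` can go, assuming the lines that follow (not shown in this excerpt) assign `response` in every branch. The handler methods and the `String` return type are placeholders for illustration only.

```java
class DefiniteAssignmentSketch {
    // Placeholder handlers standing in for the single and batch send paths.
    static String handleSingle() {
        return "single";
    }

    static String handleBatch() {
        return "batch";
    }

    static String process(boolean isBatch) {
        // No "= null": the compiler checks that every branch assigns the
        // variable, and "final" becomes possible again.
        final String response;
        if (isBatch) {
            response = handleBatch();
        } else {
            response = handleSingle();
        }
        return response;
    }

    public static void main(String[] args) {
        System.out.println(process(true));
        System.out.println(process(false));
    }
}
```

If a branch is later added that forgets to assign the variable, this form fails to compile instead of returning `null` at runtime.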

          githubbot ASF GitHub Bot added a comment -

          Github user vongosling commented on a diff in the pull request:

          https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101995981

          — Diff: store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java —
          @@ -331,7 +331,7 @@ public long getMinOffsetInQueue() {
          public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
          long logicOffset) {
          final int maxRetries = 30;

          - boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
          + boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
          — End diff –

          What is isCQWriteable? Why not change the name?

          githubbot ASF GitHub Bot added a comment -

          Github user vongosling commented on a diff in the pull request:

          https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101990343

          — Diff: broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java —
          @@ -442,6 +449,202 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
          return response;
          }

          + private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, //
          + final RemotingCommand request, //
          + final SendMessageContext sendMessageContext, //
          + final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
          +
          + final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
          + final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
          +
          +
          + response.setOpaque(request.getOpaque());
          +
          + response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
          + response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
          +
          + if (log.isDebugEnabled()) {
          + log.debug("receive SendMessage request command, " + request);
          + }
          +
          + final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
          + if (this.brokerController.getMessageStore().now() < startTimstamp) {
          + response.setCode(ResponseCode.SYSTEM_ERROR);
          + response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
          + return response;
          + }
          +
          + response.setCode(-1);
          + super.msgCheck(ctx, requestHeader, response);
          + if (response.getCode() != -1) {
          + return response;
          + }
          +
          +
          + int queueIdInt = requestHeader.getQueueId();
          + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
          +
          + if (queueIdInt < 0) {
          + queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
          + }
          +
          + int sysFlag = requestHeader.getSysFlag();
          + if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
          + sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
          + }
          +
          + String newTopic = requestHeader.getTopic();
          + if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
          + String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
          + SubscriptionGroupConfig subscriptionGroupConfig =
          + this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
          + if (null == subscriptionGroupConfig) {
          + response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
          + response.setRemark(
          + "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
          + return response;
          + }
          +
          +
          + int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
          + if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
          + maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
          + }
          + int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
          + if (reconsumeTimes >= maxReconsumeTimes) {
          + newTopic = MixAll.getDLQTopic(groupName);
          + queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
          + topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
          + DLQ_NUMS_PER_GROUP, //
          + PermName.PERM_WRITE, 0
          + );
          + if (null == topicConfig) {
          + response.setCode(ResponseCode.SYSTEM_ERROR);
          + response.setRemark("topic[" + newTopic + "] not exist");
          + return response;
          + }
          + }
          + }
          + if (newTopic.length() > Byte.MAX_VALUE) {
          + response.setCode(ResponseCode.MESSAGE_ILLEGAL);
          + response.setRemark("message topic length too long " + newTopic.length());
          + return response;
          + }
          +
          +
          + MessageExtBatch messageExtBatch = new MessageExtBatch();
          + messageExtBatch.setTopic(newTopic);
          — End diff –

          Drawback 1: a batch only supports messages of the same topic.
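One way a caller could live with this constraint is to split a mixed list into per-topic groups before sending each group as its own batch. The sketch below is only an illustration of that idea: `Msg` is a hypothetical stand-in for a message, and nothing here is part of the PR or the RocketMQ client API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TopicBatchingSketch {
    // Hypothetical minimal message: a topic plus a body.
    static final class Msg {
        final String topic;
        final String body;

        Msg(String topic, String body) {
            this.topic = topic;
            this.body = body;
        }
    }

    // Groups messages by topic, keeping first-seen topic order and
    // the original message order inside each group.
    static Map<String, List<Msg>> groupByTopic(List<Msg> msgs) {
        Map<String, List<Msg>> batches = new LinkedHashMap<>();
        for (Msg m : msgs) {
            batches.computeIfAbsent(m.topic, k -> new ArrayList<>()).add(m);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Msg> mixed = Arrays.asList(
            new Msg("TopicA", "1"), new Msg("TopicB", "2"), new Msg("TopicA", "3"));
        for (Map.Entry<String, List<Msg>> e : groupByTopic(mixed).entrySet()) {
            // Each entry could then be handed to a batch send for that topic.
            System.out.println(e.getKey() + " -> " + e.getValue().size() + " message(s)");
        }
    }
}
```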

          githubbot ASF GitHub Bot added a comment -

          Github user zhouxinyu commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          Hi, @shroman @Jaskey @lizhanhui , what's your opinion about this updated PR?

          githubbot ASF GitHub Bot added a comment -

          Github user coveralls commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          [![Coverage Status](https://coveralls.io/builds/10391083/badge)](https://coveralls.io/builds/10391083)

          Coverage increased (+0.3%) to 31.791% when pulling *eb1a5e78dfaf5f61b0a2d1be3a9f7cd1c965b918 on dongeforever:ROCKETMQ-80* into *573b22c37806a21543b90707bcce6022243a62da on apache:master*.

          githubbot ASF GitHub Bot added a comment -

          Github user Jaskey commented on a diff in the pull request:

          https://github.com/apache/incubator-rocketmq/pull/53#discussion_r103854335

          --- Diff: store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java ---
          @@ -331,7 +331,7 @@ public long getMinOffsetInQueue() {
               public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
                   long logicOffset) {
                   final int maxRetries = 30;
          -        boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
          +        boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
          --- End diff --

          I guess isConsumeQWritable would be a better name.

          githubbot ASF GitHub Bot added a comment -

          Github user dongeforever commented on a diff in the pull request:

          https://github.com/apache/incubator-rocketmq/pull/53#discussion_r104348323

          --- Diff: store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java ---
          @@ -331,7 +331,7 @@ public long getMinOffsetInQueue() {
               public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
                   long logicOffset) {
                   final int maxRetries = 30;
          -        boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
          +        boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
          --- End diff --

          @Jaskey Yeah, it is better to use the name isConsumeQWritable

          githubbot ASF GitHub Bot added a comment -

          Github user coveralls commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          [![Coverage Status](https://coveralls.io/builds/10543227/badge)](https://coveralls.io/builds/10543227)

          Coverage increased (+0.9%) to 31.863% when pulling *854d4693ad99ae12485762f1f3bec9a43ae3c8c7 on dongeforever:ROCKETMQ-80* into *d7decc84abc32dab63ee423d4d904f28d18cb1d7 on apache:develop*.

          jira-bot ASF subversion and git services added a comment -

          Commit 11653ce24c72189f4e6121c4babe709b33bc0230 in incubator-rocketmq's branch refs/heads/develop from dongeforever
          [ https://git-wip-us.apache.org/repos/asf?p=incubator-rocketmq.git;h=11653ce ]

          ROCKETMQ-80 Add batch feature closes apache/incubator-rocketmq#53

          githubbot ASF GitHub Bot added a comment -

          Github user dongeforever commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          This PR has been merged into the develop branch.

          githubbot ASF GitHub Bot added a comment -

          Github user dongeforever closed the pull request at:

          https://github.com/apache/incubator-rocketmq/pull/53

          githubbot ASF GitHub Bot added a comment -

          Github user vongosling commented on the issue:

          https://github.com/apache/incubator-rocketmq/pull/53

          LGTM~


            People

            • Assignee: zander dongeforever
            • Reporter: zander dongeforever
            • Votes: 0
            • Watchers: 3
