Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • None
    • None
    • rocketmq-client
    • None

    Description

      Client asyncSend is not fully asynchronous: parameter validation, the connection status check, and the route info lookup may each throw an exception. These exceptions are thrown in the calling user thread, where they may stop the business logic.
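To make the complaint concrete, here is a minimal sketch of the behaviour the issue asks for: an async send that never throws into the caller and instead routes pre-flight failures to the callback. It does not use the RocketMQ API; `Callback`, `validate`, and `sendAsync` are hypothetical stand-ins chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class AsyncSendSketch {
    /** Hypothetical stand-in for RocketMQ's SendCallback. */
    interface Callback {
        void onSuccess(String result);
        void onException(Throwable e);
    }

    /** Hypothetical pre-flight check that may throw, like parameter validation. */
    static void validate(String topic) {
        if (topic == null || topic.isEmpty()) {
            throw new IllegalArgumentException("topic is blank");
        }
    }

    /** Never throws to the caller: a pre-flight failure is reported to the callback. */
    static void sendAsync(String topic, Callback callback) {
        try {
            validate(topic);
            // A real client would hand the message to the network layer here.
            callback.onSuccess("sent to " + topic);
        } catch (Exception e) {
            callback.onException(e);
        }
    }

    /** Runs one failing and one succeeding send and records what the callback saw. */
    static List<String> demo() {
        final List<String> events = new ArrayList<String>();
        Callback recorder = new Callback() {
            @Override
            public void onSuccess(String result) {
                events.add("success: " + result);
            }

            @Override
            public void onException(Throwable e) {
                events.add("exception: " + e.getMessage());
            }
        };
        sendAsync("", recorder);      // invalid topic: reported via onException
        sendAsync("test", recorder);  // valid topic: reported via onSuccess
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With this shape the caller needs no try/catch around the send call; every outcome arrives through the callback.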

      Attachments

        Activity

          githubbot ASF GitHub Bot added a comment -

          lindzh opened a new pull request #219: ROCKETMQ-355 Client asyncSend is not fully async
          URL: https://github.com/apache/rocketmq/pull/219

            1. What is the purpose of the change

          Client asyncSend is not fully asynchronous: parameter validation, the connection status check, and the route info lookup may each throw an exception. These exceptions are thrown in the calling user thread, where they may stop the business logic.

            1. Brief changelog

          Make client asyncSend fully async.

            1. Verifying this change

          This is a trivial change.

          Follow this checklist to help us incorporate your contribution quickly and easily:

          • [x] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/ROCKETMQ/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
          • [x] Format the pull request title like `[ROCKETMQ-XXX] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
          • [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
          • [x] Write necessary unit-test to verify your logic correction, more mock a little better when cross module dependency exist. If the new feature or significant change is committed, please remember to add integration-test in [test module](https://github.com/apache/rocketmq/tree/master/test).
          • [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit-test pass. Run `mvn clean test-compile failsafe:integration-test` to make sure integration-test pass.
          • [x] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).

          ----------------------------------------------------------------
          This is an automated message from the Apache Git Service.
          To respond to the message, please log on GitHub and use the
          URL above to go to the specific comment.

          For queries about this service, please contact Infrastructure at:
          users@infra.apache.org

          githubbot ASF GitHub Bot added a comment -

          coveralls commented on issue #219: ROCKETMQ-355 Client asyncSend is not fully async
          URL: https://github.com/apache/rocketmq/pull/219#issuecomment-357891999

          [![Coverage Status](https://coveralls.io/builds/15066331/badge)](https://coveralls.io/builds/15066331)

          Coverage increased (+0.2%) to 40.175% when pulling *969f20b504bca4eea2522a4c1009b505602d9d9d on lindzh:fix_sendAsync* into *d849e0ad989dc89e5e439d48700824d138d09ae9 on apache:develop*.

          githubbot ASF GitHub Bot added a comment -

          zhouxinyu commented on a change in pull request #219: ROCKETMQ-355 Client asyncSend is not fully async
          URL: https://github.com/apache/rocketmq/pull/219#discussion_r161969269

          ##########
          File path: client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
          ##########
          @@ -842,32 +858,34 @@ public SendResult send(Message msg, MessageQueue mq, long timeout)
               /**
                * KERNEL ASYNC -------------------------------------------------------
                */
          -    public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
          -        throws MQClientException, RemotingException, InterruptedException {
          +    public void send(Message msg, MessageQueue mq, SendCallback sendCallback) {
                   send(msg, mq, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
               }

          -    public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
          -        throws MQClientException, RemotingException, InterruptedException {
          -        this.makeSureStateOK();
          -        Validators.checkMessage(msg, this.defaultMQProducer);
          -
          -        if (!msg.getTopic().equals(mq.getTopic())) {
          -            throw new MQClientException("message's topic not equal mq's topic", null);
          -        }
          +    public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout) {
          +        this.getCallbackExecutor().submit(new Runnable() {

          Review comment:
          This method submits a really heavy task to the PublicExecutor if the client has not set a callback executor.
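One way to address this concern is to run send tasks on a dedicated, bounded pool instead of the shared one. The following is only a sketch under that assumption: the class, the method names, the `AsyncSendThread_` thread-name prefix, and the pool sizes are all hypothetical and not taken from the pull request.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DedicatedSendExecutorSketch {
    /** Builds a fixed-size pool with a bounded queue and recognizable thread names. */
    static ExecutorService newAsyncSendExecutor(int threads, int queueCapacity) {
        final AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "AsyncSendThread_" + counter.incrementAndGet());
                t.setDaemon(true);
                return t;
            }
        };
        return new ThreadPoolExecutor(threads, threads, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(queueCapacity), factory);
    }

    /** Submits one task and returns the name of the thread that ran it. */
    static String runOnSendExecutor() {
        ExecutorService executor = newAsyncSendExecutor(2, 100);
        try {
            Future<String> name = executor.submit(new Callable<String>() {
                @Override
                public String call() {
                    return Thread.currentThread().getName();
                }
            });
            return name.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnSendExecutor());
    }
}
```

Keeping the queue bounded means a burst of sends cannot grow memory without limit, at the cost of having to decide what happens when the pool is saturated.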

          githubbot ASF GitHub Bot added a comment -

          zhouxinyu commented on a change in pull request #219: ROCKETMQ-355 Client asyncSend is not fully async
          URL: https://github.com/apache/rocketmq/pull/219#discussion_r161970219

          ##########
          File path: client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
          ##########
          @@ -409,17 +410,32 @@ public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
               /**
                * DEFAULT ASYNC -------------------------------------------------------
                */
          -    public void send(Message msg,
          -        SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
          +    public void send(Message msg, SendCallback sendCallback) {
                   send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
               }

          -    public void send(Message msg, SendCallback sendCallback, long timeout)
          -        throws MQClientException, RemotingException, InterruptedException {
          -        try {
          -            this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
          -        } catch (MQBrokerException e) {
          -            throw new MQClientException("unknownn exception", e);
          +    public void send(final Message msg, final SendCallback sendCallback, final long timeout) {
          +        this.getCallbackExecutor().submit(new Runnable() {
          +            @Override
          +            public void run() {
          +                try {
          +                    sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
          +                } catch (Exception e) {
          +                    handleCallbackException(e, sendCallback);

          Review comment:
          How should the `RejectedExecutionException` be handled? Although it has a similar effect to `NettyRemotingAbstract.semaphoreAsync` -
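One possible answer, sketched here with hypothetical names (`Callback`, `submitSend`) rather than the actual RocketMQ classes, is to catch the rejection at submission time and report it through the callback, so that a saturated pool still does not throw into the user thread. The pool below is deliberately tiny so that the third submission is rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RejectedSendSketch {
    /** Hypothetical stand-in for the exception half of SendCallback. */
    interface Callback {
        void onException(Throwable e);
    }

    /** Submits the task; a rejection is delivered to the callback instead of thrown. */
    static void submitSend(ExecutorService executor, Runnable sendTask, Callback callback) {
        try {
            executor.submit(sendTask);
        } catch (RejectedExecutionException e) {
            callback.onException(e);
        }
    }

    /** Saturates a one-thread, one-slot pool and counts how many sends were rejected. */
    static int countRejections() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(1));
        final CountDownLatch release = new CountDownLatch(1);
        final AtomicInteger rejected = new AtomicInteger();
        Callback counting = new Callback() {
            @Override
            public void onException(Throwable e) {
                rejected.incrementAndGet();
            }
        };
        Runnable blocking = new Runnable() {
            @Override
            public void run() {
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        // Task 1 occupies the worker, task 2 fills the queue, task 3 is rejected.
        for (int i = 0; i < 3; i++) {
            submitSend(executor, blocking, counting);
        }
        release.countDown();
        executor.shutdown();
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println(countRejections());
    }
}
```

The sketch relies on the default `AbortPolicy` of `ThreadPoolExecutor`; a different rejection policy, such as `CallerRunsPolicy`, would change the behaviour and would run the heavy task in the user thread again.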

          githubbot ASF GitHub Bot added a comment -

          zhouxinyu commented on a change in pull request #219: ROCKETMQ-355 Client asyncSend is not fully async
          URL: https://github.com/apache/rocketmq/pull/219#discussion_r161971022

          ##########
          File path: client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
          ##########
          @@ -53,11 +51,9 @@ SendResult send(final Message msg, final MessageQueue mq) throws MQClientExcepti
          SendResult send(final Message msg, final MessageQueue mq, final long timeout)
          throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

          -    void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
          -        throws MQClientException, RemotingException, InterruptedException;
          +    void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback);

          Review comment:
          There is an upgrade issue if existing code catches the previously thrown checked exception, such as `MQClientException`.

          We should treat this issue carefully.
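The upgrade issue comes from a Java language rule: a `catch` clause for a checked exception that the `try` body can no longer throw is a compile error, so callers written against the old signature would stop compiling. One possible mitigation, sketched below with hypothetical types (`ClientException`, `Producer`, `AsyncProducer`) standing in for the real ones, is to keep the `throws` clause on the interface while the implementation stops throwing.

```java
public class SourceCompatSketch {
    /** Hypothetical stand-in for a checked exception such as MQClientException. */
    static class ClientException extends Exception {
        ClientException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the exception half of SendCallback. */
    interface Callback {
        void onException(Throwable e);
    }

    interface Producer {
        // The throws clause is kept only so that existing catch blocks still compile.
        void send(String msg, Callback callback) throws ClientException;
    }

    /** New-style implementation: failures go to the callback, nothing is thrown. */
    static class AsyncProducer implements Producer {
        @Override
        public void send(String msg, Callback callback) {
            if (msg == null) {
                callback.onException(new ClientException("message is null"));
            }
        }
    }

    /** Caller written against the old signature; its catch block remains legal. */
    static String legacyCaller(Producer producer, String msg) {
        final String[] seen = {"no error"};
        try {
            producer.send(msg, new Callback() {
                @Override
                public void onException(Throwable e) {
                    seen[0] = "callback: " + e.getMessage();
                }
            });
        } catch (ClientException e) {
            return "thrown: " + e.getMessage();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(legacyCaller(new AsyncProducer(), null));
        System.out.println(legacyCaller(new AsyncProducer(), "hello"));
    }
}
```

Removing the `throws ClientException` clause from `Producer.send` would make the `catch (ClientException e)` block in `legacyCaller` fail to compile, which is the breakage the review comment warns about.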

          githubbot ASF GitHub Bot added a comment -

          zhouxinyu commented on a change in pull request #219: ROCKETMQ-355 Client asyncSend is not fully async
          URL: https://github.com/apache/rocketmq/pull/219#discussion_r161971194

          ##########
          File path: client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
          ##########
          @@ -214,6 +218,54 @@ public void testSetCallbackExecutor() throws MQClientException {
                   assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
               }

          +    @Test
          +    public void testAsyncSend() throws MQClientException, RemotingException, InterruptedException {
          +        String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
          +        producer = new DefaultMQProducer(producerGroupTemp);
          +        producer.setNamesrvAddr("127.0.0.1:9876");
          +        producer.start();
          +
          +        final AtomicInteger cc = new AtomicInteger(0);
          +        final CountDownLatch countDownLatch = new CountDownLatch(6);
          +
          +        SendCallback sendCallback = new SendCallback() {
          +            @Override
          +            public void onSuccess(SendResult sendResult) {
          +
          +            }
          +
          +            @Override
          +            public void onException(Throwable e) {
          +                e.printStackTrace();
          +                countDownLatch.countDown();
          +                cc.incrementAndGet();
          +            }
          +        };
          +        MessageQueueSelector messageQueueSelector = new MessageQueueSelector() {
          +            @Override
          +            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
          +                return null;
          +            }
          +        };
          +
          +        Message message = new Message();
          +        message.setTopic("test");
          +        message.setBody("hello world".getBytes());
          +        producer.send(new Message(),sendCallback);
          +        producer.send(message,sendCallback,1000);
          +        producer.send(message,new MessageQueue(),sendCallback);
          +        producer.send(new Message(),new MessageQueue(),sendCallback,1000);
          +        producer.send(new Message(),messageQueueSelector,null,sendCallback);
          +        producer.send(message,messageQueueSelector,null,sendCallback,1000);
          +
          +        countDownLatch.await(1000L, TimeUnit.MILLISECONDS);
          +
          +        assertThat(cc.get()).isEqualTo(6);
          +//        producer.sendOneway(new Message());
          +//        producer.sendOneway(new Message(),new MessageQueue());
          +//        producer.sendOneway(new Message(),messageQueueSelector,null);

          Review comment:
          Please remove these unused lines of code.

          githubbot ASF GitHub Bot added a comment -

          vongosling commented on a change in pull request #219: ROCKETMQ-355 Client asyncSend is not fully async
          URL: https://github.com/apache/rocketmq/pull/219#discussion_r162240686

          ##########
          File path: client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
          ##########
          @@ -249,7 +250,7 @@ public TransactionCheckListener checkListener() {

          @Override
          public void checkTransactionState(final String addr, final MessageExt msg,

          -        final CheckTransactionStateRequestHeader header) {

          Review comment:
          Do you use our IntelliJ code style configuration?

          githubbot ASF GitHub Bot added a comment -

          vongosling commented on a change in pull request #219: ROCKETMQ-355 Client asyncSend is not fully async
          URL: https://github.com/apache/rocketmq/pull/219#discussion_r162241195

          ##########
          File path: client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
          ##########
          @@ -70,12 +66,10 @@ SendResult send(final Message msg, final MessageQueueSelector selector, final Ob
          InterruptedException;

          void send(final Message msg, final MessageQueueSelector selector, final Object arg,

          Review comment:
          How do we keep backward compatibility?

          githubbot ASF GitHub Bot added a comment -

          vongosling commented on a change in pull request #219: ROCKETMQ-355 Client asyncSend is not fully async
          URL: https://github.com/apache/rocketmq/pull/219#discussion_r162244242

          ##########
          File path: client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
          ##########
          @@ -409,17 +410,32 @@ public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
               /**
                * DEFAULT ASYNC -------------------------------------------------------
                */
          -    public void send(Message msg,
          -        SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
          +    public void send(Message msg, SendCallback sendCallback) {
                   send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
               }

          -    public void send(Message msg, SendCallback sendCallback, long timeout)
          -        throws MQClientException, RemotingException, InterruptedException {
          -        try {
          -            this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
          -        } catch (MQBrokerException e) {
          -            throw new MQClientException("unknownn exception", e);
          +    public void send(final Message msg, final SendCallback sendCallback, final long timeout) {
          +        this.getCallbackExecutor().submit(new Runnable() {
          +            @Override
          +            public void run() {
          +                try {
          +                    sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
          +                } catch (Exception e) {
          +                    handleCallbackException(e, sendCallback);

          Review comment:
          Please refer to https://github.com/spring-projects/spring-framework/blob/master/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolExecutorFactoryBean.java

          githubbot ASF GitHub Bot added a comment -

          vongosling commented on a change in pull request #219: ROCKETMQ-355 Client asyncSend is not fully async
          URL: https://github.com/apache/rocketmq/pull/219#discussion_r162241485

          ##########
          File path: client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
          ##########
          @@ -842,32 +858,34 @@ public SendResult send(Message msg, MessageQueue mq, long timeout)
               /**
                * KERNEL ASYNC -------------------------------------------------------
                */
          -    public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
          -        throws MQClientException, RemotingException, InterruptedException {
          +    public void send(Message msg, MessageQueue mq, SendCallback sendCallback) {
                   send(msg, mq, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
               }

          -    public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
          -        throws MQClientException, RemotingException, InterruptedException {
          -        this.makeSureStateOK();
          -        Validators.checkMessage(msg, this.defaultMQProducer);
          -
          -        if (!msg.getTopic().equals(mq.getTopic())) {
          -            throw new MQClientException("message's topic not equal mq's topic", null);
          -        }
          +    public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout) {
          +        this.getCallbackExecutor().submit(new Runnable() {

          Review comment:
          Sure, but how do we isolate this executor and ensure it works well?
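
          One possible way to approach the isolation question, sketched under assumptions: give async sends their own bounded pool instead of sharing the callback executor, and surface saturation as a return value rather than an exception in the caller's thread. The class name, thread-name prefix, and both helper methods below are hypothetical and are not part of the pull request or of RocketMQ.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DedicatedSendExecutorSketch {

    /** Builds a fixed-size pool with a bounded queue and recognizable thread names. */
    public static ThreadPoolExecutor newSendExecutor(int threads, int queueCapacity) {
        final AtomicInteger sequence = new AtomicInteger();
        ThreadFactory factory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "AsyncSendSketch_" + sequence.incrementAndGet());
                t.setDaemon(true);
                return t;
            }
        };
        return new ThreadPoolExecutor(threads, threads, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(queueCapacity), factory,
            new ThreadPoolExecutor.AbortPolicy());
    }

    /** Returns false instead of throwing when the pool and its queue are full. */
    public static boolean trySubmit(ThreadPoolExecutor executor, Runnable task) {
        try {
            executor.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = newSendExecutor(1, 1);
        final CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = new Runnable() {
            @Override
            public void run() {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        System.out.println(trySubmit(executor, blocked)); // handed to the single worker
        System.out.println(trySubmit(executor, blocked)); // parked in the queue
        System.out.println(trySubmit(executor, blocked)); // pool and queue are full
        release.countDown();
        executor.shutdown();
    }
}
```

          With one worker and a queue of one, the third submission is rejected while the first task is still blocked. A caller could turn that `false` into a callback notification so that saturation, too, is reported asynchronously.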

          ----------------------------------------------------------------
          This is an automated message from the Apache Git Service.
          To respond to the message, please log on GitHub and use the
          URL above to go to the specific comment.

          For queries about this service, please contact Infrastructure at:
          users@infra.apache.org

          githubbot ASF GitHub Bot added a comment -

          coveralls commented on issue #219: ROCKETMQ-355 Client asyncSend is not fully async
          URL: https://github.com/apache/rocketmq/pull/219#issuecomment-357891999

          [![Coverage Status](https://coveralls.io/builds/15212963/badge)](https://coveralls.io/builds/15212963)

          Coverage increased (+0.2%) to 40.133% when pulling *564aa267de3d469d50514f875552dbf30d6c5331 on lindzh:fix_sendAsync* into *a096580f3152e12fc9f9876c9e9721eb0109a90a on apache:develop*.


          githubbot ASF GitHub Bot added a comment -

          coveralls commented on issue #219: ROCKETMQ-355 Client asyncSend is not fully async
          URL: https://github.com/apache/rocketmq/pull/219#issuecomment-357891999

          [![Coverage Status](https://coveralls.io/builds/15214577/badge)](https://coveralls.io/builds/15214577)

          Coverage increased (+0.2%) to 40.158% when pulling *24890228977c5f3702336ab63f89cb06517d3631 on lindzh:fix_sendAsync* into *a096580f3152e12fc9f9876c9e9721eb0109a90a on apache:develop*.


          githubbot ASF GitHub Bot added a comment -

          lindzh opened a new pull request #222: ROCKETMQ-355 Client asyncSend is not fully async
          URL: https://github.com/apache/rocketmq/pull/222

            1. What is the purpose of the change

          Client asyncSend is not fully async: parameter validation, the connection status check, and the route info lookup can each throw an exception. Because these exceptions are thrown in the caller's thread, they may interrupt business logic.

            1. Brief changelog

          Make client asyncSend fully async.
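
          A minimal sketch of the idea, not the code in this pull request: validation runs on an executor thread, and any failure is delivered through the callback instead of being thrown to the caller. The `Callback` interface, the class name, and `runOnce` are hypothetical stand-ins and do not reflect RocketMQ's `SendCallback` or producer API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class FullyAsyncSendSketch {

    /** Hypothetical stand-in for a send callback; not the RocketMQ interface. */
    public interface Callback {
        void onSuccess(String result);

        void onException(Throwable e);
    }

    private final ExecutorService executor;

    public FullyAsyncSendSketch(ExecutorService executor) {
        this.executor = executor;
    }

    /** Validation runs on the executor, so its failures reach the callback. */
    public void sendAsync(final String topic, final Callback callback) {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    if (topic == null || topic.isEmpty()) {
                        throw new IllegalArgumentException("topic is blank");
                    }
                    callback.onSuccess("ok:" + topic); // placeholder for the real send
                } catch (Exception e) {
                    callback.onException(e);
                }
            }
        });
    }

    /** Sends once and waits up to five seconds for the callback outcome. */
    public static String runOnce(String topic) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        final AtomicReference<String> outcome = new AtomicReference<String>("timeout");
        final CountDownLatch done = new CountDownLatch(1);
        try {
            new FullyAsyncSendSketch(executor).sendAsync(topic, new Callback() {
                @Override
                public void onSuccess(String result) {
                    outcome.set(result);
                    done.countDown();
                }

                @Override
                public void onException(Throwable e) {
                    outcome.set("error:" + e.getMessage());
                    done.countDown();
                }
            });
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(runOnce(""));          // prints "error:topic is blank"
        System.out.println(runOnce("TopicTest")); // prints "ok:TopicTest"
    }
}
```

          Note that `submit` itself can still throw `RejectedExecutionException` in the caller's thread if the executor is saturated or shut down, so a complete solution would also need to handle that path.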

            1. Verifying this change

          This is a trivial change.

          Follow this checklist to help us incorporate your contribution quickly and easily:

          • [x] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/ROCKETMQ/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
          • [x] Format the pull request title like `[ROCKETMQ-XXX] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
          • [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
          • [x] Write the necessary unit tests to verify your logic correction; mock more where cross-module dependencies exist. If a new feature or significant change is committed, please remember to add an integration test in the [test module](https://github.com/apache/rocketmq/tree/master/test).
          • [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure the unit tests pass. Run `mvn clean test-compile failsafe:integration-test` to make sure the integration tests pass.
          • [x] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).


          githubbot ASF GitHub Bot added a comment -

          lindzh closed pull request #219: ROCKETMQ-355 Client asyncSend is not fully async
          URL: https://github.com/apache/rocketmq/pull/219

          This is a PR merged from a forked repository.
          As GitHub hides the original diff on merge, it is displayed below for
          the sake of provenance:

          As this is a foreign pull request (from a fork), the diff is supplied
          below (as it won't show otherwise due to GitHub magic):

          diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
          index 7c1697967..6729669da 100644
          --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
          +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
          @@ -31,6 +31,7 @@
          import java.util.concurrent.LinkedBlockingQueue;
          import java.util.concurrent.ThreadPoolExecutor;
          import java.util.concurrent.TimeUnit;
          +
          import org.apache.rocketmq.common.message.Message;
          import org.apache.rocketmq.common.message.MessageClientIDSetter;
          import org.apache.rocketmq.common.message.MessageExt;
          @@ -74,6 +75,7 @@
          import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
          import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
          import org.apache.rocketmq.common.sysflag.MessageSysFlag;
          +import org.apache.rocketmq.remoting.DoAsyncCallback;
          import org.apache.rocketmq.remoting.RPCHook;
          import org.apache.rocketmq.remoting.common.RemotingHelper;
          import org.apache.rocketmq.remoting.exception.RemotingConnectException;
          @@ -249,7 +251,7 @@ public TransactionCheckListener checkListener() {

          @Override
          public void checkTransactionState(final String addr, final MessageExt msg,
          - final CheckTransactionStateRequestHeader header) {
          + final CheckTransactionStateRequestHeader header) {
            Runnable request = new Runnable() {
            private final String brokerAddr = addr;
            private final MessageExt message = msg;
            @@ -409,17 +411,43 @@ public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
            /**
            * DEFAULT ASYNC -------------------------------------------------------
            */
            - public void send(Message msg,
            - SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
            + public void send(Message msg, SendCallback sendCallback) {
            send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
            }

            - public void send(Message msg, SendCallback sendCallback, long timeout)
            - throws MQClientException, RemotingException, InterruptedException {
            - try {
            - this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
            - } catch (MQBrokerException e) {
            - throw new MQClientException("unknownn exception", e);
            + public void send(final Message msg, final SendCallback sendCallback, final long timeout) {
            + this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().doAsyncSend(new DoAsyncCallback() {
            + @Override
            + public long getTimeout() {
            + return timeout;
            + }
            +
            + @Override
            + public void onSuccess() throws RemotingException {
            + try {
            + sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
            + } catch (Exception e) {
            + handleCallbackException(e, sendCallback);
            + throw new RemotingException("client send check exception",e);
            + }
            + }
            +
            + @Override
            + public void onFailed(Throwable e) {
            + handleCallbackException(e, sendCallback);
            + }
            + });
            + }
            +
            + private void handleCallbackException(Throwable e, SendCallback sendCallback) {
            + if (sendCallback != null) {
            + if (e instanceof MQBrokerException) {
            + sendCallback.onException(new MQClientException("unknown exception", e));
            + } else {
            + sendCallback.onException(e);
            + }
            + } else {
            + log.warn("asyncSend message callback null real exception is " + e.getMessage(), e);
            }
            }

            @@ -583,11 +611,11 @@ private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
            }

            private SendResult sendKernelImpl(final Message msg,
            - final MessageQueue mq,
            - final CommunicationMode communicationMode,
            - final SendCallback sendCallback,
            - final TopicPublishInfo topicPublishInfo,
            - final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            + final MessageQueue mq,
            + final CommunicationMode communicationMode,
            + final SendCallback sendCallback,
            + final TopicPublishInfo topicPublishInfo,
            + final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
            if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            @@ -842,32 +870,45 @@ public SendResult send(Message msg, MessageQueue mq, long timeout)
            /**
            * KERNEL ASYNC -------------------------------------------------------
            */
            - public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
            - throws MQClientException, RemotingException, InterruptedException {
            + public void send(Message msg, MessageQueue mq, SendCallback sendCallback) {
            send(msg, mq, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
            }

            - public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
            - throws MQClientException, RemotingException, InterruptedException {
            - this.makeSureStateOK();
            - Validators.checkMessage(msg, this.defaultMQProducer);
            + public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout) {
            + this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().doAsyncSend(new DoAsyncCallback() {
            + @Override
            + public long getTimeout() {
            + return timeout;
            + }
            - if (!msg.getTopic().equals(mq.getTopic())) {
            - throw new MQClientException("message's topic not equal mq's topic", null);
            - }

            + @Override
            + public void onSuccess() throws RemotingException {
            + try {
            + makeSureStateOK();
            + Validators.checkMessage(msg, defaultMQProducer);

            - try {
            - this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, timeout);
            - } catch (MQBrokerException e) {
            - throw new MQClientException("unknown exception", e);
            - }
            + if (!msg.getTopic().equals(mq.getTopic())) {
            + throw new MQClientException("message's topic not equal mq's topic", null);
            + }
            + sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, timeout);
            + } catch (Exception e) {
            + handleCallbackException(e, sendCallback);
            + throw new RemotingException("client send check exception",e);
            + }
            + }
            +
            + @Override
            + public void onFailed(Throwable e) {
            + handleCallbackException(e, sendCallback);
            + }
            + });
            }

            /**
            * KERNEL ONEWAY -------------------------------------------------------
            */
            public void sendOneway(Message msg,
            - MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
            + MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
            this.makeSureStateOK();
            Validators.checkMessage(msg, this.defaultMQProducer);

            @@ -923,18 +964,32 @@ private SendResult sendSelectImpl(
            /**
            * SELECT ASYNC -------------------------------------------------------
            */
            - public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
            - throws MQClientException, RemotingException, InterruptedException {
            + public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) {
            send(msg, selector, arg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
            }

            - public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
            - throws MQClientException, RemotingException, InterruptedException {
            - try {
            - this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, timeout);
            - } catch (MQBrokerException e) {
            - throw new MQClientException("unknownn exception", e);
            - }
            + public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout) {
            + this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().doAsyncSend(new DoAsyncCallback() {
            + @Override
            + public long getTimeout() {
            + return timeout;
            + }
            +
            + @Override
            + public void onSuccess() throws RemotingException {
            + try {
            + sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, timeout);
            + } catch (Exception e) {
            + handleCallbackException(e, sendCallback);
            + throw new RemotingException("client send check exception",e);
            + }
            +
            + }
            +
            + @Override
            + public void onFailed(Throwable e) {
            + handleCallbackException(e, sendCallback);
            + }
            + });
            }

          /**
          @@ -950,7 +1005,7 @@ public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
          }

          public TransactionSendResult sendMessageInTransaction(final Message msg,
          - final LocalTransactionExecuter tranExecuter, final Object arg)
          + final LocalTransactionExecuter tranExecuter, final Object arg)
            throws MQClientException {
            if (null == tranExecuter) {
            throw new MQClientException("tranExecutor is null", null);
            @@ -1064,8 +1119,12 @@ public void setCallbackExecutor(final ExecutorService callbackExecutor) {
            this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor);
            }

            + private ExecutorService getCallbackExecutor() {
            + return this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().getCallbackExecutor();
            + }
            +
            public SendResult send(Message msg,
            - long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            + long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
            }

          diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
          index a2f25dd0f..07ac9f625 100644
          --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
          +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
          @@ -250,8 +250,8 @@ public SendResult send(Message msg,
          * @throws InterruptedException if the sending thread is interrupted.
          */
          @Override
          - public void send(Message msg,
          - SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
          + public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException,
          + InterruptedException {
          this.defaultMQProducerImpl.send(msg, sendCallback);
          }

          @@ -266,8 +266,8 @@ public void send(Message msg,
          * @throws InterruptedException if the sending thread is interrupted.
          */
          @Override
          - public void send(Message msg, SendCallback sendCallback, long timeout)
          - throws MQClientException, RemotingException, InterruptedException {
          + public void send(Message msg, SendCallback sendCallback, long timeout) throws MQClientException,
          + RemotingException, InterruptedException {
          this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
          }

          @@ -333,8 +333,8 @@ public SendResult send(Message msg, MessageQueue mq, long timeout)
          * @throws InterruptedException if the sending thread is interrupted.
          */
          @Override
          - public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
          - throws MQClientException, RemotingException, InterruptedException {
          + public void send(Message msg, MessageQueue mq, SendCallback sendCallback) throws MQClientException,
          + RemotingException, InterruptedException {
          this.defaultMQProducerImpl.send(msg, mq, sendCallback);
          }

          @@ -350,8 +350,8 @@ public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
          * @throws InterruptedException if the sending thread is interrupted.
          */
          @Override
          - public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
          - throws MQClientException, RemotingException, InterruptedException {
          + public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout) throws MQClientException,
          + RemotingException, InterruptedException {
          this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout);
          }

          diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
          index 14caf6ffa..dee19b27a 100644
          --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
          +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
          @@ -70,12 +70,10 @@ SendResult send(final Message msg, final MessageQueueSelector selector, final Ob
          InterruptedException;

          void send(final Message msg, final MessageQueueSelector selector, final Object arg,
          - final SendCallback sendCallback) throws MQClientException, RemotingException,
          - InterruptedException;
          + final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;

          void send(final Message msg, final MessageQueueSelector selector, final Object arg,
          - final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
          - InterruptedException;
          + final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException;

          void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
          throws MQClientException, RemotingException, InterruptedException;
          diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
          index ded22ada9..a3d11a9de 100644
          --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
          +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
          @@ -24,6 +24,9 @@
          import java.util.concurrent.CountDownLatch;
          import java.util.concurrent.ExecutorService;
          import java.util.concurrent.Executors;
          +import java.util.concurrent.TimeUnit;
          +import java.util.concurrent.atomic.AtomicInteger;
          +
          import org.apache.rocketmq.client.ClientConfig;
          import org.apache.rocketmq.client.exception.MQBrokerException;
          import org.apache.rocketmq.client.exception.MQClientException;
          @@ -36,6 +39,7 @@
          import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
          import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
          import org.apache.rocketmq.common.message.Message;
          +import org.apache.rocketmq.common.message.MessageQueue;
          import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
          import org.apache.rocketmq.common.protocol.route.BrokerData;
          import org.apache.rocketmq.common.protocol.route.QueueData;
          @@ -214,6 +218,51 @@ public void testSetCallbackExecutor() throws MQClientException {
          assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
          }

          + @Test
          + public void testAsyncSend() throws MQClientException, RemotingException, InterruptedException {
          + String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
          + producer = new DefaultMQProducer(producerGroupTemp);
          + producer.setNamesrvAddr("127.0.0.1:9876");
          + producer.start();
          +
          + final AtomicInteger cc = new AtomicInteger(0);
          + final CountDownLatch countDownLatch = new CountDownLatch(6);
          +
          + SendCallback sendCallback = new SendCallback() {
          + @Override
          + public void onSuccess(SendResult sendResult) {
          +
          + }
          +
          + @Override
          + public void onException(Throwable e) {
          + e.printStackTrace();
          + countDownLatch.countDown();
          + cc.incrementAndGet();
          + }
          + };
          + MessageQueueSelector messageQueueSelector = new MessageQueueSelector() {
          + @Override
          + public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
          + return null;
          + }
          + };
          +
          + Message message = new Message();
          + message.setTopic("test");
          + message.setBody("hello world".getBytes());
          + producer.send(new Message(),sendCallback);
          + producer.send(message,sendCallback,1000);
          + producer.send(message,new MessageQueue(),sendCallback);
          + producer.send(new Message(),new MessageQueue(),sendCallback,1000);
          + producer.send(new Message(),messageQueueSelector,null,sendCallback);
          + producer.send(message,messageQueueSelector,null,sendCallback,1000);
          +
          + countDownLatch.await(1000L, TimeUnit.MILLISECONDS);
          +
          + assertThat(cc.get()).isEqualTo(6);
          + }
          +
          public static TopicRouteData createTopicRoute() {
          TopicRouteData topicRouteData = new TopicRouteData();

          diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/DoAsyncCallback.java b/remoting/src/main/java/org/apache/rocketmq/remoting/DoAsyncCallback.java
          new file mode 100644
          index 000000000..6de9e9dbb
          --- /dev/null
          +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/DoAsyncCallback.java
          @@ -0,0 +1,30 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.rocketmq.remoting;
          +
          +import org.apache.rocketmq.remoting.exception.RemotingException;
          +
          +public interface DoAsyncCallback {
          +
          + long getTimeout();
          +
          + void onSuccess() throws RemotingException;
          +
          + void onFailed(Throwable e);
          +
          +}

          diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
          index 2aea14cb9..e0763b434 100644
          --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
          +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
          @@ -39,6 +39,8 @@ void invokeAsync(final String addr, final RemotingCommand request, final long ti
          final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
          RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

          + void doAsyncSend(DoAsyncCallback callback);
          +
          void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
          throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
          RemotingTimeoutException, RemotingSendRequestException;
          @@ -48,5 +50,7 @@ void registerProcessor(final int requestCode, final NettyRequestProcessor proces

          void setCallbackExecutor(final ExecutorService callbackExecutor);

          + ExecutorService getCallbackExecutor();
          +
          boolean isChannelWritable(final String addr);
          }
          diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
          index 557ad5602..9f00f3606 100644
          --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
          +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
          @@ -95,6 +95,13 @@
          */
          protected volatile SslContext sslContext;

          + protected ThreadLocal<Boolean> asynclockAcquired = new ThreadLocal<Boolean>() {
          + @Override
          + protected Boolean initialValue() {
          + return Boolean.FALSE;
          + }
          + };
          +
          /**
          * Constructor, specifying capacity of one-way and asynchronous semaphores.
          *
          @@ -271,6 +278,29 @@ public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cm
          }
          }

          + protected void executeAsyncCallback(Runnable runnable) {
          + boolean runInThisThread = false;
          + ExecutorService executor = this.getCallbackExecutor();
          + if (executor != null) {
          + try {
          + executor.submit(runnable);
          + } catch (Exception e) {
          + runInThisThread = true;
          + log.warn("execute async callback in executor exception, maybe executor busy", e);
          + }
          + } else {
          + runInThisThread = true;
          + }
          +
          + if (runInThisThread) {
          + try {
          + runnable.run();
          + } catch (Throwable e) {
          + log.warn("executeAsyncCallback Exception", e);
          + }
          + }
          + }
          +
          /**
          * Execute callback in callback executor. If callback executor is null, run directly in current thread
          */
          @@ -400,7 +430,10 @@ public void invokeAsyncImpl(final Channel channel, final RemotingCommand request
          final InvokeCallback invokeCallback)
          throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
          final int opaque = request.getOpaque();
          - boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
          + boolean acquired = asynclockAcquired.get();
          + if (!acquired) {
          + acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
          + }

          if (acquired) {
          final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);

          diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
          index dcc80cba0..1cf73dec5 100644
          --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
          +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
          @@ -53,6 +53,8 @@
          import java.util.concurrent.atomic.AtomicReference;
          import java.util.concurrent.locks.Lock;
          import java.util.concurrent.locks.ReentrantLock;
          +
          +import org.apache.rocketmq.remoting.DoAsyncCallback;
          import org.apache.rocketmq.remoting.ChannelEventListener;
          import org.apache.rocketmq.remoting.InvokeCallback;
          import org.apache.rocketmq.remoting.RPCHook;
          @@ -61,6 +63,7 @@
          import org.apache.rocketmq.remoting.common.RemotingHelper;
          import org.apache.rocketmq.remoting.common.RemotingUtil;
          import org.apache.rocketmq.remoting.exception.RemotingConnectException;
          +import org.apache.rocketmq.remoting.exception.RemotingException;
          import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
          import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
          import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
          @@ -507,6 +510,54 @@ private Channel createChannel(final String addr) throws InterruptedException {
          return null;
          }

          + @Override
          + public void doAsyncSend(final DoAsyncCallback callback) {
          + try {
          + boolean acquired = this.semaphoreAsync.tryAcquire(1, TimeUnit.MILLISECONDS);
          + if (acquired) {
          + asynclockAcquired.set(Boolean.TRUE);
          + executeAsyncCallback(new Runnable() {
          + @Override
          + public void run() {
          + try {
          + callback.onSuccess();
          + } catch (Throwable e) {
          + semaphoreAsync.release();
          + asynclockAcquired.set(Boolean.FALSE);
          + if (e instanceof RemotingException) {
          + // ignore
          + } else {
          + log.error("doAsyncSendBeforeNetwork call onSuccess failed", e);
          + }
          + }
          + }
          + });
          + } else {
          + executeAsyncCallback(new Runnable() {
          + @Override
          + public void run() {
          + try {
          + callback.onFailed(null);
          + } catch (Throwable e) {
          + log.error("doAsyncSendBeforeNetwork call onFailed failed", e);
          + }
          + }
          + });
          + }
          + } catch (final Throwable e) {
          + executeAsyncCallback(new Runnable() {
          + @Override
          + public void run() {
          + try {
          + callback.onFailed(e);
          + } catch (Throwable e) {
          + log.error("doAsyncSendBeforeNetwork call onFailed failed", e);
          + }
          + }
          + });
          + }
          + }
          +
          @Override
          public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
          throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,

          ----------------------------------------------------------------
          This is an automated message from the Apache Git Service.
          To respond to the message, please log on GitHub and use the
          URL above to go to the specific comment.

          For queries about this service, please contact Infrastructure at:
          users@infra.apache.org
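
The diff above moves parameter validation and state checks out of the caller's thread, so that failures reach the send callback instead of being thrown from `send`. A minimal, self-contained sketch of that pattern follows. It is not RocketMQ code: `AsyncSendSketch`, its `Callback` interface, and the `demo` helper are hypothetical names used only for illustration, and the "send" is simulated. Like `executeAsyncCallback` in the diff, the sketch falls back to running the task in the calling thread when the executor rejects it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class AsyncSendSketch {

    /** Hypothetical stand-in for a send callback; not the RocketMQ interface. */
    interface Callback {
        void onSuccess(String result);

        void onException(Throwable e);
    }

    private final ExecutorService executor;

    AsyncSendSketch(ExecutorService executor) {
        this.executor = executor;
    }

    /** Never throws to the caller: validation runs inside the submitted task. */
    void sendAsync(final String topic, final Callback callback) {
        Runnable task = new Runnable() {
            @Override
            public void run() {
                try {
                    // Validation that would otherwise throw in the user thread.
                    if (topic == null || topic.isEmpty()) {
                        throw new IllegalArgumentException("topic is empty");
                    }
                } catch (Throwable e) {
                    callback.onException(e);
                    return;
                }
                // Simulated send; a real client would hand off to the network layer here.
                callback.onSuccess("sent to " + topic);
            }
        };
        try {
            executor.execute(task);
        } catch (RejectedExecutionException e) {
            // Executor is saturated or shut down: run in the calling thread instead.
            task.run();
        }
    }

    /** Sends one message and returns a description of the callback that fired. */
    static String demo(String topic) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicReference<String> outcome = new AtomicReference<String>("none");
        try {
            new AsyncSendSketch(executor).sendAsync(topic, new Callback() {
                @Override
                public void onSuccess(String result) {
                    outcome.set("success: " + result);
                    done.countDown();
                }

                @Override
                public void onException(Throwable e) {
                    outcome.set("exception: " + e.getMessage());
                    done.countDown();
                }
            });
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(demo(""));
        System.out.println(demo("test"));
    }
}
```

With this shape the caller needs no `try`/`catch` around `sendAsync`; every outcome, including a validation failure, arrives through the callback.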

          githubbot ASF GitHub Bot added a comment -

          lindzh commented on issue #219: ROCKETMQ-355 Client asyncSend is not fully async
          URL: https://github.com/apache/rocketmq/pull/219#issuecomment-360452200

          This pull request has been rebased onto an older commit.

          ----------------------------------------------------------------
          This is an automated message from the Apache Git Service.
          To respond to the message, please log on GitHub and use the
          URL above to go to the specific comment.

          For queries about this service, please contact Infrastructure at:
          users@infra.apache.org

          githubbot ASF GitHub Bot added a comment -

          coveralls commented on issue #222: ROCKETMQ-355 Client asyncSend is not fully async
          URL: https://github.com/apache/rocketmq/pull/222#issuecomment-360454546

          [![Coverage Status](https://coveralls.io/builds/15216261/badge)](https://coveralls.io/builds/15216261)

          Coverage increased (+0.2%) to 40.213% when pulling *d57f4b126b9254298457be5edf7d9ba74e9e315c on lindzh:fix_asyncSend* into *a096580f3152e12fc9f9876c9e9721eb0109a90a on apache:develop*.


          githubbot ASF GitHub Bot added a comment -

          coveralls commented on issue #222: ROCKETMQ-355 Client asyncSend is not fully async
          URL: https://github.com/apache/rocketmq/pull/222#issuecomment-360454546

          [![Coverage Status](https://coveralls.io/builds/15216431/badge)](https://coveralls.io/builds/15216431)

          Coverage increased (+0.2%) to 40.2% when pulling *f8113ebc08c5f2f0a95a920058534cb8167c31df on lindzh:fix_asyncSend* into *a096580f3152e12fc9f9876c9e9721eb0109a90a on apache:develop*.


          githubbot ASF GitHub Bot added a comment -

          coveralls commented on issue #222: ROCKETMQ-355 Client asyncSend is not fully async
          URL: https://github.com/apache/rocketmq/pull/222#issuecomment-360454546

          [![Coverage Status](https://coveralls.io/builds/15231078/badge)](https://coveralls.io/builds/15231078)

          Coverage increased (+0.2%) to 40.138% when pulling *0e09e265456f81315a3b2d1edd67fc0370638cb5 on lindzh:fix_asyncSend* into *a096580f3152e12fc9f9876c9e9721eb0109a90a on apache:develop*.


          githubbot ASF GitHub Bot added a comment -

          zhouxinyu commented on issue #222: ROCKETMQ-355 Client asyncSend is not fully async
          URL: https://github.com/apache/rocketmq/pull/222#issuecomment-361055475

          Hi @lindzh, there is no need to close the previous PR and open a new one; doing so loses the existing comments.


          githubbot ASF GitHub Bot added a comment -

          lindzh commented on issue #222: ROCKETMQ-355 Client asyncSend is not fully async
          URL: https://github.com/apache/rocketmq/pull/222#issuecomment-362966428

          @zhouxinyu, yes, keeping the comments is a good idea for continuous development. At first I wanted to limit send requests by using the network semaphore as the send signal, but after doing that I found it was a mistake, so I opened another PR to keep the change concise and fast.
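
The goal discussed in this issue, reporting validation and flow-control failures through the callback instead of throwing them on the caller's thread, can be sketched roughly as below. This is a minimal illustration under stated assumptions, not the code from either PR: `FullyAsyncSender`, its nested `Callback` interface, and the string result are hypothetical names, and the real network send is replaced by a stand-in.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration only; this is not a RocketMQ class. */
final class FullyAsyncSender {

    /** Hypothetical callback type, loosely modelled on a send callback. */
    interface Callback {
        void onSuccess(String result);

        void onException(Throwable e);
    }

    private final Semaphore permits;        // bounds the number of in-flight sends
    private final ExecutorService executor; // runs every check off the caller thread

    FullyAsyncSender(int maxInFlight, ExecutorService executor) {
        this.permits = new Semaphore(maxInFlight);
        this.executor = executor;
    }

    /** Validation and permit failures are handed to the callback, not thrown here. */
    void sendAsync(final String message, final Callback callback) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                String result = null;
                Throwable error = null;
                boolean acquired = false;
                try {
                    // Parameter validation happens on the executor thread.
                    if (message == null || message.isEmpty()) {
                        throw new IllegalArgumentException("message is empty");
                    }
                    acquired = permits.tryAcquire(1, TimeUnit.MILLISECONDS);
                    if (!acquired) {
                        throw new IllegalStateException("too many in-flight requests");
                    }
                    result = "sent:" + message; // stand-in for the real network send
                } catch (Throwable e) {
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt(); // preserve interrupt status
                    }
                    error = e;
                } finally {
                    if (acquired) {
                        permits.release();
                    }
                }
                // Exactly one callback method is invoked per send.
                if (error == null) {
                    callback.onSuccess(result);
                } else {
                    callback.onException(error);
                }
            }
        });
    }
}
```

In this sketch the caller can still see a `RejectedExecutionException` from `executor.execute` if the executor has been shut down; a fuller version would have to decide how to report that case as well.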



          People

            vintagewang Xiaorui Wang
            lindzh lindzh