Apache Storm / STORM-297

Storm Performance cannot be scaled up by adding more CPU cores

    Details

      Description

      We cannot scale up performance by adding more CPU cores and increasing parallelism.

      For a two-layer topology (Spout --shuffle grouping--> Bolt) with small messages (around 100 bytes), the attached chart shows that neither the CPU nor the network is saturated: only 40% of the CPU and 18% of the network bandwidth are used, even though parallelism is high (144 executors overall).
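
      For reference, a minimal sketch of the kind of two-layer benchmark topology described above; RandomMessageSpout and NoOpBolt are illustrative placeholders, not the actual test code:

```java
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class ThroughputBenchmark {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // Layer 1: spouts emitting ~100-byte messages as fast as possible.
        builder.setSpout("spout", new RandomMessageSpout(100), 72);
        // Layer 2: bolts receiving the messages via shuffle grouping.
        builder.setBolt("bolt", new NoOpBolt(), 72)
               .shuffleGrouping("spout");

        Config conf = new Config();
        conf.setNumWorkers(12); // spread the 144 executors across the cluster
        StormSubmitter.submitTopology("throughput-benchmark", conf, builder.createTopology());
    }
}
```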

      1. storm_conf.txt
        5 kB
        Sean Zhong
      2. storm_Netty_receiver_diagram.png
        65 kB
        Sean Zhong
      3. storm_performance_fix.patch
        127 kB
        Sean Zhong
      4. Storm_performance_fix.pdf
        514 kB
        Sean Zhong
      5. worker_throughput_without_storm-297.png
        27 kB
        Sean Zhong

        Issue Links

          Activity

          clockfly Sean Zhong added a comment -

          We have a fix for this.

          Please check the attachment for problem definition, analysis, fix, and test result.

          clockfly Sean Zhong added a comment -

          Link: https://issues.apache.org/jira/secure/attachment/12641867/Storm_performance_fix.pdf

          clockfly Sean Zhong added a comment -

          Some earlier discussion can be found here: http://mail-archives.us.apache.org/mod_mbox/incubator-storm-dev/201404.mbox/%3CCADiMvzUMV7Ab1RvwfV6rJHMqr2ekOg3kdTONsDhZHCmMhYWTsw@mail.gmail.com%3E

          clockfly Sean Zhong added a comment -

          Demo patch

          clockfly Sean Zhong added a comment -

          A demo patch is attached at https://issues.apache.org/jira/secure/attachment/12641868/storm_performance_fix.patch

          revans2 Robert Joseph Evans added a comment -

          The performance numbers look very impressive. Do you plan on making the demo patch an actual pull request? Pull requests are just a lot easier to comment on.

          clockfly Sean Zhong added a comment -

          Thank you! Your comments are inspiring!

          I will break it down into several pull requests for review.

          miguno Michael Noll added a comment -

          Let me second Bobby's comment – many thanks for the detailed report and also your code contribution, Sean. Much appreciated!

          githubbot ASF GitHub Bot added a comment -

          GitHub user clockfly opened a pull request:

          https://github.com/apache/incubator-storm/pull/103

          STORM-297 Storm Performance cannot be scaled up by adding more CPU cores

          STORM-297:

          Description and test report can be found at https://issues.apache.org/jira/browse/STORM-297
          The changes consist of:
          1. use asynchronous Netty message transfer
          2. use batch send and batch receive messaging APIs
          3. allow configuring multiple worker receiver threads
          4. name the executor and Netty threads

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/clockfly/incubator-storm storm_async_netty_and_batch_api

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/incubator-storm/pull/103.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #103


          commit 861a92eab8740cfc0f83ac4d7ade9a2ab04a8b9b
          Author: Sean Zhong <clockfly@gmail.com>
          Date: 2014-05-07T03:10:07Z

          1. Async netty message transfer 2. Batch send and batch receive api and implementation 3. allow to configure the number of receiver thread 4. name the threads


          clockfly Sean Zhong added a comment -

          UT Result:
          Tests run: 156, Assertions: 101460, Failures: 0, Errors: 0

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12362464

          — Diff: storm-core/src/clj/backtype/storm/daemon/executor.clj —
          @@ -204,6 +204,7 @@
          storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id)
          executor-type (executor-type worker-context component-id)
          batch-transfer->worker (disruptor/disruptor-queue
          + (str "executor" executor-id "-send-queue")
          — End diff –

          Give the queue a name so that the thread serving the queue gets a meaningful name.

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12362548

          — Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj —
          @@ -109,25 +111,30 @@
           (defn mk-transfer-fn [worker]
             (let [local-tasks (-> worker :task-ids set)
                   local-transfer (:transfer-local-fn worker)
          -        ^DisruptorQueue transfer-queue (:transfer-queue worker)]
          +        ^DisruptorQueue transfer-queue (:transfer-queue worker)
          +        task->node+port (:cached-task->node+port worker)]
               (fn [^KryoTupleSerializer serializer tuple-batch]
                 (let [local (ArrayList.)
          -            remote (ArrayList.)]
          +            remoteMap (HashMap.)]
                   (fast-list-iter [[task tuple :as pair] tuple-batch]
                     (if (local-tasks task)
                       (.add local pair)
          -            (.add remote pair)
          -            ))
          +            (let [node+port (get @task->node+port task)]
          — End diff –

          Move the message-grouping code (group by node+port) from the worker transfer thread to the executor send thread: there is only ONE worker transfer thread, so it can become the bottleneck.
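
          For illustration, the same grouping idea expressed as a small Java sketch (the actual change is Clojure in worker.clj; all names here are made up):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Group serialized tuples by destination worker (node+port) on the executor send
// thread, so the single worker transfer thread only forwards ready-made batches
// instead of doing the grouping itself and becoming a bottleneck.
class OutboundGrouper {
    static class Msg {
        final int task;
        final byte[] payload;
        Msg(int task, byte[] payload) { this.task = task; this.payload = payload; }
    }

    static Map<String, List<Msg>> groupByDestination(List<Msg> batch,
                                                     Map<Integer, String> taskToNodePort) {
        Map<String, List<Msg>> grouped = new HashMap<String, List<Msg>>();
        for (Msg m : batch) {
            String dest = taskToNodePort.get(m.task);  // "host:port" of the target worker
            List<Msg> bucket = grouped.get(dest);
            if (bucket == null) {
                bucket = new ArrayList<Msg>();
                grouped.put(dest, bucket);
            }
            bucket.add(m);
        }
        return grouped;
    }
}
```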

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12362569

          — Diff: storm-core/src/clj/backtype/storm/messaging/loader.clj —
          @@ -24,45 +24,62 @@
          (defn mk-local-context []
          (local/mk-context))

          — End diff –

          Support multiple worker receive threads.
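
          A minimal sketch of that idea, assuming the server hands out one incoming-message queue per receiver thread (names are illustrative):

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Spawn one named daemon thread per receiver queue; each thread drains only its
// own queue and hands the received batches to the local executors.
class ReceiverThreads {
    interface BatchHandler {
        void onBatch(List<byte[]> batch);
    }

    static void launch(List<BlockingQueue<List<byte[]>>> queues, final BatchHandler handler) {
        for (int i = 0; i < queues.size(); i++) {
            final BlockingQueue<List<byte[]>> queue = queues.get(i);
            Thread receiver = new Thread(new Runnable() {
                public void run() {
                    try {
                        while (true) {
                            handler.onBatch(queue.take()); // blocks until a batch arrives
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // shut down quietly
                    }
                }
            }, "worker-receiver-thread-" + i);
            receiver.setDaemon(true);
            receiver.start();
        }
    }
}
```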

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12362591

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Context.java —
          @@ -47,12 +48,14 @@ public void prepare(Map storm_conf) {

          //each context will have a single client channel factory
          int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
          — End diff –

          Give the Netty threads a name.
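
          For illustration, a generic way to get readable thread names is a custom ThreadFactory handed to the pools that back Netty's channel factory (a sketch; the factory class in the patch may differ):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Prefixes every thread name, so thread dumps show e.g. "client-worker-1"
// instead of the anonymous "pool-2-thread-1".
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) { this.prefix = prefix; }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(true);
        return t;
    }

    // Example: named boss/worker pools for a Netty channel factory.
    static ExecutorService namedPool(String prefix) {
        return Executors.newCachedThreadPool(new NamedThreadFactory(prefix));
    }
}
```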

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12362595

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java —
          @@ -34,52 +37,78 @@
                */
               protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
                   // Make sure that we have received at least a short
          -        if (buf.readableBytes() < 2) {
          +        long available = buf.readableBytes();
          +        if (available < 2) {
                       //need more data
                       return null;
                   }
          -        // Mark the current buffer position before reading task/len field
          -        // because the whole frame might not be in the buffer yet.
          -        // We will reset the buffer position to the marked position if
          -        // there's not enough bytes in the buffer.
          -        buf.markReaderIndex();
          -
          -        //read the short field
          -        short code = buf.readShort();
          -        //case 1: Control message
          -        ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
          -        if (ctrl_msg != null) return ctrl_msg;
          -        //case 2: task Message
          -        short task = code;
          -        // Make sure that we have received at least an integer (length)
          -        if (buf.readableBytes() < 4) {
          -            //need more data
          -            buf.resetReaderIndex();
          -            return null;
          -        }
          +        List<Object> ret = new ArrayList<Object>();
          +
          +        while (available >= 2) {
          — End diff –

          Use a while loop to decode as many messages as possible from the buffer.
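
          The loop shape, roughly, as a self-contained sketch over a plain ByteBuffer (the real decoder works on Netty's ChannelBuffer and also handles control messages):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Decode as many complete frames as the buffer currently holds instead of
// returning after the first one. An incomplete frame is left in the buffer
// (position reset to the mark) and picked up on the next call.
class BatchingDecoder {
    static List<byte[]> decodeAll(ByteBuffer buf) {
        List<byte[]> ret = new ArrayList<byte[]>();
        while (buf.remaining() >= 2) {
            buf.mark();
            short taskOrCode = buf.getShort();                 // task id or control code
            if (buf.remaining() < 4) { buf.reset(); break; }   // need the length field
            int len = buf.getInt();
            if (buf.remaining() < len) { buf.reset(); break; } // need the full payload
            byte[] payload = new byte[len];
            buf.get(payload);
            ret.add(payload);                                  // frame for task 'taskOrCode'
        }
        return ret.isEmpty() ? null : ret;
    }
}
```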

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12362613

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java —
          @@ -128,133 +181,103 @@ private long getSleepTimeMs()
               }

               /**
          -     * Enqueue a task message to be sent to server
          +     * Enqueue task messages to be sent to server
                */
          -    public void send(int task, byte[] message) {
          -        //throw exception if the client is being closed
          -        if (being_closed.get()) {
          +    synchronized public void send(Iterator<TaskMessage> msgs) {
          +
          +        // throw exception if the client is being closed
          +        if (closing) {
                       throw new RuntimeException("Client is being closed, and does not take requests any more");
                   }
          -
          -        try {
          -            message_queue.put(new TaskMessage(task, message));
          -
          -            //resume delivery if it is waiting for requests
          -            tryDeliverMessages(true);
          -        } catch (InterruptedException e) {
          -            throw new RuntimeException(e);
          +
          +        if (null == msgs || !msgs.hasNext()) {
          +            return;
                   }
          -    }
          -
          -    /**
          -     * Retrieve messages from queue, and delivery to server if any
          -     */
          -    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
          -        //just skip if delivery only if waiting, and we are not waiting currently
          -        if (only_if_waiting && !wait_for_requests) return;
          -
          -        //make sure that channel was not closed
                   Channel channel = channelRef.get();
          -        if (channel == null) return;
          -        if (!channel.isOpen()) {
          -            LOG.info("Channel to {} is no longer open.",remote_addr);
          -            //The channel is not open yet. Reconnect?
          -            reconnect();
          -            return;
          +        if (null == channel) {
          +            connect();
          +            channel = channelRef.get();
                   }
          -        final MessageBatch requests = tryTakeMessages();
          -        if (requests==null) {
          -            wait_for_requests = true;
          -            return;
          -        }
          +        while (msgs.hasNext()) {
          +            TaskMessage message = msgs.next();
          +            if (null == messageBatch) {
          +                messageBatch = new MessageBatch(messageBatchSize);
          +            }
          -        //if channel is being closed and we have no outstanding messages, let's close the channel
          -        if (requests.isEmpty() && being_closed.get()) {
          -            close_n_release();
          -            return;
          +            messageBatch.add(message);
          +            if (messageBatch.isFull()) {
          +                MessageBatch toBeFlushed = messageBatch;
          +                flushRequest(channel, toBeFlushed, blocking);
          +                messageBatch = null;
          +            }
                   }
          -
          -        //we are busily delivering messages, and will check queue upon response.
          -        //When send() is called by senders, we should not thus call tryDeliverMessages().
          -        wait_for_requests = false;
          -
          -        //write request into socket channel
          -        ChannelFuture future = channel.write(requests);
          -        future.addListener(new ChannelFutureListener() {
          -            public void operationComplete(ChannelFuture future)
          -                    throws Exception {
          -                if (!future.isSuccess()) {
          -                    LOG.info("failed to send "+requests.size()+" requests to "+remote_addr, future.getCause());
          -                    reconnect();
          -                } else {
          -                    LOG.debug("{} request(s) sent", requests.size());
          -
          -                    //Now that our requests have been sent, channel could be closed if needed
          -                    if (being_closed.get())
          -                        close_n_release();
          -                }
          +        if (null != messageBatch && !messageBatch.isEmpty()) {
          +            if (channel.isWritable()) {
          — End diff –

          When the channel is NOT writable, the internal Netty buffer is full. In this case, we can buffer up more incoming messages before flushing.
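
          In sketch form, the flush-or-buffer decision described above (illustrative only; the real client keeps more state and also flushes via a background checker):

```java
import java.util.ArrayList;
import java.util.List;
import org.jboss.netty.channel.Channel;

class BatchingSender {
    private final int maxBatchBytes;
    private final List<byte[]> batch = new ArrayList<byte[]>();
    private int batchBytes = 0;

    BatchingSender(int maxBatchBytes) { this.maxBatchBytes = maxBatchBytes; }

    // Flush as soon as the channel can take writes (low latency); while the
    // channel's internal buffer is full, keep accumulating messages up to
    // maxBatchBytes before forcing a flush (throughput under back-pressure).
    void send(Channel channel, byte[] message) {
        batch.add(message);
        batchBytes += message.length;
        if (channel.isWritable() || batchBytes >= maxBatchBytes) {
            channel.write(new ArrayList<byte[]>(batch));
            batch.clear();
            batchBytes = 0;
        }
        // otherwise: channel busy and batch not yet full -> keep buffering
    }
}
```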

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12362647

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java —
          @@ -31,35 +31,65 @@
          import org.slf4j.LoggerFactory;

          import java.net.InetSocketAddress;
          +import java.util.ArrayList;
          +import java.util.Arrays;
          +import java.util.HashMap;
          +import java.util.Iterator;
          +import java.util.List;
          import java.util.Map;
          import java.util.concurrent.Executors;
          import java.util.concurrent.LinkedBlockingQueue;
          +import java.util.concurrent.ThreadFactory;

          class Server implements IConnection {
          private static final Logger LOG = LoggerFactory.getLogger(Server.class);
          @SuppressWarnings("rawtypes")
          Map storm_conf;
          int port;

          -    private LinkedBlockingQueue<TaskMessage> message_queue;
          +    private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
          — End diff –

          Create multiple queues for incoming messages; the number of queues equals the number of receiver threads.
          Messages sent to the same task are always placed in the same queue, to preserve message order.
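
          A simplified sketch of the enqueue path (illustrative names, not the actual Server.java code):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

// One queue per receiver thread; every task is pinned to exactly one queue,
// so all messages for a given task are consumed in arrival order.
class IncomingQueues {
    private final LinkedBlockingQueue<List<byte[]>>[] queues;
    private final Map<Integer, Integer> taskToQueueId;

    @SuppressWarnings("unchecked")
    IncomingQueues(int queueCount, Map<Integer, Integer> taskToQueueId) {
        this.queues = new LinkedBlockingQueue[queueCount];
        for (int i = 0; i < queueCount; i++) {
            queues[i] = new LinkedBlockingQueue<List<byte[]>>();
        }
        this.taskToQueueId = taskToQueueId;
    }

    void enqueue(int task, List<byte[]> batchForTask) throws InterruptedException {
        queues[taskToQueueId.get(task)].put(batchForTask); // same task -> same queue
    }

    List<byte[]> take(int queueId) throws InterruptedException {
        return queues[queueId].take(); // each receiver thread drains one queue
    }
}
```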

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12362656

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java —
          @@ -31,35 +31,65 @@
          import org.slf4j.LoggerFactory;

          import java.net.InetSocketAddress;
          +import java.util.ArrayList;
          +import java.util.Arrays;
          +import java.util.HashMap;
          +import java.util.Iterator;
          +import java.util.List;
          import java.util.Map;
          import java.util.concurrent.Executors;
          import java.util.concurrent.LinkedBlockingQueue;
          +import java.util.concurrent.ThreadFactory;

          class Server implements IConnection {
          private static final Logger LOG = LoggerFactory.getLogger(Server.class);
          @SuppressWarnings("rawtypes")
          Map storm_conf;
          int port;

          -    private LinkedBlockingQueue<TaskMessage> message_queue;
          +    private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
               volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
               final ChannelFactory factory;
               final ServerBootstrap bootstrap;
          -
          +
          +    private int queueCount;
          +    HashMap<Integer, Integer> taskToQueueId = null;
          +    int roundRobinQueueId;
          +
          +    boolean closing = false;
          +    List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
          +
          +
               @SuppressWarnings("rawtypes")
               Server(Map storm_conf, int port) {
                   this.storm_conf = storm_conf;
                   this.port = port;
          -        message_queue = new LinkedBlockingQueue<TaskMessage>();
          -
          +
          — End diff –

          Construct the map from taskId -> queueId in a round-robin manner.
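
          For example, the round-robin construction could look like this sketch:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class QueueAssignment {
    // Assign task ids to receiver queues in round-robin order so the queues stay
    // roughly balanced; the mapping is then fixed, which keeps per-task ordering.
    static Map<Integer, Integer> buildTaskToQueueId(List<Integer> taskIds, int queueCount) {
        Map<Integer, Integer> taskToQueueId = new HashMap<Integer, Integer>();
        int nextQueue = 0;
        for (Integer task : taskIds) {
            taskToQueueId.put(task, nextQueue);
            nextQueue = (nextQueue + 1) % queueCount;
        }
        return taskToQueueId;
    }
}
```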

          Gvain Jiahong Li added a comment -

          Does the performance number include all components' 'transferred' throughput, as the yahooeng post does here http://yahooeng.tumblr.com/post/64758709722/making-storm-fly-with-netty ? Or does only the spouts' 'emitted' throughput count?

          githubbot ASF GitHub Bot added a comment -

          Github user Gvain commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-42414069

          Does the performance number include all components' 'transferred' throughput, as the yahooeng post does here http://yahooeng.tumblr.com/post/64758709722/making-storm-fly-with-netty ? Or does only the spouts' 'emitted' throughput count?

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-42414269

          The spout "transferred" throughput.

          githubbot ASF GitHub Bot added a comment -

          Github user Gvain commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-42416918

          Really impressive.

          githubbot ASF GitHub Bot added a comment -

          Github user ptgoetz commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12393972

          — Diff: storm-core/src/jvm/backtype/storm/Config.java —
          @@ -84,8 +84,27 @@
          */
          public static final String STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS = "storm.messaging.netty.client_worker_threads";
          public static final Object STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS_SCHEMA = Number.class;
          +
          + /**
          + * If the Netty messaging layer is busy, the Netty client will try to batch message as more as possible up to the size of STORM_NETTY_MESSAGE_BATCH_SIZE bytes
          + */
          + public static final String STORM_NETTY_MESSAGE_BATCH_SIZE = "netty.transfer.batch.size";
          + public static final Object STORM_NETTY_MESSAGE_BATCH_SIZE_SCHEMA = Number.class;
          — End diff –

          This and all other new configuration parameters should be added to `defaults.yaml` with their default values.
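
          For context, once the key is in `defaults.yaml` it can be read with a fallback, e.g. via the two-argument Utils.getInt added in this pull request (sketch; the 262144 default mirrors the value proposed in the patch):

```java
import java.util.Map;
import backtype.storm.Config;
import backtype.storm.utils.Utils;

class NettyBatchConfig {
    // Read the batch size from the merged storm config, falling back to a
    // default when the key is absent.
    @SuppressWarnings("rawtypes")
    static int messageBatchSize(Map stormConf) {
        return Utils.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
    }
}
```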

          githubbot ASF GitHub Bot added a comment -

          Github user ptgoetz commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12394305

          — Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java —
          @@ -301,15 +303,39 @@ public static ComponentCommon getComponentCommon(StormTopology topology, String
          }

               public static Integer getInt(Object o) {
          -        if(o instanceof Long) {
          -            return ((Long) o ).intValue();
          -        } else if (o instanceof Integer) {
          -            return (Integer) o;
          -        } else if (o instanceof Short) {
          -            return ((Short) o).intValue();
          -        } else {
          -            throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
          -        }
          +        Integer result = getInt(o, null);
          +        if (null == result) {
          +            throw new IllegalArgumentException("Don't know how to convert null + to int");
          +        }
          +        return result;
          +    }
          +
          +    public static Integer getInt(Object o, Integer defaultValue) {
          +        if (null == o) {
          +            return defaultValue;
          +        }
          +
          +        if(o instanceof Long) {
          — End diff –

          Very minor point, but this could probably be tightened to:

          ```java
          if (o instanceof Number) {
              return ((Number) o).intValue();
          } else {
              throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
          }
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user ptgoetz commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12395524

          — Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java —
          @@ -373,6 +399,25 @@ public static CuratorFramework newCuratorStarted(Map conf, List<String> servers,
          ret.start();
          return ret;
          }
          +
          + public static void redirectStreamAsync(Process process) {
          + redirectStreamAsync(process.getInputStream(), System.out);
          — End diff –

          I don't see this code referenced from anywhere else.

          I assume this is an attempt at solving the issue where STDOUT can fill buffers and cause workers to hang (e.g. when GC logging is turned on without being directed to a file)?

          githubbot ASF GitHub Bot added a comment -

          Github user ptgoetz commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12398727

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java —
          @@ -84,43 +93,87 @@

          // Start the connection attempt.
          remote_addr = new InetSocketAddress(host, port);

          -        bootstrap.connect(remote_addr);
          +
          +        Thread flushChecker = new Thread(new Runnable() {
          +            @Override
          +            public void run() {
          +                //make sure we have a connection
          +                connect();
          +
          +                while(!closing) {
          +                    long flushCheckTime = flushCheckTimer.get();
          +                    long now = System.currentTimeMillis();
          +                    if (now > flushCheckTime) {
          +                        Channel channel = channelRef.get();
          +                        if (null != channel && channel.isWritable()) {
          +                            flush();
          +                        }
          +                    }
          +                    try {
          +                        Thread.sleep(flushCheckInterval);
          +                    } catch (InterruptedException e) {
          +                        break;
          +                    }
          +                }
          +
          +            }
          +        });
          +
          +        flushChecker.setDaemon(true);
          — End diff –

          Do we want to name this thread as well?

          githubbot ASF GitHub Bot added a comment -

          Github user ptgoetz commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-42479510

          In general this looks good to me, but it needs to be reviewed by additional committers familiar with the netty transport. Unit tests pass as did a basic smoke test on a 3-node cluster.

          As I mentioned in an earlier comment, the additional config parameters need to be added to the `defaults.yaml` file, preferably with some comments/documentation regarding usage and ramifications of certain settings (e.g. sync vs. async). That way users won't have to dig around in source code to determine the default values.

          I also noticed that some of the changes you mentioned in the PDF doc don't appear in this pull request (e.g. serialization.reserve.tuple.createTime). I'm just curious as to why you left some things out.

          Thanks for the contribution!

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12410406

          — Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java —
          @@ -373,6 +399,25 @@ public static CuratorFramework newCuratorStarted(Map conf, List<String> servers,
          ret.start();
          return ret;
          }
          +
          + public static void redirectStreamAsync(Process process) {
          + redirectStreamAsync(process.getInputStream(), System.out);
          — End diff –

          Hi Taylor,

          Yes, you are right. It is used to prevent the worker from hanging when stdout fills up. The original demo patch is very big, so I had to break it down into several patches; the code that uses this function will appear in a follow-up pull request.
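
          A rough sketch of what such an asynchronous redirect can look like (assumption only; the follow-up patch may implement it differently):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;

class StreamRedirect {
    // Continuously drain a child process's output on a daemon thread so the OS
    // pipe buffer never fills up and blocks the child (i.e. the worker process).
    static void redirectStreamAsync(final InputStream in, final PrintStream out) {
        Thread t = new Thread(new Runnable() {
            public void run() {
                byte[] buf = new byte[4096];
                try {
                    int n;
                    while ((n = in.read(buf)) != -1) {
                        out.write(buf, 0, n);
                    }
                } catch (IOException e) {
                    // stream closed: the child process has exited
                }
            }
        }, "stream-redirect");
        t.setDaemon(true);
        t.start();
    }
}
```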

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12410447

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java —
          @@ -84,43 +93,87 @@

          // Start the connection attempt.
          remote_addr = new InetSocketAddress(host, port);

          -        bootstrap.connect(remote_addr);
          +
          +        Thread flushChecker = new Thread(new Runnable() {
          +            @Override
          +            public void run() {
          +                //make sure we have a connection
          +                connect();
          +
          +                while(!closing) {
          +                    long flushCheckTime = flushCheckTimer.get();
          +                    long now = System.currentTimeMillis();
          +                    if (now > flushCheckTime) {
          +                        Channel channel = channelRef.get();
          +                        if (null != channel && channel.isWritable()) {
          +                            flush();
          +                        }
          +                    }
          +                    try {
          +                        Thread.sleep(flushCheckInterval);
          +                    } catch (InterruptedException e) {
          +                        break;
          +                    }
          +                }
          +
          +            }
          +        });
          +
          +        flushChecker.setDaemon(true);
          — End diff –

          I will fix this.

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-42499434

          Regarding serialization.reserve.tuple.createTime: this changes a lot of files, so I will do it in a follow-up pull request to make the code change easier to review.
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12413170

          — Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java —
          @@ -301,15 +303,39 @@ public static ComponentCommon getComponentCommon(StormTopology topology, String
          }

               public static Integer getInt(Object o) {
          -        if(o instanceof Long) {
          -            return ((Long) o ).intValue();
          -        } else if (o instanceof Integer) {
          -            return (Integer) o;
          -        } else if (o instanceof Short) {
          -            return ((Short) o).intValue();
          -        } else {
          -            throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
          -        }
          +        Integer result = getInt(o, null);
          +        if (null == result) {
          +            throw new IllegalArgumentException("Don't know how to convert null + to int");
          +        }
          +        return result;
          +    }
          +
          +    public static Integer getInt(Object o, Integer defaultValue) {
          +        if (null == o) {
          +            return defaultValue;
          +        }
          +
          +        if(o instanceof Long) {
          — End diff –

          Negative. Number has subtypes such as Float and Double, and we want to throw for those two types.
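
          To make the distinction concrete, the intended behaviour is roughly this (sketch, not the final patch code):

```java
class IntCoercion {
    // Accept only integral number types; reject Float/Double (and anything else),
    // because silently truncating a fractional config value would hide user errors.
    public static Integer getInt(Object o, Integer defaultValue) {
        if (o == null) {
            return defaultValue;
        }
        if (o instanceof Long || o instanceof Integer || o instanceof Short) {
            return ((Number) o).intValue();
        }
        throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
    }
}
```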

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-42634710

          Strange, the mailing list notification is not working.

          githubbot ASF GitHub Bot added a comment -

          Github user nathanmarz commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-42916013

          Can you describe how the worker receive thread has been changed in the pull request? In particular, do we still have the guarantee that messages sent from task A to task B are received in the same order they are sent (or not received at all)? For example, if task A sends messages 1, 2, 3, 4, and task B receives 1, 4, 3 that would violate this guarantee.

          clockfly Sean Zhong added a comment -

          Shows how the message order is preserved when using multiple receiver threads.

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-42918783

          Hi Nathan,

          Thank you for your comments, I just drew a diagram for your reference.
          https://issues.apache.org/jira/secure/attachment/12644559/storm_Netty_receiver_diagram.png

          Sean

          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12810544

          — Diff: conf/defaults.yaml —
          @@ -109,6 +112,15 @@ storm.messaging.netty.max_retries: 30
          storm.messaging.netty.max_wait_ms: 1000
          storm.messaging.netty.min_wait_ms: 100

          +# If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency.
          +storm.messaging.netty.transfer.batch.size: 262144
          +
          +# If storm.messaging.netty.blocking is set to true, the Netty Client will send messages in synchronized way, otherwise it will do it in async way. Set storm.messaging.netty.blocking to false to improve the latency and throughput.
          — End diff –

          If this always improves the latency and throughput, why have it as a config option at all?

          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12811601

          — Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj —
          @@ -109,25 +111,30 @@
           (defn mk-transfer-fn [worker]
             (let [local-tasks (-> worker :task-ids set)
                   local-transfer (:transfer-local-fn worker)
          -        ^DisruptorQueue transfer-queue (:transfer-queue worker)]
          +        ^DisruptorQueue transfer-queue (:transfer-queue worker)
          +        task->node+port (:cached-task->node+port worker)]
               (fn [^KryoTupleSerializer serializer tuple-batch]
                 (let [local (ArrayList.)
          -            remote (ArrayList.)]
          +            remoteMap (HashMap.)]
                   (fast-list-iter [[task tuple :as pair] tuple-batch]
                     (if (local-tasks task)
                       (.add local pair)
          -            (.add remote pair)
          -            ))
          +            (let [node+port (get @task->node+port task)]
          +              (when (not (.get remoteMap node+port))
          +                (.put remoteMap node+port (ArrayList.)))
          +              (let [remote (.get remoteMap node+port)]
          +                (.add remote (TaskMessage. task (.serialize serializer tuple)))
          +                ))))
          — End diff –

          The above code does not really feel like Clojure, as it is updating mutable state. I would rather see us do something like a group-by.

          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12811853

          — Diff: storm-core/src/clj/backtype/storm/disruptor.clj —
          @@ -89,7 +90,7 @@
       (consume-batch-when-available queue handler)
       0 )
     :kill-fn kill-fn
-    :thread-name thread-name
+    :thread-name (.getName queue)
— End diff –

The function this is a part of takes an optional :thread-name parameter. We should either remove that parameter or honor it. I would vote to remove it, because the only place I think it is called from is one you modified as well (just below).

          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12812348

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java —
          @@ -21,52 +21,53 @@
          import backtype.storm.messaging.IConnection;
          import backtype.storm.messaging.TaskMessage;
          import backtype.storm.utils.Utils;
          -
          import org.jboss.netty.bootstrap.ClientBootstrap;
          import org.jboss.netty.channel.Channel;
          import org.jboss.netty.channel.ChannelFactory;
          -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
          import org.jboss.netty.channel.ChannelFuture;
          import org.jboss.netty.channel.ChannelFutureListener;
          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;
          -
          import java.net.InetSocketAddress;
          +import java.util.ArrayList;
          +import java.util.Iterator;
          +import java.util.List;
          import java.util.Map;
          -import java.util.Timer;
          -import java.util.TimerTask;
          import java.util.Random;
          -import java.util.concurrent.LinkedBlockingQueue;
          -import java.util.concurrent.atomic.AtomicBoolean;
          -import java.util.concurrent.atomic.AtomicInteger;
          +import java.util.concurrent.atomic.AtomicLong;
          import java.util.concurrent.atomic.AtomicReference;

          -class Client implements IConnection {
          +public class Client implements IConnection {
          private static final Logger LOG = LoggerFactory.getLogger(Client.class);

-    private static final Timer TIMER = new Timer("netty-client-timer", true);
-
+    private static final String PREFIX = "Netty-Client-";
     private final int max_retries;
     private final long base_sleep_ms;
     private final long max_sleep_ms;
-    private LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
     private AtomicReference<Channel> channelRef;
     private final ClientBootstrap bootstrap;
-    InetSocketAddress remote_addr;
-    private AtomicInteger retries;
+    private InetSocketAddress remote_addr;
+
     private final Random random = new Random();
     private final ChannelFactory factory;
     private final int buffer_size;
-    private final AtomicBoolean being_closed;
-    private boolean wait_for_requests;
+    private boolean closing;
+
+    private Integer messageBatchSize;
+    private Boolean blocking = false;
— End diff –

Can we make these an int and a boolean? They should never be null, and I would rather have it blow up sooner rather than later.
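A minimal sketch of that direction, with a hypothetical config helper (not the patch's Utils call): resolve both settings into primitives at construction time, so a missing or badly typed value fails immediately instead of leaving a null Integer/Boolean behind.

import java.util.Map;

class NettyClientConfigSketch {
    final int messageBatchSize;
    final boolean blocking;

    NettyClientConfigSketch(Map<String, Object> stormConf) {
        // defaults mirror defaults.yaml; the get() helper below is illustrative
        this.messageBatchSize = ((Number) get(stormConf, "storm.messaging.netty.transfer.batch.size", 262144)).intValue();
        this.blocking = (Boolean) get(stormConf, "storm.messaging.netty.blocking", false);
    }

    private static Object get(Map<String, Object> conf, String key, Object dflt) {
        Object v = conf.get(key);
        return v != null ? v : dflt;
    }
}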

          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12812473

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java —
          @@ -84,43 +93,87 @@

         // Start the connection attempt.
         remote_addr = new InetSocketAddress(host, port);
-        bootstrap.connect(remote_addr);
+
+        Thread flushChecker = new Thread(new Runnable() {
— End diff –

Can we make this thread shared between the clients? Otherwise we will have a dedicated thread per client, which can cause resource utilization issues, such as hitting the ulimit on the number of processes allowed per user.
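As a sketch of the shared-thread idea (names are hypothetical, not from the patch): one scheduled daemon thread walks a registry of clients and asks each to flush, instead of every Client starting its own flush-checker thread.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class SharedFlushCheckerSketch {
    interface Flushable { void flushIfNeeded(); }

    private static final long FLUSH_CHECK_INTERVAL_MS = 10;
    private static final Set<Flushable> CLIENTS = ConcurrentHashMap.newKeySet();
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "netty-client-flush-checker");
                t.setDaemon(true);
                return t;
            });

    static {
        // a single periodic task serves every registered client
        SCHEDULER.scheduleAtFixedRate(
                () -> CLIENTS.forEach(Flushable::flushIfNeeded),
                FLUSH_CHECK_INTERVAL_MS, FLUSH_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }

    static void register(Flushable client)   { CLIENTS.add(client); }
    static void unregister(Flushable client) { CLIENTS.remove(client); }
}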

          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12812760

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java —
          @@ -84,43 +93,87 @@

         // Start the connection attempt.
         remote_addr = new InetSocketAddress(host, port);
-        bootstrap.connect(remote_addr);
+
+        Thread flushChecker = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                //make sure we have a connection
+                connect();
+
+                while(!closing) {
+                    long flushCheckTime = flushCheckTimer.get();
+                    long now = System.currentTimeMillis();
+                    if (now > flushCheckTime) {
+                        Channel channel = channelRef.get();
+                        if (null != channel && channel.isWritable()) {
+                            flush();
+                        }
+                    }
+
+                    try {
+                        Thread.sleep(flushCheckInterval);
+                    } catch (InterruptedException e) {
+                        break;
+                    }
+                }
+
+            }
+        }, name() + "-flush-checker");
+
+        flushChecker.setDaemon(true);
+        flushChecker.start();
     }

     /**
      * We will retry connection with exponential back-off policy
      */
-    void reconnect() {
-        close_n_release();
-
-        //reconnect only if it's not being closed
-        if (being_closed.get()) return;
-
-        final int tried_count = retries.incrementAndGet();
-        if (tried_count <= max_retries) {
-            long sleep = getSleepTimeMs();
-            LOG.info("Waiting {} ms before trying connection to {}", sleep, remote_addr);
-            TIMER.schedule(new TimerTask() {
-                @Override
-                public void run() {
-                    LOG.info("Reconnect ... [{}] to {}", tried_count, remote_addr);
-                    bootstrap.connect(remote_addr);
-                }}, sleep);
-        } else {
-            LOG.warn(remote_addr+" is not reachable. We will close this client.");
-            close();
+    private synchronized void connect() {
— End diff –
I don't like the idea of having connect block until the connection is established. Any thread that tries to send data over a connection that is still being established will block until it completes. I think it is more robust to buffer the messages in a data structure and handle them later, once the connection is established.
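A rough sketch of that buffering alternative (Channel and MessageBatch are stand-in types here, not the Netty/Storm classes, and this is not code from the patch): park batches in a pending queue while the connection is still coming up, and drain the queue from the connect callback so senders never block.

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

class BufferingSenderSketch {
    private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>();
    private final ConcurrentLinkedQueue<MessageBatch> pending = new ConcurrentLinkedQueue<MessageBatch>();

    void send(MessageBatch batch) {
        Channel channel = channelRef.get();
        if (channel == null) {
            pending.add(batch);        // connection not ready yet: buffer, don't block
        } else {
            channel.write(batch);
        }
    }

    // invoked from the connect-completed / channelConnected callback
    void onConnected(Channel channel) {
        channelRef.set(channel);
        MessageBatch batch;
        while ((batch = pending.poll()) != null) {
            channel.write(batch);      // drain whatever accumulated while connecting
        }
    }

    interface Channel { void write(MessageBatch batch); }
    static class MessageBatch {}
}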

          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12812919

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java —
          @@ -128,133 +181,105 @@ private long getSleepTimeMs()
 }

     /**
-     * Enqueue a task message to be sent to server
+     * Enqueue task messages to be sent to server
      */
-    public void send(int task, byte[] message) {
-        //throw exception if the client is being closed
-        if (being_closed.get()) {
+    synchronized public void send(Iterator<TaskMessage> msgs) {
+
+        // throw exception if the client is being closed
+        if (closing) {
             throw new RuntimeException("Client is being closed, and does not take requests any more");
         }
-
-        try {
-            message_queue.put(new TaskMessage(task, message));
-
-            //resume delivery if it is waiting for requests
-            tryDeliverMessages(true);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+
+        if (null == msgs || !msgs.hasNext()) {
+            return;
         }
-    }
-
-    /**
-     * Retrieve messages from queue, and delivery to server if any
-     */
-    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
-        //just skip if delivery only if waiting, and we are not waiting currently
-        if (only_if_waiting && !wait_for_requests) return;
-
-        //make sure that channel was not closed
         Channel channel = channelRef.get();
-        if (channel == null) return;
-        if (!channel.isOpen()) {
-            LOG.info("Channel to {} is no longer open.",remote_addr);
-            //The channel is not open yet. Reconnect?
-            reconnect();
-            return;
+        if (null == channel) {
+            connect();
+            channel = channelRef.get();
         }

-        final MessageBatch requests = tryTakeMessages();
-        if (requests==null) {
-            wait_for_requests = true;
-            return;
-        }
+        while (msgs.hasNext()) {
+            TaskMessage message = msgs.next();
+            if (null == messageBatch) {
+                messageBatch = new MessageBatch(messageBatchSize);
+            }

-        //if channel is being closed and we have no outstanding messages, let's close the channel
-        if (requests.isEmpty() && being_closed.get()) {
-            close_n_release();
-            return;
+            messageBatch.add(message);
+            if (messageBatch.isFull()) {
+                MessageBatch toBeFlushed = messageBatch;
+                flushRequest(channel, toBeFlushed, blocking);
+                messageBatch = null;
+            }
         }

-        //we are busily delivering messages, and will check queue upon response.
-        //When send() is called by senders, we should not thus call tryDeliverMessages().
-        wait_for_requests = false;
-
-        //write request into socket channel
-        ChannelFuture future = channel.write(requests);
-        future.addListener(new ChannelFutureListener() {
-            public void operationComplete(ChannelFuture future)
-                    throws Exception {
-                if (!future.isSuccess()) {
-                    LOG.info("failed to send "+requests.size()+" requests to "+remote_addr, future.getCause());
-                    reconnect();
-                } else {
-                    LOG.debug("{} request(s) sent", requests.size());
-
-                    //Now that our requests have been sent, channel could be closed if needed
-                    if (being_closed.get())
-                        close_n_release();
-                }
+        if (null != messageBatch && !messageBatch.isEmpty()) {
+            if (channel.isWritable()) {
+                flushCheckTimer.set(Long.MAX_VALUE);
+
+                // Flush as fast as we can to reduce the latency
+                MessageBatch toBeFlushed = messageBatch;
+                messageBatch = null;
+                flushRequest(channel, toBeFlushed, blocking);
+
+            } else {
+                // when channel is NOT writable, it means the internal netty buffer is full.
+                // In this case, we can try to buffer up more incoming messages.
+                flushCheckTimer.set(System.currentTimeMillis() + flushCheckInterval);
             }
-        });
+        }
+
     }

-    /**
-     * Take all enqueued messages from queue
-     * @return batch of messages
-     * @throws InterruptedException
-     *
-     * synchronized ... ensure that messages are delivered in the same order
-     * as they are added into queue
-     */
-    private MessageBatch tryTakeMessages() throws InterruptedException {
-        //1st message
-        Object msg = message_queue.poll();
-        if (msg == null) return null;
-
-        MessageBatch batch = new MessageBatch(buffer_size);
-        //we will discard any message after CLOSE
-        if (msg == ControlMessage.CLOSE_MESSAGE) {
-            LOG.info("Connection to {} is being closed", remote_addr);
-            being_closed.set(true);
-            return batch;
+    public String name() {
+        if (null != remote_addr) {
+            return PREFIX + remote_addr.toString();
         }
+        return "";
+    }

-        batch.add((TaskMessage)msg);
-        while (!batch.isFull() && ((msg = message_queue.peek())!=null)) {
-            //Is it a CLOSE message?
-            if (msg == ControlMessage.CLOSE_MESSAGE) {
-                message_queue.take();
-                LOG.info("Connection to {} is being closed", remote_addr);
-                being_closed.set(true);
-                break;
+    private synchronized void flush() {
+        if (!closing) {
+            if (null != messageBatch && !messageBatch.isEmpty()) {
+                MessageBatch toBeFlushed = messageBatch;
+                Channel channel = channelRef.get();
+                if (channel != null) {
+                    flushCheckTimer.set(Long.MAX_VALUE);
+                    flushRequest(channel, toBeFlushed, true);
+                }
+                messageBatch = null;
— End diff –

If the Channel is null, do we really want to drop the messageBatch?
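One possible answer, as a hedged sketch rather than a proposed patch (Channel and MessageBatch are stand-in types, and flushRequest is a stub): clear messageBatch only after it has actually been handed off to a channel, and keep it around when channelRef is still null so a later flush can retry it.

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

class KeepBatchOnNullChannelSketch {
    interface Channel {}
    static class MessageBatch { boolean isEmpty() { return false; } }

    private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>();
    private final AtomicLong flushCheckTimer = new AtomicLong(Long.MAX_VALUE);
    private MessageBatch messageBatch;
    private volatile boolean closing = false;

    private synchronized void flush() {
        if (!closing && messageBatch != null && !messageBatch.isEmpty()) {
            Channel channel = channelRef.get();
            if (channel != null) {
                flushCheckTimer.set(Long.MAX_VALUE);
                flushRequest(channel, messageBatch, true);
                messageBatch = null;   // cleared only after a successful hand-off
            }
            // channel == null: keep messageBatch so the next flush attempt can retry it
        }
    }

    private void flushRequest(Channel channel, MessageBatch batch, boolean blocking) {
        // stand-in for Client.flushRequest(...), which writes the batch to the channel
    }
}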

          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12813204

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java —
          @@ -128,133 +181,105 @@ private long getSleepTimeMs()
 }

     /**
-     * Enqueue a task message to be sent to server
+     * Enqueue task messages to be sent to server
      */
-    public void send(int task, byte[] message) {
-        //throw exception if the client is being closed
-        if (being_closed.get()) {
+    synchronized public void send(Iterator<TaskMessage> msgs) {
+
+        // throw exception if the client is being closed
+        if (closing) {
             throw new RuntimeException("Client is being closed, and does not take requests any more");
         }
-
-        try {
-            message_queue.put(new TaskMessage(task, message));
-
-            //resume delivery if it is waiting for requests
-            tryDeliverMessages(true);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+
+        if (null == msgs || !msgs.hasNext()) {
+            return;
         }
-    }
-
-    /**
-     * Retrieve messages from queue, and delivery to server if any
-     */
-    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
-        //just skip if delivery only if waiting, and we are not waiting currently
-        if (only_if_waiting && !wait_for_requests) return;
-
-        //make sure that channel was not closed
         Channel channel = channelRef.get();
-        if (channel == null) return;
-        if (!channel.isOpen()) {
-            LOG.info("Channel to {} is no longer open.",remote_addr);
-            //The channel is not open yet. Reconnect?
-            reconnect();
-            return;
+        if (null == channel) {
+            connect();
+            channel = channelRef.get();
         }

-        final MessageBatch requests = tryTakeMessages();
-        if (requests==null) {
-            wait_for_requests = true;
-            return;
-        }
+        while (msgs.hasNext()) {
+            TaskMessage message = msgs.next();
+            if (null == messageBatch) {
+                messageBatch = new MessageBatch(messageBatchSize);
+            }

-        //if channel is being closed and we have no outstanding messages, let's close the channel
-        if (requests.isEmpty() && being_closed.get()) {
-            close_n_release();
-            return;
+            messageBatch.add(message);
+            if (messageBatch.isFull()) {
+                MessageBatch toBeFlushed = messageBatch;
+                flushRequest(channel, toBeFlushed, blocking);
+                messageBatch = null;
+            }
         }

-        //we are busily delivering messages, and will check queue upon response.
-        //When send() is called by senders, we should not thus call tryDeliverMessages().
-        wait_for_requests = false;
-
-        //write request into socket channel
-        ChannelFuture future = channel.write(requests);
-        future.addListener(new ChannelFutureListener() {
-            public void operationComplete(ChannelFuture future)
-                    throws Exception {
-                if (!future.isSuccess()) {
-                    LOG.info("failed to send "+requests.size()+" requests to "+remote_addr, future.getCause());
-                    reconnect();
-                } else {
-                    LOG.debug("{} request(s) sent", requests.size());
-
-                    //Now that our requests have been sent, channel could be closed if needed
-                    if (being_closed.get())
-                        close_n_release();
-                }
+        if (null != messageBatch && !messageBatch.isEmpty()) {
+            if (channel.isWritable()) {
+                flushCheckTimer.set(Long.MAX_VALUE);
+
+                // Flush as fast as we can to reduce the latency
+                MessageBatch toBeFlushed = messageBatch;
+                messageBatch = null;
+                flushRequest(channel, toBeFlushed, blocking);
+
+            } else {
+                // when channel is NOT writable, it means the internal netty buffer is full.
+                // In this case, we can try to buffer up more incoming messages.
+                flushCheckTimer.set(System.currentTimeMillis() + flushCheckInterval);
             }
-        });
+        }
+
     }

-    /**
-     * Take all enqueued messages from queue
-     * @return batch of messages
-     * @throws InterruptedException
-     *
-     * synchronized ... ensure that messages are delivered in the same order
-     * as they are added into queue
-     */
-    private MessageBatch tryTakeMessages() throws InterruptedException {
-        //1st message
-        Object msg = message_queue.poll();
-        if (msg == null) return null;
-
-        MessageBatch batch = new MessageBatch(buffer_size);
-        //we will discard any message after CLOSE
-        if (msg == ControlMessage.CLOSE_MESSAGE) {
-            LOG.info("Connection to {} is being closed", remote_addr);
-            being_closed.set(true);
-            return batch;
+    public String name() {
+        if (null != remote_addr) {
+            return PREFIX + remote_addr.toString();
         }
+        return "";
+    }

-        batch.add((TaskMessage)msg);
-        while (!batch.isFull() && ((msg = message_queue.peek())!=null)) {
-            //Is it a CLOSE message?
-            if (msg == ControlMessage.CLOSE_MESSAGE) {
-                message_queue.take();
-                LOG.info("Connection to {} is being closed", remote_addr);
-                being_closed.set(true);
-                break;
+    private synchronized void flush() {
+        if (!closing) {
+            if (null != messageBatch && !messageBatch.isEmpty()) {
+                MessageBatch toBeFlushed = messageBatch;
+                Channel channel = channelRef.get();
+                if (channel != null) {
+                    flushCheckTimer.set(Long.MAX_VALUE);
+                    flushRequest(channel, toBeFlushed, true);
+                }
+                messageBatch = null;
             }
-
-            //try to add this msg into batch
-            if (!batch.tryAdd((TaskMessage) msg))
-                break;
-
-            //remove this message
-            message_queue.take();
         }
-
-        return batch;
     }
-
+
     /**
      * gracefully close this client.
-     *
-     * We will send all existing requests, and then invoke close_n_release() method
+     *
+     * We will send all existing requests, and then invoke close_n_release()
+     * method
      */
-    public void close() {
-        //enqueue a CLOSE message so that shutdown() will be invoked
-        try {
-            message_queue.put(ControlMessage.CLOSE_MESSAGE);
-
-            //resume delivery if it is waiting for requests
-            tryDeliverMessages(true);
-        } catch (InterruptedException e) {
-            LOG.info("Interrupted Connection to {} is being closed", remote_addr);
-            being_closed.set(true);
+    public synchronized void close() {
+        if (!closing) {
+            closing = true;
+            if (null != messageBatch && !messageBatch.isEmpty()) {
+                MessageBatch toBeFlushed = messageBatch;
+                Channel channel = channelRef.get();
+                if (channel != null) {
+                    flushRequest(channel, toBeFlushed, true);
+                }
+                messageBatch = null;
+            }
+
+            //wait for pendings to exit
+            while(pendings.get() != 0) {
— End diff –

Can we have a max number of iterations on this? I just feel it is more defensive to try for a while and then, if it does not work out, log a message and move on. After all, the worst thing that happens with leaving early is that some messages may not have been acked, which we more or less ignore anyways.
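A minimal sketch of the bounded wait being suggested (the timeout constant, poll interval, and class name are illustrative, not from the patch):

import java.util.concurrent.atomic.AtomicLong;

class BoundedCloseWaitSketch {
    private final AtomicLong pendings = new AtomicLong(0);
    private static final long MAX_CLOSE_WAIT_MS = 10 * 1000;
    private static final long POLL_INTERVAL_MS = 100;

    void awaitPendingsOnClose() throws InterruptedException {
        long deadline = System.currentTimeMillis() + MAX_CLOSE_WAIT_MS;
        while (pendings.get() != 0) {
            if (System.currentTimeMillis() > deadline) {
                // give up instead of spinning forever; unsent messages are treated
                // like any other dropped or unacked tuples
                System.err.println("Timed out waiting for " + pendings.get()
                        + " pending request(s); closing anyway");
                break;
            }
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }
}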

          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12813461

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java —
          @@ -31,35 +31,69 @@
          import org.slf4j.LoggerFactory;

          import java.net.InetSocketAddress;
          +import java.util.ArrayList;
          +import java.util.Arrays;
          +import java.util.HashMap;
          +import java.util.Iterator;
          +import java.util.List;
          import java.util.Map;
          import java.util.concurrent.Executors;
          import java.util.concurrent.LinkedBlockingQueue;
          +import java.util.concurrent.ThreadFactory;

 class Server implements IConnection {
     private static final Logger LOG = LoggerFactory.getLogger(Server.class);
     @SuppressWarnings("rawtypes")
     Map storm_conf;
     int port;
-    private LinkedBlockingQueue<TaskMessage> message_queue;
+
+    // Create multiple queues for incoming messages. The size equals the number of receiver threads.
+    // For message which is sent to same task, it will be stored in the same queue to preserve the message order.
+    private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
+
     volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
     final ChannelFactory factory;
     final ServerBootstrap bootstrap;
-
+
+    private int queueCount;
+    HashMap<Integer, Integer> taskToQueueId = null;
+    int roundRobinQueueId;
+
+    boolean closing = false;
+    List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
+
+
     @SuppressWarnings("rawtypes")
     Server(Map storm_conf, int port) {
         this.storm_conf = storm_conf;
         this.port = port;
-        message_queue = new LinkedBlockingQueue<TaskMessage>();
-
+
+        queueCount = Utils.getInt(storm_conf.get("worker.receiver.thread.count"), 1);
— End diff –

          This should be a part of Config.java.
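For example, the key could be surfaced roughly like this (the constant name is hypothetical; the final name and any validator would follow whatever Config.java already does):

public class Config {
    /**
     * Number of Netty server receiver threads per worker. Messages destined for
     * the same task always land on the same queue/thread to preserve ordering.
     */
    public static final String WORKER_RECEIVER_THREAD_COUNT = "worker.receiver.thread.count";
}

The Server constructor would then read Utils.getInt(storm_conf.get(Config.WORKER_RECEIVER_THREAD_COUNT), 1) instead of the raw string.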

          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12813477

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java —
          @@ -31,35 +31,69 @@
          import org.slf4j.LoggerFactory;

          import java.net.InetSocketAddress;
          +import java.util.ArrayList;
          +import java.util.Arrays;
          +import java.util.HashMap;
          +import java.util.Iterator;
          +import java.util.List;
          import java.util.Map;
          import java.util.concurrent.Executors;
          import java.util.concurrent.LinkedBlockingQueue;
          +import java.util.concurrent.ThreadFactory;

 class Server implements IConnection {
     private static final Logger LOG = LoggerFactory.getLogger(Server.class);
     @SuppressWarnings("rawtypes")
     Map storm_conf;
     int port;
-    private LinkedBlockingQueue<TaskMessage> message_queue;
+
+    // Create multiple queues for incoming messages. The size equals the number of receiver threads.
+    // For message which is sent to same task, it will be stored in the same queue to preserve the message order.
+    private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
+
     volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
     final ChannelFactory factory;
     final ServerBootstrap bootstrap;
-
+
+    private int queueCount;
+    HashMap<Integer, Integer> taskToQueueId = null;
+    int roundRobinQueueId;
+
+    boolean closing = false;
+    List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
+
+
     @SuppressWarnings("rawtypes")
     Server(Map storm_conf, int port) {
         this.storm_conf = storm_conf;
         this.port = port;
-        message_queue = new LinkedBlockingQueue<TaskMessage>();
-
+
+        queueCount = Utils.getInt(storm_conf.get("worker.receiver.thread.count"), 1);
+        roundRobinQueueId = 0;
+        taskToQueueId = new HashMap<Integer, Integer>();
+
+        message_queue = new LinkedBlockingQueue[queueCount];
+        for (int i = 0; i < queueCount; i++) {
— End diff –

          Indentation looks off here.

          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12813684

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java —
          @@ -72,34 +106,109 @@
         Channel channel = bootstrap.bind(new InetSocketAddress(port));
         allChannels.add(channel);
     }
+
+    private ArrayList<TaskMessage>[] groupMessages(List<TaskMessage> msgs) {
+        ArrayList<TaskMessage> messageGroups[] = new ArrayList[queueCount];
+
+        for (int i = 0; i < msgs.size(); i++) {
+            TaskMessage message = msgs.get(i);
+            int task = message.task();
+
+            if (task == -1) {
+                closing = true;
+                return null;
+            }
+
+            Integer queueId = getMessageQueueId(task);
+
+            if (null == messageGroups[queueId]) {
+                messageGroups[queueId] = new ArrayList<TaskMessage>();
+            }
+            messageGroups[queueId].add(message);
+        }
+        return messageGroups;
+    }
+
+    private Integer getMessageQueueId(int task) {
+        // try to construct the map from taskId -> queueId in round robin manner.
+
+        Integer queueId = taskToQueueId.get(task);
+        if (null == queueId) {
+            synchronized(taskToQueueId) {
+                //assgin task to queue in round-robin manner
+                if (null == taskToQueueId.get(task)) {
+                    queueId = roundRobinQueueId++;
+
+                    taskToQueueId.put(task, queueId);
+                    if (roundRobinQueueId == queueCount) {
+                        roundRobinQueueId = 0;
+                    }
+                }
+            }
+        }
+        return queueId;
+    }

     /**
      * enqueue a received message
      * @param message
      * @throws InterruptedException
      */
-    protected void enqueue(TaskMessage message) throws InterruptedException {
-        message_queue.put(message);
-        LOG.debug("message received with task: {}, payload size: {}", message.task(), message.message().length);
-    }
+    protected void enqueue(List<TaskMessage> msgs) throws InterruptedException {
+
+        if (null == msgs || msgs.size() == 0 || closing) {
+            return;
+        }
+
+        ArrayList<TaskMessage> messageGroups[] = groupMessages(msgs);
+
+        if (null == messageGroups || closing) {
+            return;
+        }
+
+        for (int receiverId = 0; receiverId < messageGroups.length; receiverId++) {
+            ArrayList<TaskMessage> msgGroup = messageGroups[receiverId];
+            if (null != msgGroup) {
+                message_queue[receiverId].put(msgGroup);
+            }
+        }
+    }

     /**
      * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1)
      */
-    public TaskMessage recv(int flags) {
-        if ((flags & 0x01) == 0x01) {
+    public Iterator<TaskMessage> recv(int flags) {
— End diff –

Is there a reason we still have this API here? It seems like we don't actually want anyone to call this code.

          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12813729

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java —
          @@ -133,4 +242,12 @@ public synchronized void close() {
     public void send(int task, byte[] message) {
         throw new RuntimeException("Server connection should not send any messages");
     }
+
+    public void send(Iterator<TaskMessage> msgs) {
+        throw new RuntimeException("Server connection should not send any messages");
+    }
+
+    public String name() {
— End diff –

          Indentation appears to be off here.

          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12813817

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java —
          @@ -18,70 +18,24 @@
          package backtype.storm.messaging.netty;

          import java.net.ConnectException;
          -import java.util.concurrent.atomic.AtomicBoolean;

          -import org.jboss.netty.channel.Channel;
          -import org.jboss.netty.channel.ChannelHandlerContext;
          -import org.jboss.netty.channel.ChannelStateEvent;
          -import org.jboss.netty.channel.ExceptionEvent;
          -import org.jboss.netty.channel.MessageEvent;
          -import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
          +import org.jboss.netty.channel.*;
          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;

          -import java.net.ConnectException;
          -import java.util.concurrent.atomic.AtomicBoolean;
          -
          public class StormClientHandler extends SimpleChannelUpstreamHandler {
          private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
          private Client client;

-    long start_time;

          StormClientHandler(Client client) {
          — End diff –

If all this code does now is log error messages, can we drop the Client from the constructor and rename it to something that better describes what it does?

          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12813854

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java —
          @@ -41,30 +45,22 @@ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {

          @Override
          public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {

          • Object msg = e.getMessage();
          • if (msg == null) return;
            -
          • //end of batch?
          • if (msg==ControlMessage.EOB_MESSAGE) { - Channel channel = ctx.getChannel(); - LOG.debug("Send back response ..."); - if (failure_count.get()==0) - channel.write(ControlMessage.OK_RESPONSE); - else channel.write(ControlMessage.FAILURE_RESPONSE); - return; - }
          • //enqueue the received message for processing
          • try { - server.enqueue((TaskMessage)msg); - }

            catch (InterruptedException e1)

            { - LOG.info("failed to enqueue a request message", e); - failure_count.incrementAndGet(); - }

            + List<TaskMessage> msgs = (List<TaskMessage>) e.getMessage();
            + if (msgs == null)

            { + return; + }

            +
            + try

            { + server.enqueue(msgs); + }

            catch (InterruptedException e1)

            { + LOG.info("failed to enqueue a request message", e); + failure_count.incrementAndGet(); + }

            }

          @Override
          public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
          + e.getCause().printStackTrace();
          — End diff –

          Can we log this properly instead of just printing the stack trace to stderr?
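One way to do that, sketched under the assumption of the SLF4J logger already declared in the handler; this is illustrative only, not the code in the pull request:

    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.ExceptionEvent;
    import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative handler: report the failure through the logger rather than stderr.
    public class LoggingExceptionHandler extends SimpleChannelUpstreamHandler {
        private static final Logger LOG = LoggerFactory.getLogger(LoggingExceptionHandler.class);

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
            // The full stack trace ends up in the worker log at ERROR level.
            LOG.error("server error in handling the request", e.getCause());
        }
    }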

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12814221

          — Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java —
          @@ -301,15 +303,39 @@ public static ComponentCommon getComponentCommon(StormTopology topology, String
          }

          public static Integer getInt(Object o) {

          • if(o instanceof Long) { - return ((Long) o ).intValue(); - }

            else if (o instanceof Integer)

            { - return (Integer) o; - }

            else if (o instanceof Short)

            { - return ((Short) o).intValue(); - }

            else

            { - throw new IllegalArgumentException("Don't know how to convert " + o + " + to int"); - }

            + Integer result = getInt(o, null);
            + if (null == result)

            { + throw new IllegalArgumentException("Don't know how to convert null + to int"); + }

            + return result;
            + }
            +
            + public static Integer getInt(Object o, Integer defaultValue) {
            + if (null == o)

            { + return defaultValue; + }

            +
            + if(o instanceof Long) {

              • End diff –

Why? If I type in 3.5 for an integer config and I get a 3 out of it, is that a problem? If so, then we need to change Config.java to not just expect a Number for these things, but instead to have a proper checker that knows this should be an Integer and checks ranges, etc.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-43553213

          I did a quick pass through the code. I have some concerns about the blocking nature of a number of the calls, especially connect. This just feels like it is going to make the worker block until all other workers are up and the connections have been established.
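For context, Netty 3 allows the connect itself to be issued asynchronously so the calling worker thread is never parked waiting for its peers. The sketch below is only an illustration of that idea (connectAsync, scheduleRetry and the surrounding class are invented names), not the approach taken in this pull request:

    import java.net.InetSocketAddress;

    import org.jboss.netty.bootstrap.ClientBootstrap;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelFuture;
    import org.jboss.netty.channel.ChannelFutureListener;

    // Hypothetical sketch: non-blocking connect that never parks the calling worker thread.
    class AsyncConnectSketch {
        private final ClientBootstrap bootstrap;
        private final InetSocketAddress remoteAddr;
        private volatile Channel channel;

        AsyncConnectSketch(ClientBootstrap bootstrap, InetSocketAddress remoteAddr) {
            this.bootstrap = bootstrap;
            this.remoteAddr = remoteAddr;
        }

        void connectAsync() {
            ChannelFuture future = bootstrap.connect(remoteAddr);
            future.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture f) {
                    if (f.isSuccess()) {
                        channel = f.getChannel();   // remember the live channel
                    } else {
                        scheduleRetry();            // back off and try again later
                    }
                }
            });
        }

        void scheduleRetry() {
            // placeholder: a real client would use a timer with exponential backoff here
        }
    }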

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12826580

          — Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java —
          @@ -301,15 +303,39 @@ public static ComponentCommon getComponentCommon(StormTopology topology, String
          }

          public static Integer getInt(Object o) {

          • if(o instanceof Long) { - return ((Long) o ).intValue(); - }

            else if (o instanceof Integer)

            { - return (Integer) o; - }

            else if (o instanceof Short)

            { - return ((Short) o).intValue(); - }

            else

            { - throw new IllegalArgumentException("Don't know how to convert " + o + " + to int"); - }

            + Integer result = getInt(o, null);
            + if (null == result)

            { + throw new IllegalArgumentException("Don't know how to convert null + to int"); + }

            + return result;
            + }
            +
            + public static Integer getInt(Object o, Integer defaultValue) {
            + if (null == o)

            { + return defaultValue; + }

            +
            + if(o instanceof Long) {

              • End diff –

We are consistent with the old behavior.
If a config expects an integer, then short, int, and long are valid; double and float are invalid.
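As a self-contained sketch of that behaviour (integral types convert, null falls back to the supplied default, double and float are rejected), with the method name chosen here purely for illustration:

    // Illustrative helper mirroring the getInt(Object, Integer) semantics discussed above.
    // Example: getIntOrDefault(conf.get("worker.receiver.thread.count"), 1) returns 1 when the key is unset.
    public static Integer getIntOrDefault(Object o, Integer defaultValue) {
        if (o == null) {
            return defaultValue;
        }
        if (o instanceof Long) {
            return ((Long) o).intValue();
        } else if (o instanceof Integer) {
            return (Integer) o;
        } else if (o instanceof Short) {
            return ((Short) o).intValue();
        }
        throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
    }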

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12827274

          — Diff: storm-core/src/clj/backtype/storm/disruptor.clj —
          @@ -89,7 +90,7 @@
          (consume-batch-when-available queue handler)
          0 )
          :kill-fn kill-fn

          • :thread-name thread-name
            + :thread-name (.getName queue)
              • End diff –

          fixed with your suggestion, thanks

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12827309

          — Diff: conf/defaults.yaml —
          @@ -109,6 +112,15 @@ storm.messaging.netty.max_retries: 30
          storm.messaging.netty.max_wait_ms: 1000
          storm.messaging.netty.min_wait_ms: 100

          +# If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency.
          +storm.messaging.netty.transfer.batch.size: 262144
          +
          +# If storm.messaging.netty.blocking is set to true, the Netty Client will send messages in synchronized way, otherwise it will do it in async way. Set storm.messaging.netty.blocking to false to improve the latency and throughput.
          — End diff –

          fixed
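The behaviour described in that config comment reduces to a writability check on the channel: flush straight away while Netty can take more data, otherwise keep accumulating until the configured byte limit is reached. A rough sketch under those assumptions, with the Batch interface and flushRequest standing in for the patch's own MessageBatch and flush path:

    import org.jboss.netty.channel.Channel;

    // Illustrative decision point: flush immediately for latency, or keep batching for throughput.
    class FlushPolicySketch {
        // Minimal stand-in for the patch's MessageBatch; only what the decision needs.
        interface Batch {
            boolean isEmpty();
            boolean isFull();   // true once it holds storm.messaging.netty.transfer.batch.size bytes
        }

        void flushOrBatch(Channel channel, Batch batch) {
            if (batch == null || batch.isEmpty()) {
                return;
            }
            if (channel.isWritable() || batch.isFull()) {
                // Either Netty can take more data right now (flush for latency),
                // or the batch has reached the configured size (flush for throughput).
                flushRequest(channel, batch);
            }
            // Otherwise keep the batch open; a later send or a periodic check will flush it.
        }

        void flushRequest(Channel channel, Batch batch) {
            // placeholder for channel.write(batch) in the real client
        }
    }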

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12827316

          — Diff: storm-core/src/clj/backtype/storm/disruptor.clj —
          @@ -89,7 +90,7 @@
          (consume-batch-when-available queue handler)
          0 )
          :kill-fn kill-fn

          • :thread-name thread-name
            + :thread-name (.getName queue)
              • End diff –

          fixed

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12827318

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java —
          @@ -21,52 +21,53 @@
          import backtype.storm.messaging.IConnection;
          import backtype.storm.messaging.TaskMessage;
          import backtype.storm.utils.Utils;
          -
          import org.jboss.netty.bootstrap.ClientBootstrap;
          import org.jboss.netty.channel.Channel;
          import org.jboss.netty.channel.ChannelFactory;
          -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
          import org.jboss.netty.channel.ChannelFuture;
          import org.jboss.netty.channel.ChannelFutureListener;
          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;
          -
          import java.net.InetSocketAddress;
          +import java.util.ArrayList;
          +import java.util.Iterator;
          +import java.util.List;
          import java.util.Map;
          -import java.util.Timer;
          -import java.util.TimerTask;
          import java.util.Random;
          -import java.util.concurrent.LinkedBlockingQueue;
          -import java.util.concurrent.atomic.AtomicBoolean;
          -import java.util.concurrent.atomic.AtomicInteger;
          +import java.util.concurrent.atomic.AtomicLong;
          import java.util.concurrent.atomic.AtomicReference;

          -class Client implements IConnection {
          +public class Client implements IConnection {
          private static final Logger LOG = LoggerFactory.getLogger(Client.class);

          • private static final Timer TIMER = new Timer("netty-client-timer", true);
            -
            + private static final String PREFIX = "Netty-Client-";
            private final int max_retries;
            private final long base_sleep_ms;
            private final long max_sleep_ms;
          • private LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
            private AtomicReference<Channel> channelRef;
            private final ClientBootstrap bootstrap;
          • InetSocketAddress remote_addr;
          • private AtomicInteger retries;
            + private InetSocketAddress remote_addr;
            +
            private final Random random = new Random();
            private final ChannelFactory factory;
            private final int buffer_size;
          • private final AtomicBoolean being_closed;
          • private boolean wait_for_requests;
            + private boolean closing;
            +
            + private Integer messageBatchSize;
            + private Boolean blocking = false;
              • End diff –

          fixed

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12827323

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java —
          @@ -128,133 +181,105 @@ private long getSleepTimeMs()
          }

          /**

          • * Enqueue a task message to be sent to server
            + * Enqueue task messages to be sent to server
            */
          • public void send(int task, byte[] message) {
          • //throw exception if the client is being closed
          • if (being_closed.get()) {
            + synchronized public void send(Iterator<TaskMessage> msgs) {
            +
            + // throw exception if the client is being closed
            + if (closing) { throw new RuntimeException("Client is being closed, and does not take requests any more"); }

            -

          • try { - message_queue.put(new TaskMessage(task, message)); - - //resume delivery if it is waiting for requests - tryDeliverMessages(true); - }

            catch (InterruptedException e) {

          • throw new RuntimeException(e);
            +
            + if (null == msgs || !msgs.hasNext()) { + return; }
          • }
          • /**
          • * Retrieve messages from queue, and delivery to server if any
          • */
          • synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
          • //just skip if delivery only if waiting, and we are not waiting currently
          • if (only_if_waiting && !wait_for_requests) return;
            -
          • //make sure that channel was not closed
            Channel channel = channelRef.get();
          • if (channel == null) return;
          • if (!channel.isOpen()) {
          • LOG.info("Channel to {} is no longer open.",remote_addr);
          • //The channel is not open yet. Reconnect?
          • reconnect();
          • return;
            + if (null == channel) { + connect(); + channel = channelRef.get(); }
          • final MessageBatch requests = tryTakeMessages();
          • if (requests==null) { - wait_for_requests = true; - return; - }

            + while (msgs.hasNext()) {
            + TaskMessage message = msgs.next();
            + if (null == messageBatch)

            { + messageBatch = new MessageBatch(messageBatchSize); + }
          • //if channel is being closed and we have no outstanding messages, let's close the channel
          • if (requests.isEmpty() && being_closed.get()) {
          • close_n_release();
          • return;
            + messageBatch.add(message);
            + if (messageBatch.isFull()) { + MessageBatch toBeFlushed = messageBatch; + flushRequest(channel, toBeFlushed, blocking); + messageBatch = null; + }

            }

          • //we are busily delivering messages, and will check queue upon response.
          • //When send() is called by senders, we should not thus call tryDeliverMessages().
          • wait_for_requests = false;
            -
          • //write request into socket channel
          • ChannelFuture future = channel.write(requests);
          • future.addListener(new ChannelFutureListener() {
          • public void operationComplete(ChannelFuture future)
          • throws Exception {
          • if (!future.isSuccess()) { - LOG.info("failed to send "+requests.size()+" requests to "+remote_addr, future.getCause()); - reconnect(); - }

            else {

          • LOG.debug("{} request(s) sent", requests.size());
            -
          • //Now that our requests have been sent, channel could be closed if needed
          • if (being_closed.get())
          • close_n_release();
          • }
            + if (null != messageBatch && !messageBatch.isEmpty()) {
            + if (channel.isWritable()) { + flushCheckTimer.set(Long.MAX_VALUE); + + // Flush as fast as we can to reduce the latency + MessageBatch toBeFlushed = messageBatch; + messageBatch = null; + flushRequest(channel, toBeFlushed, blocking); + + }

            else

            { + // when channel is NOT writable, it means the internal netty buffer is full. + // In this case, we can try to buffer up more incoming messages. + flushCheckTimer.set(System.currentTimeMillis() + flushCheckInterval); }
          • });
            + }
            +
            }
          • /**
          • * Take all enqueued messages from queue
          • * @return batch of messages
          • * @throws InterruptedException
          • *
          • * synchronized ... ensure that messages are delivered in the same order
          • * as they are added into queue
          • */
          • private MessageBatch tryTakeMessages() throws InterruptedException {
          • //1st message
          • Object msg = message_queue.poll();
          • if (msg == null) return null;
            -
          • MessageBatch batch = new MessageBatch(buffer_size);
          • //we will discard any message after CLOSE
          • if (msg == ControlMessage.CLOSE_MESSAGE) {
          • LOG.info("Connection to {} is being closed", remote_addr);
          • being_closed.set(true);
          • return batch;
+ public String name() {
+ if (null != remote_addr) {
+ return PREFIX + remote_addr.toString();
+ }
+ return "";
+ }
          • batch.add((TaskMessage)msg);
• while (!batch.isFull() && ((msg = message_queue.peek())!=null)) {
          • //Is it a CLOSE message?
          • if (msg == ControlMessage.CLOSE_MESSAGE) {
          • message_queue.take();
          • LOG.info("Connection to {} is being closed", remote_addr);
          • being_closed.set(true);
          • break;
            + private synchronized void flush() {
            + if (!closing) {
+ if (null != messageBatch && !messageBatch.isEmpty()) {
+ MessageBatch toBeFlushed = messageBatch;
+ Channel channel = channelRef.get();
+ if (channel != null) {
+ flushCheckTimer.set(Long.MAX_VALUE);
+ flushRequest(channel, toBeFlushed, true);
+ }
+ messageBatch = null;
+ }

            -

          • //try to add this msg into batch
          • if (!batch.tryAdd((TaskMessage) msg))
          • break;
            -
          • //remove this message
          • message_queue.take();
            }
            -
          • return batch;
            }
            -
            +
            /**
          • gracefully close this client.
          • *
          • * We will send all existing requests, and then invoke close_n_release() method
            + *
            + * We will send all existing requests, and then invoke close_n_release()
            + * method
            */
          • public void close() {
          • //enqueue a CLOSE message so that shutdown() will be invoked
          • try { - message_queue.put(ControlMessage.CLOSE_MESSAGE); - - //resume delivery if it is waiting for requests - tryDeliverMessages(true); - }

            catch (InterruptedException e) {

          • LOG.info("Interrupted Connection to {} is being closed", remote_addr);
          • being_closed.set(true);
            + public synchronized void close() {
            + if (!closing) {
            + closing = true;
+ if (null != messageBatch && !messageBatch.isEmpty()) {
+ MessageBatch toBeFlushed = messageBatch;
+ Channel channel = channelRef.get();
+ if (channel != null) {
+ flushRequest(channel, toBeFlushed, true);
+ }
+ messageBatch = null;
+ }

            +
            + //wait for pendings to exit
            + while(pendings.get() != 0) {

              • End diff –

          fixed
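Reading through the hunk above, the new send path accumulates messages into a MessageBatch of up to storm.messaging.netty.transfer.batch.size bytes and flushes each batch as it fills. A simplified, self-contained sketch of that loop follows; MessageBatchSketch is a stand-in with only the methods the loop needs, not the class from the patch:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    // Simplified stand-in for the patch's MessageBatch: collects payloads up to a byte limit.
    class MessageBatchSketch {
        private final int capacityBytes;
        private int usedBytes = 0;
        private final List<byte[]> payloads = new ArrayList<byte[]>();

        MessageBatchSketch(int capacityBytes) {
            this.capacityBytes = capacityBytes;
        }

        void add(byte[] payload) {
            payloads.add(payload);
            usedBytes += payload.length;
        }

        boolean isFull() {
            return usedBytes >= capacityBytes;
        }

        boolean isEmpty() {
            return payloads.isEmpty();
        }
    }

    // Illustrative send loop mirroring the batching behaviour in the quoted hunk:
    // fill a batch up to the configured size, flush it, then start a new one.
    class BatchingSenderSketch {
        private final int messageBatchSize;
        private MessageBatchSketch messageBatch;

        BatchingSenderSketch(int messageBatchSize) {
            this.messageBatchSize = messageBatchSize;
        }

        synchronized void send(Iterator<byte[]> msgs) {
            while (msgs.hasNext()) {
                byte[] message = msgs.next();
                if (messageBatch == null) {
                    messageBatch = new MessageBatchSketch(messageBatchSize);
                }
                messageBatch.add(message);
                if (messageBatch.isFull()) {
                    flushRequest(messageBatch);   // full batch goes onto the channel
                    messageBatch = null;
                }
            }
            // A partially filled batch is left behind for the writability check / flush timer.
        }

        void flushRequest(MessageBatchSketch batch) {
            // placeholder for channel.write(batch) in the real client
        }
    }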

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12827325

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java —
          @@ -31,35 +31,69 @@
          import org.slf4j.LoggerFactory;

          import java.net.InetSocketAddress;
          +import java.util.ArrayList;
          +import java.util.Arrays;
          +import java.util.HashMap;
          +import java.util.Iterator;
          +import java.util.List;
          import java.util.Map;
          import java.util.concurrent.Executors;
          import java.util.concurrent.LinkedBlockingQueue;
          +import java.util.concurrent.ThreadFactory;

          class Server implements IConnection {
          private static final Logger LOG = LoggerFactory.getLogger(Server.class);
          @SuppressWarnings("rawtypes")
          Map storm_conf;
          int port;

          • private LinkedBlockingQueue<TaskMessage> message_queue;
            +
            + // Create multiple queues for incoming messages. The size equals the number of receiver threads.
            + // For message which is sent to same task, it will be stored in the same queue to preserve the message order.
            + private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
            +
            volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
            final ChannelFactory factory;
            final ServerBootstrap bootstrap;
            -
            +
            + private int queueCount;
            + HashMap<Integer, Integer> taskToQueueId = null;
            + int roundRobinQueueId;
            +
            + boolean closing = false;
            + List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
            +
            +
            @SuppressWarnings("rawtypes")
            Server(Map storm_conf, int port) {
            this.storm_conf = storm_conf;
            this.port = port;
          • message_queue = new LinkedBlockingQueue<TaskMessage>();
            -
            +
            + queueCount = Utils.getInt(storm_conf.get("worker.receiver.thread.count"), 1);
              • End diff –

          fixed

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12827326

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java —
          @@ -31,35 +31,69 @@
          import org.slf4j.LoggerFactory;

          import java.net.InetSocketAddress;
          +import java.util.ArrayList;
          +import java.util.Arrays;
          +import java.util.HashMap;
          +import java.util.Iterator;
          +import java.util.List;
          import java.util.Map;
          import java.util.concurrent.Executors;
          import java.util.concurrent.LinkedBlockingQueue;
          +import java.util.concurrent.ThreadFactory;

          class Server implements IConnection {
          private static final Logger LOG = LoggerFactory.getLogger(Server.class);
          @SuppressWarnings("rawtypes")
          Map storm_conf;
          int port;

          • private LinkedBlockingQueue<TaskMessage> message_queue;
            +
            + // Create multiple queues for incoming messages. The size equals the number of receiver threads.
            + // For message which is sent to same task, it will be stored in the same queue to preserve the message order.
            + private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
            +
            volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
            final ChannelFactory factory;
            final ServerBootstrap bootstrap;
            -
            +
            + private int queueCount;
            + HashMap<Integer, Integer> taskToQueueId = null;
            + int roundRobinQueueId;
            +
            + boolean closing = false;
            + List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
            +
            +
            @SuppressWarnings("rawtypes")
            Server(Map storm_conf, int port) {
            this.storm_conf = storm_conf;
            this.port = port;
          • message_queue = new LinkedBlockingQueue<TaskMessage>();
            -
            +
            + queueCount = Utils.getInt(storm_conf.get("worker.receiver.thread.count"), 1);
            + roundRobinQueueId = 0;
            + taskToQueueId = new HashMap<Integer, Integer>();
            +
            + message_queue = new LinkedBlockingQueue[queueCount];
            + for (int i = 0; i < queueCount; i++) {
              • End diff –

          fixed

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12827328

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java —
          @@ -72,34 +106,109 @@
          Channel channel = bootstrap.bind(new InetSocketAddress(port));
          allChannels.add(channel);
          }
          +
          + private ArrayList<TaskMessage>[] groupMessages(List<TaskMessage> msgs) {
          + ArrayList<TaskMessage> messageGroups[] = new ArrayList[queueCount];
          +
          + for (int i = 0; i < msgs.size(); i++) {
+ TaskMessage message = msgs.get(i);
          + int task = message.task();
          +
          + if (task == -1)

          { + closing = true; + return null; + }

          +
          + Integer queueId = getMessageQueueId(task);
          +
          + if (null == messageGroups[queueId])

          { + messageGroups[queueId] = new ArrayList<TaskMessage>(); + }

          + messageGroups[queueId].add(message);
          + }
          + return messageGroups;
          + }
          +
          + private Integer getMessageQueueId(int task) {
          + // try to construct the map from taskId -> queueId in round robin manner.
          +
          + Integer queueId = taskToQueueId.get(task);
          + if (null == queueId) {
          + synchronized(taskToQueueId) {
          + //assgin task to queue in round-robin manner
          + if (null == taskToQueueId.get(task)) {
          + queueId = roundRobinQueueId++;
          +
          + taskToQueueId.put(task, queueId);
          + if (roundRobinQueueId == queueCount)

          { + roundRobinQueueId = 0; + }

          + }
          + }
          + }
          + return queueId;
          + }

          /**

          • enqueue a received message
          • @param message
          • @throws InterruptedException
            */
          • protected void enqueue(TaskMessage message) throws InterruptedException {
          • message_queue.put(message);
          • LOG.debug("message received with task: {}, payload size: {}", message.task(), message.message().length);
          • }
            + protected void enqueue(List<TaskMessage> msgs) throws InterruptedException {
            +
            + if (null == msgs || msgs.size() == 0 || closing) { + return; + }
            +
            + ArrayList<TaskMessage> messageGroups[] = groupMessages(msgs);
            +
            + if (null == messageGroups || closing) { + return; + }

            +
+ for (int receiverId = 0; receiverId < messageGroups.length; receiverId++) {
+ ArrayList<TaskMessage> msgGroup = messageGroups[receiverId];
+ if (null != msgGroup) {
+ message_queue[receiverId].put(msgGroup);
+ }
+ }

            + }

          /**

          • fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1)
            */
          • public TaskMessage recv(int flags) {
          • if ((flags & 0x01) == 0x01) {
            + public Iterator<TaskMessage> recv(int flags) {
              • End diff –

          fixed
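For clarity, the task-to-queue assignment quoted above works as follows: the first time a task id is seen it is bound to a receiver queue in round-robin order, and it keeps that queue afterwards so per-task message order is preserved. A compact sketch of that logic (the class around it is assumed; the field names follow the diff):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative round-robin assignment of task ids to receiver queues.
    class TaskQueueAssignerSketch {
        private final int queueCount;
        private final Map<Integer, Integer> taskToQueueId = new HashMap<Integer, Integer>();
        private int roundRobinQueueId = 0;

        TaskQueueAssignerSketch(int queueCount) {
            this.queueCount = queueCount;
        }

        synchronized int queueIdFor(int task) {
            Integer queueId = taskToQueueId.get(task);
            if (queueId == null) {
                // First time this task is seen: bind it to the next queue in round-robin order.
                // The binding never changes afterwards, so all messages for one task land in
                // the same queue and their relative order is preserved.
                queueId = roundRobinQueueId;
                taskToQueueId.put(task, queueId);
                roundRobinQueueId = (roundRobinQueueId + 1) % queueCount;
            }
            return queueId;
        }
    }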

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12827330

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java —
          @@ -133,4 +242,12 @@ public synchronized void close() {
          public void send(int task, byte[] message)

          { throw new RuntimeException("Server connection should not send any messages"); }

          +
          + public void send(Iterator<TaskMessage> msgs)

          { + throw new RuntimeException("Server connection should not send any messages"); + }

          +
          + public String name() {
          — End diff –

          fixed

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12827334

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java —
          @@ -18,70 +18,24 @@
          package backtype.storm.messaging.netty;

          import java.net.ConnectException;
          -import java.util.concurrent.atomic.AtomicBoolean;

          -import org.jboss.netty.channel.Channel;
          -import org.jboss.netty.channel.ChannelHandlerContext;
          -import org.jboss.netty.channel.ChannelStateEvent;
          -import org.jboss.netty.channel.ExceptionEvent;
          -import org.jboss.netty.channel.MessageEvent;
          -import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
          +import org.jboss.netty.channel.*;
          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;

          -import java.net.ConnectException;
          -import java.util.concurrent.atomic.AtomicBoolean;
          -
          public class StormClientHandler extends SimpleChannelUpstreamHandler {
          private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
          private Client client;

          • long start_time;

          StormClientHandler(Client client) {
          — End diff –

          fixed

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12827338

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java —
          @@ -41,30 +45,22 @@ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {

          @Override
          public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {

          • Object msg = e.getMessage();
          • if (msg == null) return;
            -
          • //end of batch?
          • if (msg==ControlMessage.EOB_MESSAGE) { - Channel channel = ctx.getChannel(); - LOG.debug("Send back response ..."); - if (failure_count.get()==0) - channel.write(ControlMessage.OK_RESPONSE); - else channel.write(ControlMessage.FAILURE_RESPONSE); - return; - }
          • //enqueue the received message for processing
          • try { - server.enqueue((TaskMessage)msg); - }

            catch (InterruptedException e1)

            { - LOG.info("failed to enqueue a request message", e); - failure_count.incrementAndGet(); - }

            + List<TaskMessage> msgs = (List<TaskMessage>) e.getMessage();
            + if (msgs == null)

            { + return; + }

            +
            + try

            { + server.enqueue(msgs); + }

            catch (InterruptedException e1)

            { + LOG.info("failed to enqueue a request message", e); + failure_count.incrementAndGet(); + }

            }

          @Override
          public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
          + e.getCause().printStackTrace();
          — End diff –

          fixed
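Putting the pieces of this hunk back together, the handler now receives a whole List of task messages per event and enqueues it in a single call. A cleaned-up sketch of that flow (the BatchServer interface is a stand-in introduced here, and the catch block logs the exception itself):

    import java.util.List;

    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.MessageEvent;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative reconstruction of the reworked messageReceived: the decoder now hands the
    // handler a whole list of task messages, which is enqueued on the server in one call.
    class BatchReceiveSketch {
        private static final Logger LOG = LoggerFactory.getLogger(BatchReceiveSketch.class);

        // Minimal stand-in for the server side the handler talks to.
        interface BatchServer {
            void enqueue(List<?> msgs) throws InterruptedException;
        }

        private final BatchServer server;

        BatchReceiveSketch(BatchServer server) {
            this.server = server;
        }

        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
            List<?> msgs = (List<?>) e.getMessage();
            if (msgs == null) {
                return;
            }
            try {
                server.enqueue(msgs);
            } catch (InterruptedException e1) {
                // log the InterruptedException itself (e1), not the MessageEvent
                LOG.info("failed to enqueue a request message", e1);
            }
        }
    }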

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12827582

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java —
          @@ -128,133 +181,105 @@ private long getSleepTimeMs()
          }

          /**

          • * Enqueue a task message to be sent to server
            + * Enqueue task messages to be sent to server
            */
          • public void send(int task, byte[] message) {
          • //throw exception if the client is being closed
          • if (being_closed.get()) {
            + synchronized public void send(Iterator<TaskMessage> msgs) {
            +
            + // throw exception if the client is being closed
            + if (closing) { throw new RuntimeException("Client is being closed, and does not take requests any more"); }

            -

          • try { - message_queue.put(new TaskMessage(task, message)); - - //resume delivery if it is waiting for requests - tryDeliverMessages(true); - }

            catch (InterruptedException e) {

          • throw new RuntimeException(e);
            +
            + if (null == msgs || !msgs.hasNext()) { + return; }
          • }
          • /**
          • * Retrieve messages from queue, and delivery to server if any
          • */
          • synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
          • //just skip if delivery only if waiting, and we are not waiting currently
          • if (only_if_waiting && !wait_for_requests) return;
            -
          • //make sure that channel was not closed
            Channel channel = channelRef.get();
          • if (channel == null) return;
          • if (!channel.isOpen()) {
          • LOG.info("Channel to {} is no longer open.",remote_addr);
          • //The channel is not open yet. Reconnect?
          • reconnect();
          • return;
            + if (null == channel) { + connect(); + channel = channelRef.get(); }
          • final MessageBatch requests = tryTakeMessages();
          • if (requests==null) { - wait_for_requests = true; - return; - }

            + while (msgs.hasNext()) {
            + TaskMessage message = msgs.next();
            + if (null == messageBatch)

            { + messageBatch = new MessageBatch(messageBatchSize); + }
          • //if channel is being closed and we have no outstanding messages, let's close the channel
          • if (requests.isEmpty() && being_closed.get()) {
          • close_n_release();
          • return;
            + messageBatch.add(message);
            + if (messageBatch.isFull()) { + MessageBatch toBeFlushed = messageBatch; + flushRequest(channel, toBeFlushed, blocking); + messageBatch = null; + }

            }

          • //we are busily delivering messages, and will check queue upon response.
          • //When send() is called by senders, we should not thus call tryDeliverMessages().
          • wait_for_requests = false;
            -
          • //write request into socket channel
          • ChannelFuture future = channel.write(requests);
          • future.addListener(new ChannelFutureListener() {
          • public void operationComplete(ChannelFuture future)
          • throws Exception {
          • if (!future.isSuccess()) { - LOG.info("failed to send "+requests.size()+" requests to "+remote_addr, future.getCause()); - reconnect(); - }

            else {

          • LOG.debug("{} request(s) sent", requests.size());
            -
          • //Now that our requests have been sent, channel could be closed if needed
          • if (being_closed.get())
          • close_n_release();
          • }
            + if (null != messageBatch && !messageBatch.isEmpty()) {
            + if (channel.isWritable()) { + flushCheckTimer.set(Long.MAX_VALUE); + + // Flush as fast as we can to reduce the latency + MessageBatch toBeFlushed = messageBatch; + messageBatch = null; + flushRequest(channel, toBeFlushed, blocking); + + }

            else

            { + // when channel is NOT writable, it means the internal netty buffer is full. + // In this case, we can try to buffer up more incoming messages. + flushCheckTimer.set(System.currentTimeMillis() + flushCheckInterval); }
          • });
            + }
            +
            }
          • /**
          • * Take all enqueued messages from queue
          • * @return batch of messages
          • * @throws InterruptedException
          • *
          • * synchronized ... ensure that messages are delivered in the same order
          • * as they are added into queue
          • */
          • private MessageBatch tryTakeMessages() throws InterruptedException {
          • //1st message
          • Object msg = message_queue.poll();
          • if (msg == null) return null;
            -
          • MessageBatch batch = new MessageBatch(buffer_size);
          • //we will discard any message after CLOSE
          • if (msg == ControlMessage.CLOSE_MESSAGE) {
          • LOG.info("Connection to {} is being closed", remote_addr);
          • being_closed.set(true);
          • return batch;
+ public String name() {
+     if (null != remote_addr) {
+         return PREFIX + remote_addr.toString();
+     }
+     return "";
+ }
          • batch.add((TaskMessage)msg);
          • while (!batch.isFull() && ((msg = message_queue.peek())!=null)) {
          • //Is it a CLOSE message?
          • if (msg == ControlMessage.CLOSE_MESSAGE) {
          • message_queue.take();
          • LOG.info("Connection to {} is being closed", remote_addr);
          • being_closed.set(true);
          • break;
            + private synchronized void flush() {
            + if (!closing) {
            + if (null != messageBatch && !messageBatch.isEmpty()) {
            + MessageBatch toBeFlushed = messageBatch;
            + Channel channel = channelRef.get();
            + if (channel != null) { + flushCheckTimer.set(Long.MAX_VALUE); + flushRequest(channel, toBeFlushed, true); + }

            + messageBatch = null;

              • End diff –

          fixed. Changed flush() to flush(Channel channel), since the caller of flush() already checks that the channel is not null.
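
          For reference, the reworked method roughly looks like this after that change (an illustrative sketch reusing the field names from this diff, not the exact committed code):

          ```java
          // Sketch: flush the pending batch on a channel the caller has already null-checked.
          private synchronized void flush(Channel channel) {
              if (closing) {
                  return;
              }
              if (null != messageBatch && !messageBatch.isEmpty()) {
                  MessageBatch toBeFlushed = messageBatch;
                  messageBatch = null;
                  flushCheckTimer.set(Long.MAX_VALUE);
                  flushRequest(channel, toBeFlushed, true);
              }
          }
          ```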

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12827649

          — Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj —
          @@ -109,25 +111,30 @@
           (defn mk-transfer-fn [worker]
             (let [local-tasks (-> worker :task-ids set)
                   local-transfer (:transfer-local-fn worker)
          -        ^DisruptorQueue transfer-queue (:transfer-queue worker)]
          +        ^DisruptorQueue transfer-queue (:transfer-queue worker)
          +        task->node+port (:cached-task->node+port worker)]
               (fn [^KryoTupleSerializer serializer tuple-batch]
                 (let [local (ArrayList.)
          -            remote (ArrayList.)]
          +            remoteMap (HashMap.)]
                   (fast-list-iter [[task tuple :as pair] tuple-batch]
                     (if (local-tasks task)
                       (.add local pair)
          -            (.add remote pair)
          -            ))
          +            (let [node+port (get @task->node+port task)]
          +              (when (not (.get remoteMap node+port))
          +                (.put remoteMap node+port (ArrayList.)))
          +              (let [remote (.get remoteMap node+port)]
          +                (.add remote (TaskMessage. task (.serialize serializer tuple)))
          +                ))))

          End diff –

          This is on purpose, for performance.
          The ArrayList and HashMap constructed here are used directly from Java. We designed a customized iterator inside the TransferDrainer class, so that we don't need to wrap/unwrap or copy data from Clojure data structures into Java data structures.
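
          To illustrate how the Java side consumes these structures (a rough sketch of the idea only; `connections` is a hypothetical lookup map and the real TransferDrainer API may differ), the drainer essentially walks the per-destination map and hands each ArrayList to the matching connection, using the batched send signature shown in the diff above:

          ```java
          // Sketch: drain the HashMap built by the transfer fn without copying it into
          // another data structure; each destination's ArrayList becomes one send batch.
          // The map key is the destination node+port.
          static void drain(HashMap<Object, ArrayList<TaskMessage>> remoteMap,
                            Map<Object, IConnection> connections) {
              for (Map.Entry<Object, ArrayList<TaskMessage>> entry : remoteMap.entrySet()) {
                  IConnection conn = connections.get(entry.getKey());
                  if (conn != null && !entry.getValue().isEmpty()) {
                      conn.send(entry.getValue().iterator());
                  }
              }
          }
          ```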

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-43588816

          Hi Bobby,

          Thank you for your comments. I addressed most of the issues, except these three:

          *issue1:*
          >
          ```
          + private synchronized void connect() {
          ```
          > I don't like the idea of having connect block until the connection is established. Any thread that tries to send data to a connection that is still being established will block until the connection is established. I think it is more robust to buffer the messages in a data structure and try to handle them later when the connection is finished.

          *reply:* I think it is better to have the connection established before we allow message senders to send messages. Here are my observations from allowing senders to send messages without an established connection:

          1. Excessive memory usage
          If message senders are allowed to send messages before the connection is established, they are encouraged to send as fast as they can. In some profiling runs I observed a sharp increase in heap memory at the beginning, because we buffer all those messages in an unbounded queue in the Netty client. If the user is running an unacked topology, or sets topology.max.spout.pending to a large enough value, this can cause a JVM OOM.

          2. Longer latency
          The Netty Client queue can grow very long, and a longer queue means higher latency. For example, suppose the Netty Client can transfer at most 10 tuples/second, the spout also generates 10 tuples/second, and the queue grew to 10000 entries because of this initial buffering. The queue size will then stabilize at 10000 and the throughput will stay at 10 tuples/second; the throughput is the same whether the queue holds 100, 1000, or 10000 entries, but the latency is roughly the queue length divided by the throughput, so about 1000 seconds for a queue of 10000 versus 10 seconds for a queue of 100. So it is very important to keep the queue from growing too large from the beginning.

          3. Reduced throughput.
          When the latency is longer, it reduces the message generation speed of the spout, because the spout waits for messages to be acked (the unacked window is controlled by topology.max.spout.pending). The longer the initial latency, the longer it takes for the spout to converge to its balanced tuple generation rate.

          In the code, we set up the connection asynchronously in the Client constructor, before send(message) is called, to reduce the time the message sender has to wait.
          ```
          Thread flushChecker = new Thread(new Runnable() {
          @Override
          public void run() {
          //make sure we have a connection
          connect(); //<-----------------here!
          ```
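
          For illustration, the blocking connect boils down to something like the following (a minimal sketch reusing the field names from this pull request; not the exact committed code):

          ```java
          // Sketch: block until a channel is available (or the client is closing),
          // retrying with the existing backoff between attempts.
          private synchronized void connect() {
              while (null == channelRef.get() && !closing) {
                  ChannelFuture future = bootstrap.connect(remote_addr);
                  future.awaitUninterruptibly();
                  if (future.isSuccess()) {
                      channelRef.set(future.getChannel());
                  } else {
                      try {
                          Thread.sleep(getSleepTimeMs());   // backoff before the next attempt
                      } catch (InterruptedException e) {
                          break;
                      }
                  }
              }
          }
          ```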

          *issue 2:*
          >
          ```
          storm-core/src/jvm/backtype/storm/messaging/netty/Client
          + Thread flushChecker = new Thread(new Runnable() {
          ```
          > Can we make this thread shared between the clients, otherwise we will have a dedicated thread per client, which can cause resource utilization issues, hitting a ulimit with the number of processes allowed per user.

          *Reply:* Can we do this in a follow-up patch? I have a local patch, but it requires more testing.

          *issue 3:*
          >
          ```
          + (let [node+port (get @task->node+port task)]
          + (when (not (.get remoteMap node+port))
          + (.put remoteMap node+port (ArrayList.)))
          + (let [remote (.get remoteMap node+port)]
          + (.add remote (TaskMessage. task (.serialize serializer tuple)))
          + ))))
          ```
          > The above code does not really feel like it is clojure, as it is updating mutable state. I would rather have see us do something like a group-by.

          *Reply:* This is on purpose, for performance.
          The ArrayList and HashMap constructed here are used directly from Java. We designed a customized iterator inside the TransferDrainer class, so that we don't need to wrap/unwrap or copy data from Clojure data structures into Java data structures.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user Gvain commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-43621117

          Just a hint:
          What if we use MORE workers per node, instead of only ONE worker per node, without changing the total number of executors? By doing so, we would have MORE receive threads, transfer threads, and Netty I/O threads for the same 144 executors. Should this increase total CPU usage and network bandwidth usage?

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-43625555

          *issue 2:*

          >storm-core/src/jvm/backtype/storm/messaging/netty/Client
          >```
          + Thread flushChecker = new Thread(new Runnable() {
          ```
          > Can we make this thread shared between the clients, otherwise we will have a dedicated thread per client, which can cause resource utilization issues, hitting a ulimit with the number of processes allowed per user.

          ~*Reply*: Can we do this in a follow-up patch? I have a local patch, but it requires more testing.~
          *Update*: This is resolved in the latest check-in.
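
          For context, the shared-checker idea looks roughly like this (an illustrative sketch only; `flushIfNeeded` and the scheduling details are hypothetical, not the committed implementation):

          ```java
          // One shared daemon scheduler services the periodic flush checks of all Client
          // instances, instead of one dedicated thread per client.
          private static final ScheduledExecutorService FLUSH_CHECKER =
                  Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
                      public Thread newThread(Runnable r) {
                          Thread t = new Thread(r, "netty-client-flush-checker");
                          t.setDaemon(true);
                          return t;
                      }
                  });

          // In the Client constructor: register a periodic check instead of spawning a thread.
          FLUSH_CHECKER.scheduleAtFixedRate(new Runnable() {
              public void run() {
                  flushIfNeeded();   // hypothetical: flush the batch if the flush timer expired
              }
          }, flushCheckInterval, flushCheckInterval, TimeUnit.MILLISECONDS);
          ```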

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-43627624

          Gvain,

          >What if we use MORE workers per node, instead of only ONE worker per node, without changing the total number of executors? By doing so, we would have MORE receive threads, transfer threads, and Netty I/O threads for the same 144 executors. Should this increase total CPU usage and network bandwidth usage?

          We tried this approach before, but it doesn't give us the performance we want; there are inherent bottlenecks. Besides, receiver.count is not the biggest bottleneck here; Netty performance matters more.

          The logic behind making "receiver.count" configurable is that, since we allow users to configure the spout/bolt executor count per worker, we should also allow them to configure the receiver thread count, to keep it consistent with the parallelism settings.

          Besides, increasing the worker count will mean:
          1. More cross-process or cross-machine communication. For tasks inside the same worker process, messages are dispatched *locally* to the target task when possible.
          2. More Netty context switching and contention. Check http://yahooeng.tumblr.com/post/64758709722/making-storm-fly-with-netty
          3. More outbound acker message traffic. Usually we allocate one acker per worker.

          So in common practice, each worker should have a moderate number of executors, neither too small nor too large.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12845130

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java —
          @@ -84,43 +93,87 @@

                   // Start the connection attempt.
                   remote_addr = new InetSocketAddress(host, port);
          -        bootstrap.connect(remote_addr);
          +
          +        Thread flushChecker = new Thread(new Runnable() {

          End diff –

          fixed

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12845167

          — Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java —
          @@ -373,6 +399,25 @@ public static CuratorFramework newCuratorStarted(Map conf, List<String> servers,
          ret.start();
          return ret;
          }
          +
          + public static void redirectStreamAsync(Process process) {
          + redirectStreamAsync(process.getInputStream(), System.out);
          — End diff –

          Fixed; this part of the code has been removed.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12845322

          — Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java —
          @@ -301,15 +303,39 @@ public static ComponentCommon getComponentCommon(StormTopology topology, String
               }

               public static Integer getInt(Object o) {
          -        if(o instanceof Long) {
          -            return ((Long) o ).intValue();
          -        } else if (o instanceof Integer) {
          -            return (Integer) o;
          -        } else if (o instanceof Short) {
          -            return ((Short) o).intValue();
          -        } else {
          -            throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
          -        }
          +        Integer result = getInt(o, null);
          +        if (null == result) {
          +            throw new IllegalArgumentException("Don't know how to convert null + to int");
          +        }
          +        return result;
          +    }
          +
          +    public static Integer getInt(Object o, Integer defaultValue) {
          +        if (null == o) {
          +            return defaultValue;
          +        }
          +
          +        if(o instanceof Long) {

          End diff –

          You are correct. I filed https://issues.apache.org/jira/browse/STORM-328 to address the issue across the project.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12845484

          — Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj —
          @@ -109,25 +111,30 @@
           (defn mk-transfer-fn [worker]
             (let [local-tasks (-> worker :task-ids set)
                   local-transfer (:transfer-local-fn worker)
          -        ^DisruptorQueue transfer-queue (:transfer-queue worker)]
          +        ^DisruptorQueue transfer-queue (:transfer-queue worker)
          +        task->node+port (:cached-task->node+port worker)]
               (fn [^KryoTupleSerializer serializer tuple-batch]
                 (let [local (ArrayList.)
          -            remote (ArrayList.)]
          +            remoteMap (HashMap.)]
                   (fast-list-iter [[task tuple :as pair] tuple-batch]
                     (if (local-tasks task)
                       (.add local pair)
          -            (.add remote pair)
          -            ))
          +            (let [node+port (get @task->node+port task)]
          +              (when (not (.get remoteMap node+port))
          +                (.put remoteMap node+port (ArrayList.)))
          +              (let [remote (.get remoteMap node+port)]
          +                (.add remote (TaskMessage. task (.serialize serializer tuple)))
          +                ))))

          End diff –

          OK, if you could add a quick comment about it, that would help prevent someone like me trying to "clean it up" in the future and slowing it down. Something like
          ```;;Using java objects directly to avoid performance issues in java code```

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r12845868

          — Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj —
          @@ -109,25 +111,30 @@
           (defn mk-transfer-fn [worker]
             (let [local-tasks (-> worker :task-ids set)
                   local-transfer (:transfer-local-fn worker)
          -        ^DisruptorQueue transfer-queue (:transfer-queue worker)]
          +        ^DisruptorQueue transfer-queue (:transfer-queue worker)
          +        task->node+port (:cached-task->node+port worker)]
               (fn [^KryoTupleSerializer serializer tuple-batch]
                 (let [local (ArrayList.)
          -            remote (ArrayList.)]
          +            remoteMap (HashMap.)]
                   (fast-list-iter [[task tuple :as pair] tuple-batch]
                     (if (local-tasks task)
                       (.add local pair)
          -            (.add remote pair)
          -            ))
          +            (let [node+port (get @task->node+port task)]
          +              (when (not (.get remoteMap node+port))
          +                (.put remoteMap node+port (ArrayList.)))
          +              (let [remote (.get remoteMap node+port)]
          +                (.add remote (TaskMessage. task (.serialize serializer tuple)))
          +                ))))

          End diff –

          fixed, thanks

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-43632986

          @clockfly,

          Your logic makes sense to me on why these calls are blocking. My biggest concern around the blocking is the case of a worker crashing. If a single worker crashes, this can block the entire topology from executing until that worker comes back up. In some cases I can see that being something you would want. In other cases I can see speed being the primary concern, and some users would like to get partial data fast rather than accurate data later.

          Could we make it configurable in a follow-up JIRA, where we have a max limit on the buffering that is allowed before we either block or throw data away (which is what ZeroMQ does)?

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-43634626

          Bobby,

          Your suggestion makes sense, let's do this in a follow up jira!

          Sean

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-43636831

          Running the unit tests, I ran into some errors in backtype.storm.messaging.netty-unit-test (there were 3 failures like this one).

          It looks like you need to update the test to set Config.TOPOLOGY_WORKERS before creating the context.

          ```
          expected: nil
          actual: java.lang.RuntimeException: Fail to construct messaging plugin from plugin backtype.storm.messaging.netty.Context
          at backtype.storm.messaging.TransportFactory.makeContext (TransportFactory.java:53)
          backtype.storm.messaging.netty_unit_test/fn (netty_unit_test.clj:109)
          clojure.test$test_var$fn__6926.invoke (test.clj:701)
          clojure.test$test_var.invoke (test.clj:701)
          clojure.test$test_all_vars$fn__6930$fn__6937.invoke (test.clj:717)
          clojure.test$default_fixture.invoke (test.clj:671)
          clojure.test$test_all_vars$fn__6930.invoke (test.clj:717)
          ...
          Caused by: java.lang.IllegalArgumentException: Don't know how to convert null + to int
          at backtype.storm.utils.Utils.getInt (Utils.java:308)
          backtype.storm.messaging.netty.Context.prepare (Context.java:66)
          backtype.storm.messaging.TransportFactory.makeContext (TransportFactory.java:45)
          backtype.storm.messaging.netty_unit_test/fn (netty_unit_test.clj:109)
          clojure.test$test_var$fn__6926.invoke (test.clj:701)
          clojure.test$test_var.invoke (test.clj:701)
          clojure.test$test_all_vars$fn__6930$fn__6937.invoke (test.clj:717)
          clojure.test$default_fixture.invoke (test.clj:671)
          clojure.test$test_all_vars$fn__6930.invoke (test.clj:717)
          clojure.test$default_fixture.invoke (test.clj:671)
          ```
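
          For reference, a minimal sketch of the fix idea (the actual tests are in Clojure; loading the defaults via `Utils.readStormConfig()` is an assumption about how the test builds its conf):

          ```java
          // Make sure the conf handed to the netty Context carries TOPOLOGY_WORKERS,
          // which Context.prepare presumably reads via Utils.getInt.
          Map conf = Utils.readStormConfig();            // defaults.yaml (+ storm.yaml if present)
          conf.put(Config.TOPOLOGY_WORKERS, 1);
          IContext context = TransportFactory.makeContext(conf);
          ```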

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user Gvain commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-43639415

          "So in common practice, each worker will have a moderate size of executors, neither too small, nor too big."
          I agree on this. But what size is considered to be too big or too small. Should 36 be too big? Only a few executors failed to heartbeat to nimbus, the whole worker will reload.

          Besides, By using a SHARED threadpool(its default size is 1) among all netty client within a worker, the netty threads number do not increase as total worker numbers increase. Check [jira][storm-12]. So, increasing worker count may not cause netty context switching problem.

          "3. More outbound acker message count. Usually we will allocate one acker to one worker."
          But you allocate 48 ackers to only 4 workers.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-43656390

          Gvain,

          > Besides, by using a SHARED thread pool (its default size is 1) among all Netty clients within a worker, the number of Netty threads does not increase as the total number of workers increases. Check [jira][storm-12]. So increasing the worker count may not cause a Netty context-switching problem.

          Context switching here means that Netty threads from different worker processes on the same machine compete with each other.

          > "3. More outbound acker message traffic. Usually we allocate one acker per worker."
          > But you allocated 48 ackers to only 4 workers.

          Usually one acker per worker will suffice. But in this performance benchmarking case, the acker becomes a bottleneck because the message count is huge.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-43656469

          Bobby,

          Sorry for this regression. Now it is fixed.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-43659441

          The unit tests pass and the code looks OK to me. I am +1

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-43703379

          Thanks.

          @miguno, @ptgoetz, @nathanmarz, do you have further comments?

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-43704046

          @revans,

          I created https://issues.apache.org/jira/browse/STORM-329 to handle the case where a worker suddenly crashes.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user Gvain commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-43708724

          clockfly,

          Thanks for your patience.
          Do you mean that allocating one worker per node is better than several workers per node, since the Netty threads from different worker processes will compete with each other?
          In production practice, is it proper to allocate only one worker per node? I don't think so.

          And I think using several Netty threads working in sync, non-batch mode may have somewhat the same effect as using only one Netty thread working in async, batch mode. Maybe I should test this out. By the way, what Storm version did you use in the test?

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-43716283

          > Do you mean that allocating one worker per node is better than several workers per node, since the Netty threads from different worker processes will compete with each other?

          It depends. The executor is the basic unit of parallelism. With the executor count held constant, adding a worker adds more traffic, but if it is co-located with another worker it may also increase the bandwidth of intra-worker communication (since we can use intra-process instead of intra-machine communication).

          For example, 1, 1, 1 (meaning 3 machines with one worker each) may be faster than co-located 4, 4, 4, but co-located 3, 0, 0 may be faster than distributed 1, 1, 1. It takes experiments to verify which is better.

          > And I think using several Netty threads working in sync, non-batch mode may have somewhat the same effect as using only one Netty thread working in async, batch mode. Maybe I should test this out. By the way, what Storm version did you use in the test?

          Yes, sync vs. async comparisons only give us pointers to the possible bottleneck; it takes continuous profile-and-tune experiments to prove our guesses. By the way, I used the storm-0.9 release version.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-43791269

          Yes, getting the best performance out of a topology really depends on the resources your topology is using. If your topology is CPU bound, you want to spread it out so that you have enough cores to handle the parallelism, but if your topology is I/O bound you want to collocate it as much as possible. The best performance optimization is simply to stop doing something, so if you can cut out serialization/deserialization and sending tuples to another process, even over the loopback device, that potentially becomes a big win.

          The really difficult part is that parts of your topology may be CPU bound, other parts may be I/O bound, and other parts may be constrained by memory (which has its own limitations). Also, you may have a different definition of "best". Some users may require very low latency and are willing to let most of the cluster sit idle so that, when something happens, they can process it very quickly. Other times you are willing to sacrifice latency to be sure that everything you want to run fits on a smaller set of hardware.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user miguno commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-43896389

          @clockfly Many thanks for all your hard work and your patience. It's much appreciated!

          I don't have anything to add to the discussion at the moment (thanks Bobby & Co.!) except that I, too, can confirm that the Storm test suite passes with Sean's latest code changes. I tested against the latest commit in Sean's `storm_async_netty_and_batch_api` branch, which at the time of writing was https://github.com/clockfly/incubator-storm/commit/20b4f8b2195a1bf214f63e10b1bbca4690c0290f.

          $ git checkout master
          $ gco -b STORM-297
          $ git pull git@github.com:clockfly/incubator-storm.git storm_async_netty_and_batch_api
          $ mvn clean install

          >>> Success.

          PS: Unfortunately I haven't had the chance yet to run the patched version of Storm in a large-scale environment.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user nathanmarz commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-43917540

          I would like to see a test added that tests that messages are received between tasks in the same order they are sent. The test is likely to be probabilistic, in that if the code is wrong it won't always fail, but that's ok. This is a really important property to maintain that this patch, or future modifications on this code, could affect. So it needs better testing. The test should have many workers with many send/receive threads.
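
          For illustration, a rough sketch of the kind of probabilistic check described above (not the test that was eventually added; `sendAcrossWorkers` and `drainReceivedFor` are hypothetical stand-ins for the topology plumbing):

```java
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;

public class MessageOrderSketch {

    // Send a numbered sequence from one task to another across workers and
    // assert that the receive order matches the send order. Because a broken
    // implementation only fails sometimes, the sequence is long and the test
    // should be run with many workers and send/receive threads.
    public void checkOrderPreserved() throws Exception {
        List<Integer> sent = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            sent.add(i);
        }
        sendAcrossWorkers(sent);                                 // hypothetical: emit via many send threads
        List<Integer> received = drainReceivedFor(sent.size());  // hypothetical: collect at the receiving task
        assertEquals(sent, received);                            // order per task pair must be preserved
    }

    private void sendAcrossWorkers(List<Integer> msgs) { /* topology-specific plumbing */ }

    private List<Integer> drainReceivedFor(int n) { return new ArrayList<>(); }
}
```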

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-43923944

          Thank you, @miguno!

          @nathanmarz, good point! I will add a test case to cover this.

          githubbot ASF GitHub Bot added a comment -

          Github user Gvain commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-44372134

          @clockfly

          I used the latest Storm to reproduce some of your tests. I found that adding a few more workers increases throughput as well as CPU usage; at best I got roughly double the throughput.

          In my test I used 4 nodes (24 cores each, less powerful than yours), 48 spouts, 48 bolts, 48 ackers, and 4 to 64 workers, with the SOL benchmark and a message size of 100 bytes. Here are the results:

          | workers | Throughput | CPU usage | NET usage (only IN bytes) |
          |---|---|---|---|
          | 4 | 320,000 tps | 56% | 14 MB/s |
          | 8 | 656,000 tps | 89% | 28 MB/s |
          | 16 | 560,000 tps | 92% | 26 MB/s |
          | 32 | 353,000 tps | 90% | 20 MB/s |
          | 64 | 208,000 tps | 90% | 16 MB/s |

          Using 8 workers, the throughput doubled.

          As we already discussed:
          (a) Increasing the worker count (within reason) may not cause a Netty context-switching problem. Because a SHARED thread pool (default size 1) is used among all Netty clients within a worker, the number of Netty threads does not grow as the total worker count grows. Check https://github.com/apache/incubator-storm/pull/57

          (b) Increasing the worker count does increase the number of Netty threads. Using more Netty threads working in sync, non-batch mode may have somewhat the same effect as using fewer Netty threads working in async, batch mode, which is your approach.

          From the test results, point (b) above appears to be true.
          So maybe what we need to do is just pull https://github.com/apache/incubator-storm/pull/57 and add a few more workers.

          But I am still curious: if I add more nodes, will this still hold?

          githubbot ASF GitHub Bot added a comment -

          Github user Gvain commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-44629767

          @clockfly , @revans2

          I added more nodes, from 4 to 15, as well as spouts, bolts, ackers and workers, keeping their parallelism in the same ratio, e.g. for every additional 4 nodes, add 48 spouts, 48 bolts, 48 ackers and 8 workers. Here is the test result:

          | nodes | workers | Throughput | CPU usage | NET usage (only IN bytes) |
          |---|---|---|---|---|
          | 4 | 8 | 656,000 tps | 89% | 28 MB/s |
          | 8 | 16 | 1,004,000 tps | 82% | 28 MB/s |
          | 12 | 24 | 1,133,000 tps | 72% | 25 MB/s |
          | 15 | 30 | 1,235,000 tps | 69% | 24 MB/s |

          For the last two rows, CPU usage decreased and the rate of throughput increase slowed down. I added a few more workers and re-ran the test; the result is:

          | nodes | workers | Throughput | CPU usage | NET usage (only IN bytes) |
          |---|---|---|---|---|
          | 12 | 48 | 1,444,000 tps | 88% | 30 MB/s |
          | 15 | 30 | 1,735,000 tps | 88% | 30 MB/s |

          From these results, we can see that adding more workers DOES help scale up performance when more CPUs are added.

          Finally, maybe we should reconsider the root cause of this issue, "STORM-297 Storm Performance cannot be scaled up by adding more CPU cores".

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-44634492

          Hi Gvain,

          First, I apologize for the late reply; I was tied up in a 2-day training.

          Thank you for sharing your data! It is really great that we are using data to support our points.

          Before jumping to conclusions, let's make sure we understand the data in the same way and that the test can be reproduced. I have some questions about your data.

          1. What does tps consist of here? Is it solely spout messages, or does it also include acker messages and other messages?
          2. What is the message latency in each test?
          3. I noticed your network usage is pretty small. Is it overall cluster network usage, or single-machine network usage? Are you using a 1Gb network?
          4. What is your max spout pending setting?

          githubbot ASF GitHub Bot added a comment -

          Github user Gvain commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-44635246

          Hi, clockfly

          1) tps here only consists of spout-emitted messages, NOT including acker messages or any other messages.
          3) The network usage is only SINGLE-machine network usage, and only the IN-direction bytes are counted. I am using two bonded 1Gb network cards.
          4) max.spout.pending is 1000.

          As for 2), I didn't really measure message latency. All I did was compare the count emitted by spouts with the count received by bolts; they keep up with each other quite closely.

          githubbot ASF GitHub Bot added a comment -

          Github user Gvain commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-44642174

          Hi, clockfly

          You can reproduce my test by checking out the modified storm-perf-test here
          https://github.com/Gvain/storm-perf-test/tree/spout-throughput
          and the storm here
          https://github.com/Gvain/incubator-storm, which is merely a copy from the original apache/incubator-storm several days ago.

          githubbot ASF GitHub Bot added a comment -

          Github user miofthena commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-44900176

          Hi, @clockfly
          In Client.java:

          • a flusher is submitted every ~10ms
          • a flusher never stops (even if closed == true)
          • in the flusher loop the thread is never parked

          → 100% CPU, uncool =(
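
          For contrast, a minimal sketch of a flusher loop that avoids both problems, i.e. it exits once the client is closed and parks between flushes instead of spinning (an illustration only, not the code from the patch; `closing`, `flushIntervalMs`, and `flushPendingBatch` are invented names):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class BatchFlusher implements Runnable {
    private final AtomicBoolean closing;       // set to true when the client is closed
    private final long flushIntervalMs;        // e.g. ~10 ms
    private final Runnable flushPendingBatch;  // flushes whatever has been batched so far

    BatchFlusher(AtomicBoolean closing, long flushIntervalMs, Runnable flushPendingBatch) {
        this.closing = closing;
        this.flushIntervalMs = flushIntervalMs;
        this.flushPendingBatch = flushPendingBatch;
    }

    @Override
    public void run() {
        while (!closing.get()) {               // stop once the client is closed
            flushPendingBatch.run();
            // park instead of busy-spinning, so an idle client does not burn a core
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(flushIntervalMs));
        }
    }
}
```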

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-44925955

          Thanks @miofthena,

          Fixed at 0bca173! I should have done more testing on the new check-in.

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45046513

          @Gvain, thanks for your detailed description. I care about nothing but finding the truth. Let me try your approach to reproduce the test.

          clockfly Sean Zhong added a comment -

          worker scalability test without storm-297

          githubbot ASF GitHub Bot added a comment -

          Github user Gvain commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45050542

          @clockfly , just to let you know, I kept topology.executor.send.buffer.size and topology.executor.receive.buffer.size unchanged (both 1024). The Netty server/client worker_threads are both set to 1, and storm.messaging.netty.buffer_size is set to 5242880.

          I think this discussion will help us understand the issue much better.
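
          For reference, the same settings expressed programmatically (a sketch only; the full storm.messaging.netty.* key names are assumed from the short forms quoted above, and the values can equally be set in storm.yaml):

```java
import backtype.storm.Config;

public class NettyTestConfig {
    public static Config build() {
        Config conf = new Config();
        conf.put("topology.executor.send.buffer.size", 1024);       // executor queues left at default
        conf.put("topology.executor.receive.buffer.size", 1024);
        conf.put("storm.messaging.netty.server_worker_threads", 1); // one Netty worker thread per side
        conf.put("storm.messaging.netty.client_worker_threads", 1);
        conf.put("storm.messaging.netty.buffer_size", 5242880);     // 5 MB Netty buffer
        return conf;
    }
}
```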

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45052822

          Hi @Gvain,

          First, thank you for your persistence! It really helps us gain a better understanding of Storm.
          I ran the test with your approach, and the result confirms what you said: throughput does increase as the worker count increases.

          Test
          -------------

          *Throughput vs. worker# ①*

          | Worker# | cluster network IN (MB/s) | spout throughput (msg/s) | overall CPU | user CPU | system CPU | total threads# ② |
          |---|---:|---:|---:|---:|---:|---:|
          | 4 | 72 | 391860 | 42% | 36% | 6% | 422 |
          | 8 | 88.3 | 475007 | 42% | 35% | 7% | 554 |
          | 12 | 104 | 555174 | 51% | 40% | 11% | 686 |
          | 16 | 116 | 603399 | 59% | 46% | 13% | 818 |
          | 24 | 130 | 622938 | 77% | 55% | 22% | 1082 |
          | 4 (storm-297) | 140 | 752479 | 74% | 67.80% | 5.20% | 434 |

          ① Test environment: 4 nodes, 48 vcores on each machine, max.spout.pending = 1000, CPU: E5-2680, 48 spouts, 48 bolts, and 48 ackers.
          ② We count only 1 GC thread and 1 JIT thread per JVM.

          ![worker_throughput_without_storm-297](https://cloud.githubusercontent.com/assets/2595532/3169502/82447702-eb9e-11e3-9f97-bc29fde190f3.png)

          *CPU Usage*

          ![cpu_worker _scability](https://cloud.githubusercontent.com/assets/2595532/3169503/afdac11c-eb9e-11e3-9207-9d524606f613.png)

          We can see that both throughput and CPU usage increase as the worker count increases.

          Analysis
          -------------------
          The facts revealed by this test strengthen my conviction that applying this patch is still the better route to higher performance:

          1. *Higher performance with this patch.*

          |  | Worker# | cluster network IN (MB/s) | spout throughput (msg/s) |
          |---|---:|---:|---:|
          | no STORM-297 | 24 | 130 | 622938 |
          | with STORM-297 | 4 | 140 | 752479 |

          *4 workers* with STORM-297 can process *20% more* messages than *24 workers* without this patch, with *less* CPU consumption.

          2. *Storm cannot scale well by changing task parallelism alone*

          As the data in your test showed, with 4 workers we can only reach 56% CPU, even though each worker runs 36 tasks, much larger than the 24 CPU cores.

          *The worker has inherent bottlenecks; the issues are there, and work-arounds won't fix them.*

          3. *CPU System time increase when worker number increase*

          | Worker# | user CPU | system CPU |
          |---|---:|---:|
          | 4 | 36% | 6% |
          | 8 | 35% | 7% |
          | 12 | 40% | 11% |
          | 16 | 46% | 13% |
          | 24 | 55% | *22%* |
          | 4 (with storm-297) | 67.80% | 5.20% |

          Abnormally high system CPU is not good.

          4. *JVM allocation cost is high.*
          In the test, each allocated JVM runs 16+ threads of its own. The supporting data can be found at

          5. *Worker allocation is not cost free*

          Besides the JVM allocation cost, each worker has ZooKeeper-related threads (4), heartbeat threads (5), system bolt threads (2), and Netty boss/worker threads (6: 2 boss, 2 worker, and 2 timer). Together with the JVM's own threads, each worker adds at least 33 threads.

          More threads add more pressure on central services like Nimbus and ZooKeeper.

          More threads also mean more context switches, which hurts the performance of every application running on these servers.

          | Worker# | cluster total threads# |
          |---|---:|
          | 4 | 422 |
          | 8 | 554 |
          | 12 | 686 |
          | 16 | 818 |
          | 24 | *1082* |
          | 4 (with storm-297) | 434 |

          (We count only 1 GC thread and 1 JIT thread per JVM.)

          In the 24-worker case, the overall thread count is 1082+, with *each machine running more than 270 threads!*

          6. *Serialization and Deserialization Cost*

          When a message is delivered to a task in the same process, the tuple is not serialized.

          With 4 workers (assuming shuffle grouping and evenly distributed tasks), 1/4 of the messages skip serialization, so 3/4 must be serialized; with 24 workers only 1/24 skip it, so 23/24 must be serialized. The ratio (23/24)/(3/4) ≈ 1.28, which means we now need to serialize about 28% more messages.

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45053277

          Test case for message delivery order is added at 426d143

          clockfly Sean Zhong added a comment -

          general configuration used for this test.

          githubbot ASF GitHub Bot added a comment -

          Github user nathanmarz commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45056169

          Great work @clockfly. The only additional change I'd like to see is renaming "worker.receiver.thread.count" to "topology.worker.receiver.thread.count". The "topology" prefix is how we indicate confs can be set on a topology-specific level. After that I'm +1.
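
          A usage sketch of that convention (illustrative only; the key string is the renamed one quoted above):

```java
import backtype.storm.Config;

public class ReceiverThreadCountExample {
    public static Config topologyConf() {
        Config conf = new Config();
        // The "topology." prefix signals that the setting may be overridden per topology.
        conf.put("topology.worker.receiver.thread.count", 2);
        return conf;
    }
}
```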

          githubbot ASF GitHub Bot added a comment -

          Github user Gvain commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45056355

          @clockfly

          Thanks for your detailed test. But notice that there is a big difference between our test results.
          In my test, the throughput peak of 656K tps is reached with only 8 workers, and adding more workers makes throughput drop.
          In your test, adding more workers keeps increasing throughput, and the peak of 622K tps is reached with 24 workers, which greatly increases the worker count and therefore the total thread count.

          I am wondering why this difference exists.

          githubbot ASF GitHub Bot added a comment -

          Github user miguno commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45060188

          @clockfly Beyond the specific work on this item (STORM-297) what would be your lessons learned after having investigated Storm's performance and scalability so closely, i.e. with regards to potential bottlenecks (= things that are broken and that need fixing) or areas where we still have untapped potential (= where we could optimize further, maybe simply because we haven't focused on a particular piece of the architecture/code/... yet)?

          For instance, would you say that there are some general design-related decisions that could put an upper bound on Storm's scalability (in theory and/or in practice)? Or would you say it is, at least at this point in time, more the current implementation of the design? Just thinking aloud.

          I'd appreciate any comments – positive or negative – you might have in this context.

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45072821

          @Gvain,

          There are two parts to your question:

          1. In your test, why does throughput drop once the worker count increases past a certain value (8 in your case)?

          This is because your CPU hits its limit at worker# = 8 (CPU usage: 89%). At that point, adding more workers just adds more threads and context switches, hurting performance. In my case, I have more powerful CPUs, which allow more parallel workers.

          2. Why is there a performance difference between the two environments when scaling the worker count from 4 to 8?

          I don't know the answer, but I guess it may be caused by the difference in hardware. Your environment has bonded 1Gb network cards (2Gb), twice my bandwidth, and a 24-core CPU, half of mine.

          Suppose we model the message-transfer pipeline as three layers:

          netty layer (throughput impacted mainly by NIC bandwidth) -> intermediate layer (the worker's receiving pipes: netty server handler -> decoding -> receiver thread) -> task processing (throughput impacted by CPU).

          In your environment, CPU is relatively scarce and effective network bandwidth is plentiful (effective bandwidth = theory_bandwidth * network_efficiency_factor), so performance is throttled by the last layer. In my environment, CPU is plentiful and effective network bandwidth is not enough (the theory_bandwidth is only half of yours), so performance is throttled by the first two layers.

          The patch mainly addresses the first two layers:

          1. Changing the Netty API from sync to async, together with the messaging API change, improves the network_efficiency_factor and thus the effective network bandwidth.

          2. Adding more receiver threads and the optimizations in the Netty server handler improve the second layer's throughput.
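
          A toy model of that three-layer argument (all numbers invented, purely to illustrate that end-to-end throughput is bounded by the slowest layer):

```java
public class PipelineModel {
    public static void main(String[] args) {
        double theoryBandwidthMBs = 125.0;                                    // ~1 Gb/s NIC
        double networkEfficiency  = 0.45;                                     // messaging-layer efficiency factor
        double nettyLayer         = theoryBandwidthMBs * networkEfficiency;   // effective network bandwidth
        double intermediateLayer  = 90.0;                                     // decode + receiver-thread capacity (MB/s)
        double taskLayer          = 150.0;                                    // task-processing capacity (MB/s)

        double endToEnd = Math.min(nettyLayer, Math.min(intermediateLayer, taskLayer));
        System.out.printf("pipeline is bottlenecked at %.1f MB/s%n", endToEnd);
    }
}
```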

          githubbot ASF GitHub Bot added a comment -

          Github user Gvain commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45091025

          @clockfly

          Thanks for your explanation. The explanation for part 2 is not entirely convincing, since neither CPU nor network is saturated in either environment when scaling the worker count from 4 to 8. But I don't want to go any further on this.

          Anyway, your work is really great.

          One more thing. Since you use Netty to send messages asynchronously, how can you ensure that all messages emitted by a fast component are received by a slow component without any loss, especially under heavy throughput with no ackers enabled? To my knowledge, the Netty channel may not be writable, and the pending count will keep increasing.

          githubbot ASF GitHub Bot added a comment -

          Github user Gvain commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r13386178

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java —

```diff
@@ -128,133 +185,114 @@ private long getSleepTimeMs()
     }

     /**
-     * Enqueue a task message to be sent to server
+     * Enqueue task messages to be sent to server
      */
-    public void send(int task, byte[] message) {
-        //throw exception if the client is being closed
-        if (being_closed.get()) {
+    synchronized public void send(Iterator<TaskMessage> msgs) {
+
+        // throw exception if the client is being closed
+        if (closing) {
             throw new RuntimeException("Client is being closed, and does not take requests any more");
         }
-
-        try {
-            message_queue.put(new TaskMessage(task, message));
-
-            //resume delivery if it is waiting for requests
-            tryDeliverMessages(true);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+
+        if (null == msgs || !msgs.hasNext()) {
+            return;
         }
-    }
-
-    /**
-     * Retrieve messages from queue, and delivery to server if any
-     */
-    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
-        //just skip if delivery only if waiting, and we are not waiting currently
-        if (only_if_waiting && !wait_for_requests) return;
-        //make sure that channel was not closed
         Channel channel = channelRef.get();
-        if (channel == null) return;
-        if (!channel.isOpen()) {
-            LOG.info("Channel to {} is no longer open.",remote_addr);
-            //The channel is not open yet. Reconnect?
-            reconnect();
-            return;
+        if (null == channel) {
+            connect();
+            channel = channelRef.get();
         }
-        final MessageBatch requests = tryTakeMessages();
-        if (requests==null) {
-            wait_for_requests = true;
-            return;
-        }
+        while (msgs.hasNext()) {
+            TaskMessage message = msgs.next();
+            if (null == messageBatch) {
+                messageBatch = new MessageBatch(messageBatchSize);
+            }
-        //if channel is being closed and we have no outstanding messages, let's close the channel
-        if (requests.isEmpty() && being_closed.get()) {
-            close_n_release();
-            return;
+            messageBatch.add(message);
+            if (messageBatch.isFull()) {
+                MessageBatch toBeFlushed = messageBatch;
+                flushRequest(channel, toBeFlushed);
```

          — End diff —

          What if the channel is not writable? Furthermore, what if messageBatch keeps being filled up under heavy throughput while the channel is still not writable because the connected peer is a slow component?
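
          One conceivable guard, sketched only to make the question concrete (this is not what the patch does; `MAX_PENDING` and `pendingMessages` are invented, while `messageBatch`, `flushRequest`, and `remote_addr` follow the names in the diff above):

```java
// Flush only when Netty reports the channel as writable; otherwise keep batching
// and rely on some explicit bound to push back on the producer.
if (channel.isWritable()) {
    MessageBatch toBeFlushed = messageBatch;
    messageBatch = null;
    flushRequest(channel, toBeFlushed);
} else if (pendingMessages > MAX_PENDING) {
    // Without a bound here the batch grows indefinitely under heavy throughput,
    // essentially the OOM scenario discussed below and in STORM-329.
    throw new RuntimeException("Too many pending messages to " + remote_addr);
}
```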

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45226047

          @miguno, I have several more observations:

          1. The network is still not efficient enough

          We can see from the test report that, even after this fix, throughput is still bottlenecked by the network (CPU: 72%, network: 45%), because there is still CPU headroom (28%). That's odd, because only 45% of the theoretical network bandwidth is used.

          2. Uneven machine message receive latency

          In the experiment, I noticed that there are always some machines whose message-receive latency is much higher than the others'. For example, tuples generated on machine A are sent to tasks on machines B, C, and D; in one run the tasks on B take longer to receive messages, in another run D may be the slowest.

          My guess is that some machines end up with a longer Netty receiver queue than others, and after a while the queue length on each machine becomes stable but not equal (new input = new output). The latency differs because the queue lengths differ. Changing max.spout.pending won't improve this, because it only controls the overall number of messages sent from A; it doesn't treat B, C, and D differently.

          3. A better max.spout.pending?
          I observed that once max.spout.pending is tuned to a large enough value, increasing it further only adds latency, not throughput. When max.spout.pending doubles, the latency doubles.

          Could we do flow control adaptively, so that we stop once there is no further benefit to increasing max.spout.pending?

          4. Potential deadlock when all intermediate buffers are full

          Consider two workers: task1 (worker A) delivers messages to task3 (worker B), and task3 delivers to task2 (worker A). There is a loop! It is possible for all worker sender/receiver buffers to fill up and block.

          ![vvvv](https://cloud.githubusercontent.com/assets/2595532/3188775/ba645bdc-ecbd-11e3-959b-dfb8208d4d1b.png)

          The current work-around in Storm is tricky: it uses an unbounded receiver buffer (LinkedBlockingQueue) for each worker to break the loop. But this is not good, because the receiver buffer can grow very long and the latency can become very high.

          5. Is it necessary for each task to have a dedicated send-queue thread?
          Currently, each task has a dedicated send-queue thread to push data to the worker transfer queue. During profiling, the task send-queue thread is usually in the waiting state. Maybe it would be a good idea to replace the dedicated threads with a shared thread pool?

          6. Acker workload is very high.
          In the test, I spotted that the acker tasks are very busy. Since each message is small (100 bytes), there is a huge number of tuples to be acked.

          Can this acker cost be reduced?

          For example, we could group tuples at the spout into time slices, with each time slice sharing the same root tuple id. Say the time slice is 100ms and there are 10,000 messages in the slice, all sharing one root id; before sending to the acker task, we could first XOR all ack messages with the same root id locally on each worker. That way we could reduce both the acking network traffic and the acker task cost. The drawback is that when a message is lost, we need to replay all messages in the slice.
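
          A minimal sketch of that per-worker pre-aggregation idea (hypothetical helper, not Storm code; it assumes the usual convention that acks carry a root id plus an XOR value):

```java
import java.util.HashMap;
import java.util.Map;

// Accumulates ack values per root id inside one worker, so that only one combined
// ack per root id needs to be sent to the acker task at the end of each time slice.
public class LocalAckAggregator {
    private final Map<Long, Long> pending = new HashMap<>();

    public void ack(long rootId, long ackVal) {
        pending.merge(rootId, ackVal, (a, b) -> a ^ b);  // XOR is associative and commutative
    }

    // Called once per time slice (e.g. every 100 ms): emit one ack per root id and reset.
    public Map<Long, Long> flush() {
        Map<Long, Long> out = new HashMap<>(pending);
        pending.clear();
        return out;
    }
}
```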

          7. The worker receive thread can be blocked by a task's receive queue

          The worker receive thread publishes messages to each task's receive queue sequentially, in a blocking way. If one task's receive queue is full, the thread blocks and waits.
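
          To make that blocking behaviour concrete, a generic sketch using plain java.util.concurrent queues (not Storm's actual disruptor-based queues), contrasting the blocking publish with a bounded attempt:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class ReceivePublishSketch {
    // One bounded queue per task; a full queue stalls the single receive thread.
    private final BlockingQueue<byte[]> taskQueue = new ArrayBlockingQueue<>(1024);

    // Behaviour described above: blocks until the task drains its queue.
    void publishBlocking(byte[] tuple) throws InterruptedException {
        taskQueue.put(tuple);
    }

    // One alternative: wait with a bound, letting the receive loop move on to
    // other tasks and retry later instead of stalling on a single slow task.
    boolean tryPublish(byte[] tuple) throws InterruptedException {
        return taskQueue.offer(tuple, 1, TimeUnit.MILLISECONDS);
    }
}
```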

          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r13443448

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java —

```diff
@@ -128,133 +185,114 @@ private long getSleepTimeMs()
     }

     /**
-     * Enqueue a task message to be sent to server
+     * Enqueue task messages to be sent to server
      */
-    public void send(int task, byte[] message) {
-        //throw exception if the client is being closed
-        if (being_closed.get()) {
+    synchronized public void send(Iterator<TaskMessage> msgs) {
+
+        // throw exception if the client is being closed
+        if (closing) {
             throw new RuntimeException("Client is being closed, and does not take requests any more");
         }
-
-        try {
-            message_queue.put(new TaskMessage(task, message));
-
-            //resume delivery if it is waiting for requests
-            tryDeliverMessages(true);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+
+        if (null == msgs || !msgs.hasNext()) {
+            return;
        }
-    }
-
-    /**
-     * Retrieve messages from queue, and delivery to server if any
-     */
-    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
-        //just skip if delivery only if waiting, and we are not waiting currently
-        if (only_if_waiting && !wait_for_requests) return;
-        //make sure that channel was not closed
         Channel channel = channelRef.get();
-        if (channel == null) return;
-        if (!channel.isOpen()) {
-            LOG.info("Channel to {} is no longer open.",remote_addr);
-            //The channel is not open yet. Reconnect?
-            reconnect();
-            return;
+        if (null == channel) {
+            connect();
+            channel = channelRef.get();
        }
-        final MessageBatch requests = tryTakeMessages();
-        if (requests==null) {
-            wait_for_requests = true;
-            return;
-        }
+        while (msgs.hasNext()) {
+            TaskMessage message = msgs.next();
+            if (null == messageBatch) {
+                messageBatch = new MessageBatch(messageBatchSize);
+            }
-        //if channel is being closed and we have no outstanding messages, let's close the channel
-        if (requests.isEmpty() && being_closed.get()) {
-            close_n_release();
-            return;
+            messageBatch.add(message);
+            if (messageBatch.isFull()) {
+                MessageBatch toBeFlushed = messageBatch;
+                flushRequest(channel, toBeFlushed);
```

          — End diff —

          @Gvain,

          For an unacked topology, we can either block, drop, or store. Before this patch, Storm would always store, so there was a potential OOM like the one you describe. This patch goes a step further: if the downstream machine is down, it blocks the upstream from sending, thus avoiding the OOM.

          But for the case you mention, where the downstream is slow but not down, it is possible for more messages to pile up on the Netty client side or server side. If the network is fine, they pile up on the server side; if the network is overwhelmed, they pile up on the Netty client side.

          An example: the spout keeps generating really big messages, but the bandwidth is not enough; that will lead to an OOM in the spout's worker JVM, and nothing can stop it.

          Bobby's comment mentioned a similar situation: https://github.com/apache/incubator-storm/pull/103#issuecomment-43632986

          We should allow the user to configure dropping messages in this case, thus avoiding the OOM; after discussion with Bobby, this is filed as the follow-up JIRA https://issues.apache.org/jira/browse/STORM-329.

          githubbot ASF GitHub Bot added a comment -

          Github user Gvain commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r13477759

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java —
          @@ -128,133 +185,114 @@ private long getSleepTimeMs()
          }

          /**

          • * Enqueue a task message to be sent to server
            + * Enqueue task messages to be sent to server
            */
          • public void send(int task, byte[] message) {
          • //throw exception if the client is being closed
          • if (being_closed.get()) {
            + synchronized public void send(Iterator<TaskMessage> msgs) {
            +
            + // throw exception if the client is being closed
            + if (closing) { throw new RuntimeException("Client is being closed, and does not take requests any more"); }

            -

          • try { - message_queue.put(new TaskMessage(task, message)); - - //resume delivery if it is waiting for requests - tryDeliverMessages(true); - }

            catch (InterruptedException e) {

          • throw new RuntimeException(e);
            +
            + if (null == msgs || !msgs.hasNext()) { + return; }
          • }
            -
          • /**
          • * Retrieve messages from queue, and delivery to server if any
          • */
          • synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
          • //just skip if delivery only if waiting, and we are not waiting currently
          • if (only_if_waiting && !wait_for_requests) return;
          • //make sure that channel was not closed
            Channel channel = channelRef.get();
          • if (channel == null) return;
          • if (!channel.isOpen()) {
          • LOG.info("Channel to {} is no longer open.",remote_addr);
          • //The channel is not open yet. Reconnect?
          • reconnect();
          • return;
            + if (null == channel) { + connect(); + channel = channelRef.get(); }
          • final MessageBatch requests = tryTakeMessages();
          • if (requests==null) { - wait_for_requests = true; - return; - }

            + while (msgs.hasNext()) {
            + TaskMessage message = msgs.next();
            + if (null == messageBatch)

            { + messageBatch = new MessageBatch(messageBatchSize); + }
          • //if channel is being closed and we have no outstanding messages, let's close the channel
          • if (requests.isEmpty() && being_closed.get()) {
          • close_n_release();
          • return;
            + messageBatch.add(message);
            + if (messageBatch.isFull()) {
            + MessageBatch toBeFlushed = messageBatch;
            + flushRequest(channel, toBeFlushed);
              • End diff –

          @clockfly

Thanks for the reminder.
For an unacked topology, the OOM problem did exist before your patch, and I filed it at https://issues.apache.org/jira/browse/STORM-339

After your patch, this OOM problem still seems to exist, just as you said and filed at https://issues.apache.org/jira/browse/STORM-329. We are now stuck on this because we run an unacked topology under heavy throughput, so shall we raise this issue's priority and fix it soon?

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r13480579

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java —
(The quoted diff hunk is the same excerpt of Client.java's send() as shown above.)
— End diff —

          @Gvain,

An unacked topology with no flow control is dangerous!

There are two concepts here: flow control and error control. You don't want the acker because of its performance penalty, so you give up error control; but you still need flow control.

The problem is that Storm's current flow control mechanism, "max.spout.pending", depends on the error control channel, the acker. That is the dilemma. Maybe we should separate "max.spout.pending" from the acker so that it serves flow control only.

One work-around is, when you emit a message with SpoutOutputCollector, to attach a message id at a sampling rate, for example 1%:

collector.emit(tuple, messageId)

so only 1% of tuples carry a messageId, while 99% don't.

In this case you only have 1% of the acker traffic, reducing the acker's performance penalty while still keeping some basic flow control.
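As an illustration only, here is a minimal sketch of that sampling work-around as a spout. The class name, payload, and sample rate are hypothetical; only the emit overloads and the BaseRichSpout lifecycle are real Storm API.

```
import java.util.Map;
import java.util.Random;
import java.util.UUID;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

// Hypothetical spout that acks only a sampled fraction of its tuples.
public class SampledAckSpout extends BaseRichSpout {
    private static final double ACK_SAMPLE_RATE = 0.01; // track roughly 1% of tuples
    private SpoutOutputCollector collector;
    private Random random;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    @Override
    public void nextTuple() {
        Values tuple = new Values("some-payload"); // placeholder payload
        if (random.nextDouble() < ACK_SAMPLE_RATE) {
            // emit with a messageId: this tuple goes through the acker and
            // counts against topology.max.spout.pending
            collector.emit(tuple, UUID.randomUUID().toString());
        } else {
            // emit without a messageId: no acker traffic for this tuple
            collector.emit(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("payload"));
    }
}
```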

          Hope this helps!

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user Gvain commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r13485764

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java —
(The quoted diff hunk is the same excerpt of Client.java's send() as shown above.)
— End diff —

          @clockfly

Shall we just use a bounded message queue in the Netty client? If the queue is full, we can block or drop until the channel has flushed away some pending messages. The strategy can be made configurable, just as https://issues.apache.org/jira/browse/STORM-329 describes.
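Just to illustrate the idea (this is not the actual Client.java change; the class and its names are made up), a bounded queue with a configurable block-or-drop policy could look roughly like this:

```
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch only: a bounded send queue with a configurable
// behaviour when it is full (block the producer, or drop the message).
public class BoundedSendQueue<T> {
    public enum FullPolicy { BLOCK, DROP }

    private final LinkedBlockingQueue<T> queue;
    private final FullPolicy policy;

    public BoundedSendQueue(int capacity, FullPolicy policy) {
        this.queue = new LinkedBlockingQueue<T>(capacity);
        this.policy = policy;
    }

    /** Returns false only if the message was dropped under the DROP policy. */
    public boolean offerMessage(T message) throws InterruptedException {
        if (policy == FullPolicy.BLOCK) {
            queue.put(message);      // block until the consumer has drained some messages
            return true;
        }
        return queue.offer(message); // drop immediately when the queue is full
    }

    public T takeMessage() throws InterruptedException {
        return queue.take();
    }
}
```

Which policy is safer depends on the topology: dropping avoids OOM but loses data, while blocking preserves data but propagates back-pressure upstream.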

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on a diff in the pull request:

          https://github.com/apache/incubator-storm/pull/103#discussion_r13488844

          — Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java —
(The quoted diff hunk is the same excerpt of Client.java's send() as shown above.)
— End diff —

Yes, that is what STORM-329 wants to do. But it may require more thought, because introducing another blocking buffer has a performance penalty and may cause deadlock.

I think that for your unacked topology, the more reasonable approach is to introduce a flow control mechanism. Let's continue the discussion in the STORM-329 thread.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45360004

@clockfly and everyone else, I love the discussion that is happening here. I think the code is now at a point where we can merge it in (2 binding +1s) and continue the discussion and development on separate JIRAs. I went through all of the comments, and most of what is left concerns possible follow-up improvements, nothing blocking. If anyone disagrees or has seen issues they have not yet expressed, please speak up. Otherwise I plan to merge this in later today.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user ptgoetz commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45362633

          +1

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user miguno commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45389078

          +1


          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45413255

I tried to merge the pull request into master, and the tests now fail with the following in the logs (on my Mac):
          ```
          134871 [Thread-1250-disruptor-worker-transfer-queue] INFO backtype.storm.util - Halting process: ("Async loop died!")
          ```

I have not tried it on Linux, but the tests all pass on master itself. @clockfly, could you please look into it?

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45413952

Tests pass on my machine:
Tests run: 157, Assertions: 101461, Failures: 0, Errors: 0

There should be a stack trace in the log file; it can be found by searching for "Async loop died!" without "Halting process":

```
(catch Throwable t
  (log-error t "Async loop died!")
  (kill-fn t)
  ))
```

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45414150

          OK I'll try to look into it on my side then.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45414524

@Bobby, I reproduced this. It is intermittent. I will look into it, thanks.

It seems the Netty client is closed before the worker transfer thread is shut down, so the transfer thread got an exception:

          137917 [Thread-549-disruptor-worker-transfer-queue] ERROR backtype.storm.util - Async loop died!
          java.lang.RuntimeException: java.lang.RuntimeException: Client is being closed, and does not take requests any more
          at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:114) ~[classes/:na]
          at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:85) ~[classes/:na]
          at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:78) ~[classes/:na]
          at backtype.storm.disruptor$consume_loop_STAR_$fn__2895.invoke(disruptor.clj:89) ~[classes/:na]
          at backtype.storm.util$async_loop$fn__442.invoke(util.clj:434) ~[classes/:na]
          at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
          at java.lang.Thread.run(Thread.java:679) [na:1.6.0_22]
          Caused by: java.lang.RuntimeException: Client is being closed, and does not take requests any more
          at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[classes/:na]
          at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[classes/:na]
          at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn_5765$fn_5766.invoke(worker.clj:322) ~[classes/:na]
          at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__5765.invoke(worker.clj:320) ~[classes/:na]
          at backtype.storm.disruptor$clojure_handler$reify__2882.onEvent(disruptor.clj:59) ~[classes/:na]
          at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:111) ~[classes/:na]
          ... 6 common frames omitted

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45416875

Fixed.

The failing case is under the namespace backtype.storm.messaging-test.

Root cause:
The test failure is caused by a bug in local topology simulation. The function complete-topology assumes the topology is finished once all spouts return, but that is not always true; the downstream bolts may still be working.

Direct cause:
complete-topology -> all spouts finish -> shut down the topology -> shut down the worker -> shut down the Netty client -> bolt still not finished -> bolt tries to send a message to the Netty client -> client already shut down -> runtime exception thrown.

The fix is to add a config that allows this topology to wait longer before being killed.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user nathanmarz commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45418252

-1. Tests should never, ever rely on timing in order to pass. This is the whole reason for doing time simulation in the first place: when functionality depends on time, it can be properly tested without having to worry about random delays breaking the tests.

complete-topology inherently relies on detecting topology completion based on the spout saying all its tuples are "complete". If you're testing topologies that don't do full tuple acking, then you should be testing using the "tracked topologies" utilities in backtype.storm.testing.clj.

For example, here is how the acking system is tested using tracked topologies: https://github.com/apache/incubator-storm/blob/master/storm-core/test/clj/backtype/storm/integration_test.clj#L213

The "tracked-wait" function is the key: it only returns when that many tuples have been emitted by the spouts AND the topology is idle (no tuples have been emitted nor will be emitted without further input). You shouldn't use tracked topologies for topologies that have tick tuples, but that shouldn't be a problem in this case.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user nathanmarz commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45419054

Additionally, besides having the potential to fail randomly, inserting sleeps into test code also slows down the tests.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45419642

          fixed

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user nathanmarz commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45447183

          +1

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user miguno commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45448273

          FYI: I'll add Nathan's comment on our test methodology to DEVELOPER.md.


          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45491833

          +1 for me too. The tests passed 3 times in a row, and the changes look good. I'll merge this into trunk now.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/incubator-storm/pull/103

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user clockfly commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45492048

          Thank you all!

          Hide
          revans2 Robert Joseph Evans added a comment -

          Merged this into master

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user revans2 commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45494371

          @clockfly thank you. This was a huge amount of work, and should make a big difference. Hopefully you can continue to help improve/support storm going forward.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user miguno commented on the pull request:

          https://github.com/apache/incubator-storm/pull/103#issuecomment-45583294

          Thanks, @clockfly!

          Hide
          jia.fu jia.fu added a comment - - edited

I am still hitting this issue. Does anyone have a solution for a production Storm cluster?

          Hide
          mauzhang Manu Zhang added a comment -

jia.fu This JIRA is resolved. You can get help on the user or dev mailing list with a detailed description of your issue.


            People

            • Assignee:
              clockfly Sean Zhong
              Reporter:
              clockfly Sean Zhong
• Votes:
  3
  Watchers:
  13
