Details

    • Type: New Feature
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: storm-core
    • Labels: None

      Description

      A useful feature missing in Storm topologies is the ability to auto-scale resources based on a pre-configured metric. The feature proposed here aims to build such an auto-scaling mechanism using a feedback system. A brief overview of the feature is provided here; the finer details of the required components and the scaling algorithm (which uses a feedback system) are provided in the attached PDFs.

      Brief Overview:
      Topologies may be created with or (ideally) without parallelism hints and task counts for their bolts and spouts before being submitted. If auto-scaling is enabled for the topology (using a Boolean flag), the topology will also be submitted to the auto-scale module.
      The auto-scale module will read a pre-configured metric (threshold per minute) from a configuration file. Using this value, the topology's resources will be modified until the threshold is reached. At each stage in the auto-scale module's execution, feedback from the previous execution will be used to tune the resources.

      The systems that need to be in place to achieve this are listed below; a hypothetical sketch of the resulting control loop follows the list.
      1. Metrics which provide the current threshold (number of acks per minute) for a topology's spouts and bolts.
      2. Access to Storm's CLI tool, which can change a topology's resources at runtime.
      3. A new Java or Clojure module which runs within the Nimbus daemon or in parallel to it. This will be the auto-scale module.
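      A minimal, hypothetical sketch of the feedback loop these pieces would form is shown below. It assumes a helper that can report acks per minute for a component (for example backed by Storm's metrics system or the Nimbus Thrift API) and applies changes through the existing "storm rebalance" CLI; the class and method names are illustrative only, not part of Storm.

{code:java}
import java.io.IOException;
import java.util.Map;

/**
 * Hypothetical feedback-driven auto-scaler: compares the observed acks/min of
 * each component against a configured threshold and nudges parallelism via the
 * existing "storm rebalance" CLI.
 */
public class AutoScaleLoop {

    private final String topology;
    private final long targetAcksPerMin;   // pre-configured threshold from a config file
    private final MetricsReader metrics;   // assumed helper: returns acks/min per component

    public AutoScaleLoop(String topology, long targetAcksPerMin, MetricsReader metrics) {
        this.topology = topology;
        this.targetAcksPerMin = targetAcksPerMin;
        this.metrics = metrics;
    }

    /** One feedback iteration: scale up any component still below the threshold. */
    public void runOnce(Map<String, Integer> currentParallelism) throws IOException, InterruptedException {
        for (Map.Entry<String, Integer> e : currentParallelism.entrySet()) {
            String component = e.getKey();
            long observed = metrics.acksPerMinute(topology, component);
            if (observed < targetAcksPerMin) {
                int newParallelism = e.getValue() + 1;   // conservative single step
                rebalance(component, newParallelism);
                e.setValue(newParallelism);              // feedback for the next iteration
            }
        }
    }

    /** Shells out to the Storm CLI; "-e component=parallelism" changes executor counts at runtime. */
    private void rebalance(String component, int parallelism) throws IOException, InterruptedException {
        new ProcessBuilder("storm", "rebalance", topology,
                "-w", "30", "-e", component + "=" + parallelism)
                .inheritIO().start().waitFor();
    }

    /** Assumed interface over Storm's metrics; not part of Storm itself. */
    public interface MetricsReader {
        long acksPerMinute(String topology, String component);
    }
}
{code}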

      Limitations (this is not an exhaustive list; more will be added as the design matures, and some of the points here may get resolved):
      To keep the feature testable, there will be a number of limitations in the first release. As the feature matures, these restrictions will be relaxed:
      1. The auto-scale module will be limited to a few topologies (maybe 4 or 5 at maximum).
      2. New bolts will not be added to scale a topology. The feature will be limited to increasing the resources within the existing topology.
      3. Topology resources will not be decreased when a topology is running with more resources than required (except in a few cases).
      4. The feature will work only for long-running topologies where the input threshold can become equal to or greater than the required threshold.

      Attachments:
      1. Algorithm for Auto-Scaling.pdf (84 kB, HARSHA BALASUBRAMANIAN)
      2. Project Plan and Scope.pdf (423 kB, HARSHA BALASUBRAMANIAN)

        Activity

        harshab85@gmail.com HARSHA BALASUBRAMANIAN added a comment -

        Files which describe this project

        sriharsha Sriharsha Chintalapani added a comment -

        HARSHA BALASUBRAMANIAN Any update on this JIRA? If you have made progress on it, can you please share your findings?

        shailesh_pilare SHAILESH PILARE added a comment -

        I would like to work on this JIRA.

        rohits47 Rohit Sanbhadti added a comment -

        Hi,

        We're a group of three CS students at UIUC and we have been working on this as a research project. We already have some initial implementation and are in the process of trying out other algorithms besides those suggested in the attached PDFs.

        sanchitgupta05 Sanchit Gupta added a comment - - edited

        Hi,
        We worked on the project and have created an initial paper.
        Link: bit.ly/Dynamic_Storm

        Please let us know about any critique, questions, or comments.

        Thanks
        Sanchit Gupta

        kabhwan Jungtaek Lim added a comment -

        Could you share your paper to public place?
        I logged in to Dropbox but can't find your paper. Also, requiring a Dropbox login just to view it is not ideal.

        sanchitgupta05 Sanchit Gupta added a comment -

        Try this link:

        bit.ly/Dynamic_Storm

        • Sanchit
        memorylake@qq.com Memory Lake added a comment -

        I just glanced over your manuscript and came up with a simple question. In the paper you calculate the receive latency from the length of the incoming tuple queue, but as far as I know, Storm itself does not provide such a fine-grained metric, nor the length of the outgoing tuple queue. So have you modified the core code of Storm?

        jon.weygandt Jon Weygandt added a comment -

        I have some interest in solving this problem. We have a Hack Week coming up, and there are several folks interested. Having read the proposals, as well as the Dynamic Storm paper, I believe my current ideas line up a bit closer to the original proposal.

        I will attempt to tackle it from what I'm going to call a "Balanced Storm Topology" (don't know if there is another name): This is where the parallelism will be auto-scaled to a multiple of the number of auto-scaled workers, and each worker has the same number and type of bolts and spouts. I have observed that the FAQs indicate this is a good thing, and Nimbus seems to gravitate to this configuration if the parallelism is right. I believe this will be the most stable layout of the topology, short of one instance per worker, and one worker per host/container (sounds like a description for Heron).

        My plan is to use latency*events/sec, normalized to 0-100%, as the primary metric for determining these values.
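        As a rough illustration of that metric (a sketch of one plausible reading, essentially the Storm UI's "capacity" figure): multiplying the per-tuple execute latency by the number of tuples executed in a measurement window gives the fraction of the window the executor spent busy, which normalizes naturally to 0-100%.

{code:java}
/**
 * Sketch of the "latency * events/sec" metric normalized to 0-100%.
 * executed   = tuples executed in the window
 * latencyMs  = average execute latency per tuple (ms)
 * windowSecs = length of the measurement window (s)
 */
public final class CapacityMetric {
    public static double capacityPercent(long executed, double latencyMs, long windowSecs) {
        double busyMs = executed * latencyMs;    // time spent executing tuples
        double windowMs = windowSecs * 1000.0;   // total wall-clock time observed
        return Math.min(100.0, 100.0 * busyMs / windowMs);
    }

    public static void main(String[] args) {
        // e.g. 12,000 tuples at 4 ms each over a 10-minute window -> 8% capacity
        System.out.println(capacityPercent(12_000, 4.0, 600));
    }
}
{code}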

        One of the goals will be to limit the number of iterations as rebalancing a Storm topology is not without its costs. Another feature of our use cases is that there are daily and weekly fluctuations in our load, but the max is somewhat stable (it grows slowly over time). And since we need to be guaranteed to have enough hardware to handle our peak loads, the sizing parameters and scaling algorithms will reflect this scenario. Generally we know the max input rate (or predicted input rate) at the spouts, the system will be sized to that at a minimum, and there won't be any scale-down below this limit.

        Because of this, I'd like to consider the "replay on startup" issue: if Storm is down for a bit and you have messages queued up, then on startup your system will typically burst to 100% until the backlog is drained. I wish to put some controls in place to handle this, as I don't think the system should try to scale up briefly and then scale down. This will be a bit more difficult, as I believe I need to know from each spout whether it is playing a backlog or current data, as well as the backlog size and the rate at which it is changing. Sounds like perhaps a new interface that auto-scaling spouts might support?
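        The spout-side contract hinted at above could look something like the following; this is purely a hypothetical interface, not an existing Storm API.

{code:java}
/**
 * Hypothetical interface an auto-scaling-aware spout might implement so the
 * auto-scaler can distinguish backlog draining from genuine load growth.
 */
public interface BacklogAwareSpout {
    /** True while the spout is replaying queued/backlogged messages rather than live traffic. */
    boolean isReplayingBacklog();

    /** Approximate number of messages still queued behind this spout. */
    long backlogSize();

    /** Rate of change of the backlog (messages/sec); negative means the backlog is draining. */
    double backlogDrainRate();
}
{code}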

        Then as inputs:
        *) user can elect certain headroom on the latency*events number (or we may have good defaults)
        *) planned number of events for each spout, as the current load may be less, or may have daily variations, or you may want to plan ahead for holiday time or some such.
        *) estimated maximum down time and desired backlog catch-up time.

        And given time, various events will need to come out of Nimbus to external systems to add more hardware, as well as various alarms/notifications.

        memorylake@qq.com Memory Lake added a comment -

        Hi Jon, what do you mean by using latency*events/sec? It seems to be the same thing as capacity, in my understanding. You also mentioned that each worker has the same number and type of bolts and spouts; are you suggesting that every bolt should have a degree of parallelism that is larger than, or even a multiple of, the number of workers?

        jon.weygandt Jon Weygandt added a comment -

        latency*events/sec = capacity - we are probably saying the same thing, which is different than UIUC's proposal using queue lengths. I believe this metric behaves much more linearly with respect to load when the system is in the normal range of operations. Of course capacity may be one of many metrics, but that will be my starting point.

        RE parallelism: yes, for this model, "a bolt's parallelism" = n * workers, where n is an integer.
        Everyone I have worked with seems to agree that this is more than simply a "good rule of thumb"; it is necessary to have a predictable system.
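        A concrete, hedged illustration of that constraint using Storm's standard topology-building API (TestWordSpout is the test spout bundled with storm-core; the bolt is a trivial placeholder):

{code:java}
import backtype.storm.Config;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

/** Minimal "balanced" topology: every component's parallelism is n * workers. */
public class BalancedTopologyExample {

    /** Trivial placeholder bolt; any real bolt would do. */
    public static class NoopBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) { /* drop the tuple */ }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) { /* no output stream */ }
    }

    public static void main(String[] args) {
        final int workers = 4;  // number of workers the auto-scaler has chosen
        final int n = 2;        // per-worker multiple, also chosen by the auto-scaler

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("words", new TestWordSpout(), n * workers);
        builder.setBolt("noop", new NoopBolt(), n * workers).shuffleGrouping("words");

        Config conf = new Config();
        conf.setNumWorkers(workers);  // each worker ends up with n executors of every component
        // StormSubmitter.submitTopology("balanced-example", conf, builder.createTopology());
    }
}
{code}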

        I believe my scaling system will work for many, but not for all. Storm has many uses. There could be a place for UIUC's proposal, as it did not have this constraint. Keeping comments on one topic and small, see the next comment for some ideas I have.

        jon.weygandt Jon Weygandt added a comment -

        I believe it will be a long time before someone creates a single auto-scale solution that suits all Storm users. My solution will have a "parallelism constraint", which does not seem bad, as it seems many users already behave this way. But I realize there will be others who don't like this constraint. To that end, I will attempt to create a layer whereby one can plug in different AutoScaleManagers (I'm going to propose a name like "AutoScaleManager", as "Threshold Monitor Daemon" seems to imply an implementation). If we do it right, my implementation can coexist with UIUC's implementation, and others. I wish them well in their solution; my motivation is that I'm going to be responsible for hundreds of independent topologies, and without something like this it will be an operational nightmare.

        I envision the AutoScaleManager will be associated with the Topology when it is submitted to Nimbus. Could be part of the core Storm code base, or a custom user class.

        There will be one (or more) strategically placed calls from the Topology management code during the management lifecycle, such as during deployment/delivery of code to Workers, to modify/enhance the topology. It will be in this phase that saved history or initial defaults will be applied to the Topology, altering parallelism, workers, ackers, queue sizes etc... This call will be a "black box" to core Storm code. The "saved history" will be the sole responsibility of the AutoScaleManager.

        Then the AutoScaleManager must run as a daemon somewhere. I'm going to propose that it run as a thread inside Nimbus. This will make management of the daemon easy for simple use cases. And when Nimbus solves the HA/failover issues, the AutoScaleManager can take advantage of that solution as well. Whenever Nimbus is watching over a topology, the AutoScaleManager for that topology is running. We will need various lifecycle methods, such as start and stop, that Nimbus will call at the correct times.

        When the AutoScaleManager needs to make changes, all of the Nimbus APIs are available. It could make whatever runtime changes are allowed, like altering parallelism. For large changes it will simply stop then start the topology, and the "enhance" callback will perform the changes. Furthermore, if it makes runtime changes, it will need to remember them so that when code changes happen, they are used as the initial defaults for that code roll.
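        A rough strawman of what such a pluggable layer might look like; none of these types exist in Storm today, and the method set is only illustrative:

{code:java}
import java.util.Map;

import backtype.storm.generated.StormTopology;

/**
 * Strawman plugin contract for the AutoScaleManager idea described above.
 * Nimbus would call enhance() while preparing a topology for deployment and
 * start()/stop() whenever it takes or releases ownership of the topology.
 */
public interface AutoScaleManager {

    /** Apply saved history or initial defaults: parallelism, workers, ackers, queue sizes, ... */
    StormTopology enhance(String topologyName, Map<String, Object> conf, StormTopology topology);

    /** Begin monitoring; the manager may call back into Nimbus (e.g. rebalance) as needed. */
    void start(String topologyName, NimbusActions nimbus);

    /** Stop monitoring and persist whatever state is needed for the next code roll. */
    void stop(String topologyName);

    /** Narrow view of the Nimbus operations an AutoScaleManager is allowed to invoke. */
    interface NimbusActions {
        void rebalance(String topologyName, int numWorkers, Map<String, Integer> executorsPerComponent);

        void restart(String topologyName);  // the "stop then start" path for large changes
    }
}
{code}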

        memorylake@qq.com Memory Lake added a comment -

        Hi Jon, from my humble understanding, constraining parallelism like that is not very tenable, as some components are just responsible for writing some negligible results and thus shouldn't be spread over the cluster. But apart from that, I think the trickiest part is how you would identify the most bottlenecked component, and how to tackle this imbalance problem. The bottleneck could be either a spout whose instances are not powerful enough (so that some of the bolts are actually starving) or some bolts lying on the critical path that do not have enough parallelism hints. Since the bottleneck is absolutely application dependent, I believe a "good rule of thumb" is still needed in a universal auto-scaling system, and maybe you need to address that clearly in your approach.

        jon.weygandt Jon Weygandt added a comment -

        I have yet to work with a use case like that, although one of my colleagues pointed this out. I am really interested in how Nimbus actually converts bolts/parallelism/workers into a physical executable layout. This has always fascinated me: how does one take the given input and create something that is stable and manageable? I have yet to look at the code for this, but if it is very predictable and what one wants, the "singleton bolts" will need to be tagged in a way the AutoScaleManager recognizes, and the scaling algorithm will use that tagging. If the topology allocation is not what one wants, but it is somewhat "modular", perhaps the AutoScaleManager could have a computePhysicalTopology method, so that part of the layout is under the control of the auto-scale system.

        We did go into more detail on which bolts classify as "singleton bolts". One that does: an accumulator of some distributed operation that is accumulating partial results into a final result. If you partitioned it you would not get the results you want, so it must be a "singleton bolt". One that does not: an email output bolt. Yes, it is trivial, and the system will work fine with only one, but logically the system will work fine with one per worker as well. Some might think that's wasteful, but for email, not really: you have one more thread, but it is doing X/workers amount of work. In fact this may be more efficient, as you won't have to do (de)serialization and IPC to the singleton. Now if it is a database output bolt, and connections to the DB are limited, it becomes a tradeoff.

        We will support both scenarios, so the Storm developer will have a choice.

        jon.weygandt Jon Weygandt added a comment -

        How to detect bottlenecks: this is tricky to predict, and we won't do it in a single iteration; UIUC did about 16 (+/-) iterations in their sample runs. Remember this may not work for everything, which is why this is designed to be a "plugin", so others can improve on this or do things differently.

        One input will be the target tps for each spout. We have daily/weekly/seasonal variations, and the owner of the topology should have some idea of the target rates. That will mean that during the busy time (also the most important time) the system is not trying to rebalance, as rebalances may produce a small glitch in the flow. FWIW, I'm not trying to solve a totally elastic minute-by-minute or hour-by-hour flex problem. The problem I'm working on is to ease the management of a topology over its entire lifetime: the initial deployment and adjustment of numbers; the longer-term, code-roll-to-code-roll, gradual increase in load; the changes for the estimated increase in seasonal traffic; as well as some automated handling and/or alerting when there are unexpected changes.

        Algorithm proposal:

        Step 1: So if the spout is pegged at 100% capacity, and we know the current tps, and target tps, we can estimate the number of instances. If the spout is not, likely one of the downstream bolts is. We do the same estimate.

        Step 2: The next part is trickier, but follow the topology and you can get a ratio of the weighted average input of the spout's tps with respect to each bolt's tps, and each bolt's capacity. Knowing the target spout capacity, do the math and get an estimate. As the bottleneck moves from the spout toward the last bolt, this estimate will get better and better for the bolts further upstream.

        Add in some hysteresis to prevent needless cycling.

        I believe this will converge for many systems in a few cycles, and it may quite likely be bounded by the length/depth of the topology.
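        A hedged sketch of the Step 1/Step 2 arithmetic above, with all names and numbers invented for illustration:

{code:java}
/** Illustrative arithmetic for the two estimation steps above (not Storm code). */
public final class ScaleEstimator {

    /**
     * Step 1: a component running at capacityFraction (0-1] while doing currentTps
     * with currentInstances executors needs roughly this many to reach targetTps.
     */
    public static int estimateInstances(int currentInstances, double currentTps,
                                        double targetTps, double capacityFraction) {
        double perInstanceTps = (currentTps / currentInstances) / capacityFraction;
        return (int) Math.ceil(targetTps / perInstanceTps);
    }

    /**
     * Step 2: for a downstream bolt, scale the spout's target through the observed
     * ratio of bolt tps to spout tps, then size the bolt the same way.
     */
    public static int estimateDownstream(int boltInstances, double boltTps, double boltCapacity,
                                         double spoutTps, double spoutTargetTps) {
        double boltTargetTps = spoutTargetTps * (boltTps / spoutTps);
        return estimateInstances(boltInstances, boltTps, boltTargetTps, boltCapacity);
    }

    public static void main(String[] args) {
        // Spout: 2 instances at 100% capacity doing 10k tps, target 25k tps -> 5 instances.
        System.out.println(estimateInstances(2, 10_000, 25_000, 1.0));
        // Bolt: 4 instances at 80% capacity doing 20k tps while the spout does 10k tps -> 8 instances.
        System.out.println(estimateDownstream(4, 20_000, 0.8, 10_000, 25_000));
    }
}
{code}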

        memorylake@qq.com Memory Lake added a comment - - edited

        Thank you for your input. I have carefully read the paper from UIUC; one thing that still concerns me is that the length of the input queue, which they claim is a built-in metric in Storm, is not that easy to retrieve. According to my observation there is no such metric in the UI daemon or any other part of Storm's implementation. Maybe I didn't manage to understand it correctly, but missing this critical information makes their bottleneck detection hard to reproduce in my system, and thus I have to figure it out on my own, which, by the way, is kind of similar to what you have proposed above and is mainly based on the capacity obtained from the monitor. Besides, in my case even the same size of input stream could potentially induce a varying resource demand, since some of the logic in bolts basically depends on the type and fields of tuples. Have you guys taken this into consideration?

        memorylake@qq.com Memory Lake added a comment -

        It would be very helpful if we could find a way to measure the waiting time that a tuple spends in a component, or at least the length of the input queue for each task, so that queuing theory could come to our rescue. As we already know the complete latency for the spout and the process latency for each bolt, I believe the missing part is how the rest of the latency is distributed. I am trying to find a way to plug into Storm and get this vital information.
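        One way to plug in is Storm's metrics-consumer hook. The sketch below registers a consumer and logs any data point whose name looks queue-related; the hook itself (IMetricsConsumer and Config.registerMetricsConsumer) is standard Storm, but whether queue-depth data points (such as the executor receive-queue population) are actually published depends on the Storm version, so treat the metric-name filter as an assumption to verify.

{code:java}
import java.util.Collection;
import java.util.Map;

import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext;

/** Logs queue-related data points delivered by Storm's metrics system. */
public class QueueMetricsConsumer implements IMetricsConsumer {

    @Override
    public void prepare(Map stormConf, Object registrationArgument,
                        TopologyContext context, IErrorReporter errorReporter) { }

    @Override
    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        for (DataPoint dp : dataPoints) {
            // Metric names are version dependent; "receive"/"sendqueue" is a guess to verify.
            if (dp.name.contains("receive") || dp.name.contains("sendqueue")) {
                System.out.printf("%s[%d] %s = %s%n",
                        taskInfo.srcComponentId, taskInfo.srcTaskId, dp.name, dp.value);
            }
        }
    }

    @Override
    public void cleanup() { }
}

// Registration at topology-submission time:
//   Config conf = new Config();
//   conf.registerMetricsConsumer(QueueMetricsConsumer.class, 1);
{code}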

        jon.weygandt Jon Weygandt added a comment -

        As I wrote this, I realized that my idea of auto-scaling could be different from others'. It is to ensure that I have enough capacity on "average" to handle the load. Others may like to optimize on overall system latency, which would require a different algorithm. This auto-scale change will be a plugin, so one can try to optimize for that case with another implementation should ultra-low latency be a concern.

        On latency, capacity and input queue depth of a bolt with respect to auto-scaling, which is different than total system measures.

        Latency is the time it takes the bolt to process a tuple. Short of a code change, or a change in a dependent output system (e.g. a Kafka output bolt), the latency will remain constant as load changes. Given the latency, the maximum throughput of the bolt is known, and it generally remains constant. For CPU-bound bolts, provided you don't overload the system, the system will scale linearly; similarly for a Kafka bolt, provided you don't overload Kafka. Even when the bolt is running at 100% you can figure out the throughput. It may not be perfect, but it is fairly good for the systems I've worked with.

        Queue depth is very different and really interesting, but it is a function of many things. It is also erratic and difficult to properly measure. Generally, if the system is not overloaded, the instantaneous queue depth will be zero quite often. If the inbound load were perfectly constant, in theory the queue depth would either be 1 or 0, never greater than 1. When the inbound load is erratic (e.g. it bursts), the queue fills up, but a bursty inbound load also has the opposite side, a low rate, and this allows the system time to drain the queue. What happens when the queue fills up? Hopefully no one builds an unbounded queue (OOM!); it will either need to overflow and drop data, or apply backpressure upstream. In the last system I worked on, the messaging system did not allow backpressure, and when an individual partition was running around 400K/sec we could have queue depths of 100K. But they would always return to zero, as the downstream components were able to handle the average load. As long as the queue depth fluctuates between zero and some number, you have a system that is not overloaded. Even with backpressure we would still see this same pattern. This is why I find this metric difficult to work with for auto-scaling. Further, sampling the queue depth periodically is difficult: you would need a large number of samples to state that the queue was "at zero 2% of the time". In my prior system, max queue depth was extremely important for me, and I instrumented the input side of the queue to record the maximum depth with each put, and reset it at each sampling of the metric.
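        A small, generic version of that instrumentation (plain Java, not tied to Storm's internal queues): wrap the queue, track the high-water mark on every put, and let the metrics sampler read and reset it.

{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Wraps a queue and records the maximum observed depth between metric samples. */
public class MaxDepthQueue<T> {
    private final BlockingQueue<T> delegate;
    private final AtomicInteger maxDepth = new AtomicInteger();

    public MaxDepthQueue(int capacity) {
        this.delegate = new ArrayBlockingQueue<>(capacity);
    }

    public void put(T item) throws InterruptedException {
        delegate.put(item);
        // Record the high-water mark seen since the last sample.
        maxDepth.accumulateAndGet(delegate.size(), Math::max);
    }

    public T take() throws InterruptedException {
        return delegate.take();
    }

    /** Called by the metrics sampler: returns the max depth and resets it for the next interval. */
    public int sampleAndResetMaxDepth() {
        return maxDepth.getAndSet(0);
    }
}
{code}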

        Now some will be concerned about overall system latency, which is generally not my concern. It would only be of concern if someone said "99% of the data is delivered in 100ms or less". If your latency is measured in seconds, I would say throughput is the primary goal. If you don't overload, latency is generally driven by the speed of your algorithms, and bursting. If you think you have sub-sub second latencies as a requirement, then bursting, GC, backpressure and acknowledgment mechanisms will be of concern as well.

        ksenji Kishore Senji added a comment -

        I could not find the input queue length either. The closest is the executor receive ring buffer queue size. Since we cannot control this directly, do we need this information for auto-scaling?

        Can we base the auto-scaling solely on capacity? If the capacity of a bolt is equal to 1, then that means the bolt is already processing maximum number of messages based on the bolt latency (potential for queue up of incoming messages and we can gauge based on the tps as well) and so we can start to increase parallelism for that bolt. We can start the algo with a small max spout pending (as outlined in manual tuning https://storm.apache.org/documentation/FAQ.html) and gradually increase it, tuning the parallelism simultaneously until we reach the desired tps (transactions/sec) for the spout.
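        A hedged sketch of that decision logic; the mechanics of actually applying a change (rebalance vs. resubmit) are deliberately left out, and the saturation cutoff and step sizes are arbitrary:

{code:java}
import java.util.HashMap;
import java.util.Map;

/** One tuning step of the capacity-driven approach sketched above (illustrative only). */
public class CapacityTuner {

    /** Bolts running at (or near) capacity 1.0 get more executors before anything else. */
    static Map<String, Integer> nextParallelism(Map<String, Double> boltCapacity,
                                                Map<String, Integer> boltParallelism) {
        Map<String, Integer> next = new HashMap<>(boltParallelism);
        for (Map.Entry<String, Double> e : boltCapacity.entrySet()) {
            if (e.getValue() >= 0.9) {                    // effectively saturated
                next.merge(e.getKey(), 1, Integer::sum);  // add one executor
            }
        }
        return next;
    }

    /** Once no bolt is saturated, raise max spout pending until the spout hits the target tps. */
    static int nextMaxSpoutPending(int current, double spoutTps, double targetTps,
                                   boolean anyBoltSaturated) {
        if (!anyBoltSaturated && spoutTps < targetTps) {
            return current * 2;  // gradual increase, per the manual-tuning FAQ advice
        }
        return current;
    }
}
{code}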

        memorylake@qq.com Memory Lake added a comment -

        In the project I am working on, the process latency for a component is not fixed; it can actually vary a lot when changing the parallelism settings. As a general rule I have observed from the metrics, the component that has insufficient parallelism and is likely to be the bottleneck tends to have a higher process latency, which can be as large as 10x the normal value. I think this can be explained by other components on the same machine having occupied too many resources, so the CPU is likely to be scheduled to another thread even while the bottlenecked bolt is processing a tuple.

        jerrypeng Boyang Jerry Peng added a comment -

        Hello guys,

        Recently there have been papers published in academia that use existing results from queueing theory to calculate the parallelism of each component. Auto-scaling or elasticity in Storm can be viewed as a queueing-theory problem, since in Storm tuples need to be processed and there are only a limited number of processors to process them.
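        In the simplest queueing-theory view (an M/M/c-style approximation, not taken from any specific paper), the arrival rate λ at a component and the per-executor service rate μ = 1000 / execute-latency-ms bound the parallelism from below: you need at least c executors so that the utilization ρ = λ / (c·μ) stays under a chosen target. A toy calculation:

{code:java}
/** Toy M/M/c-style sizing: smallest parallelism keeping utilization below a target. */
public final class QueueingSizer {
    public static int minExecutors(double arrivalTps, double executeLatencyMs, double targetUtilization) {
        double serviceRatePerExecutor = 1000.0 / executeLatencyMs;  // tuples/sec one executor can handle
        return (int) Math.ceil(arrivalTps / (serviceRatePerExecutor * targetUtilization));
    }

    public static void main(String[] args) {
        // 8,000 tuples/sec arriving at a bolt with 2 ms execute latency, keep utilization under 70%:
        System.out.println(minExecutors(8_000, 2.0, 0.7));  // -> 23 executors
    }
}
{code}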

        jon.weygandt Jon Weygandt added a comment -

        Do you have a link to this paper?


          People

          • Assignee: pjamshidi Pooyan Jamshidi
          • Reporter: harshab85@gmail.com HARSHA BALASUBRAMANIAN
          • Votes: 7
          • Watchers: 32

            Dates

            • Created:
              Updated:

              Time Tracking

              Estimated: 504h
              Remaining: 504h
              Logged: Not Specified
