Apache Storm / STORM-2693

Topology submission or kill takes too much time when topologies grow to a few hundred

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.9.6, 1.0.2, 1.1.0, 1.0.3
    • Fix Version/s: None
    • Component/s: storm-core

      Description

      Currently, on a Storm cluster with 40 hosts (32 cores / 128 GB memory each) and hundreds of topologies, a Nimbus submission or kill takes minutes to finish. For example, on a cluster with 300 topologies it takes about 8 minutes to submit a topology, which seriously hurts our efficiency.

      So I checked the Nimbus code and found two factors that affect Nimbus submission/kill time in a scheduling round:

      • reading existing assignments from ZooKeeper for every topology (about 4 seconds on a 300-topology cluster)
      • reading all worker heartbeats and updating the state in the Nimbus cache (about 30 seconds on a 300-topology cluster)

      The key point is that Storm currently uses ZooKeeper to collect heartbeats (not RPC), and also keeps the physical plan (assignments) in ZooKeeper, even though it could be kept entirely local to Nimbus.

      So I think we should make some changes to Storm's heartbeat and assignment management.

      For assignment promotion:
      1. Nimbus will keep the assignments on local disk.
      2. On restart, or when an HA leader election triggers, Nimbus will recover the assignments from ZK to local disk.
      3. Nimbus will tell each Supervisor its assignment through RPC in every scheduling round.
      4. Supervisors will also sync assignments at a fixed interval.
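      The steps above could be sketched roughly as follows. This is an illustrative simulation only; all class and method names are hypothetical, not Storm's actual API, and ZK persistence is elided:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the proposed assignment flow: the leader Nimbus keeps the
// authoritative copy locally and pushes it to Supervisors over RPC;
// Supervisors also pull on a fixed schedule as a catch-up path.
class AssignmentStore {
    private final Map<String, String> byTopology = new ConcurrentHashMap<>();

    // Called after each scheduling round; per the proposal, the real
    // implementation would persist to ZooKeeper first, then local disk.
    void put(String topologyId, String assignment) {
        byTopology.put(topologyId, assignment);
    }

    String get(String topologyId) {
        return byTopology.get(topologyId);
    }
}

class SupervisorView {
    private final Map<String, String> synced = new ConcurrentHashMap<>();

    // Push path: Nimbus tells the Supervisor its assignment via RPC.
    void onAssignmentRpc(String topologyId, String assignment) {
        synced.put(topologyId, assignment);
    }

    // Pull path: periodic sync in case an RPC push was missed.
    void syncFrom(AssignmentStore store, String topologyId) {
        String a = store.get(topologyId);
        if (a != null) synced.put(topologyId, a);
    }

    String assignmentOf(String topologyId) {
        return synced.get(topologyId);
    }
}
```

      The push path keeps Supervisors current within one scheduling round, while the periodic pull covers missed RPCs after network hiccups.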

      For heartbeat promotion:
      1. Workers will report each executor's state (ok or wrong) to the Supervisor at a fixed interval.
      2. The Supervisor will report its workers' heartbeats to Nimbus at a fixed interval.
      3. If a Supervisor dies, it will tell Nimbus through a runtime hook, or Nimbus will detect whether the Supervisor is still alive through its own awareness checks.
      4. The Supervisor decides whether a worker is running ok or is invalid, and tells Nimbus which executors of each topology are ok.
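      A minimal sketch of the Supervisor-side aggregation in steps 1–4 (names are made up for illustration; the real report to Nimbus would go over RPC on a timer):

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Workers report per-executor health to their local Supervisor, which
// aggregates and forwards one summary per topology to Nimbus, replacing
// per-worker heartbeat znodes in ZooKeeper.
class SupervisorHeartbeats {
    // topologyId -> executorId -> healthy?
    private final Map<String, Map<Integer, Boolean>> state = new ConcurrentHashMap<>();

    // Worker-side report: executor ok (true) or wrong (false).
    void reportExecutor(String topologyId, int executorId, boolean ok) {
        state.computeIfAbsent(topologyId, t -> new ConcurrentHashMap<>())
             .put(executorId, ok);
    }

    // The Supervisor decides which executors are valid and would send
    // only this healthy set to Nimbus at a fixed interval.
    Set<Integer> healthyExecutors(String topologyId) {
        Map<Integer, Boolean> execs = state.getOrDefault(topologyId, Map.of());
        Set<Integer> ok = new TreeSet<>();
        execs.forEach((id, healthy) -> { if (healthy) ok.add(id); });
        return ok;
    }
}
```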

        Issue Links

          Activity

          kabhwan Jungtaek Lim added a comment -

          Yuzhao Chen
          Just curious, could you try out pacemaker to see if it helps in your case?

          danny0405 Yuzhao Chen added a comment -

          We have tried pacemaker; it only improves heartbeat fetching time, and when it died the cluster just collapsed. Even with HA, many topologies were affected. Actually, I think the heartbeats should not contain the stats (10m/3h/1d window) of every executor; we should collect the stats separately.

          kabhwan Jungtaek Lim added a comment -

          Yuzhao Chen
          Yes, totally agreed. I think some of us are trying to make a huge change to metrics (named Metrics V2), but it looks like it will take more time to be realized.

          kabhwan Jungtaek Lim added a comment - edited

          Yuzhao Chen
          Just to give some guidance for contributing (sorry we haven't documented it nicely):

          • all patches should ideally be made against master (currently 2.0.0)
          • the merger, or any committer, can decide which version line(s) the patch should go into, and port it back if necessary
          • one exceptional case: a patch that addresses a bug residing only on specific version line(s)

          Let's get back to your design.

          I haven't looked into the details, but I'm wondering how reading/storing from/to local disk will work with Nimbus H/A. Storm puts all critical data into ZooKeeper to ensure the data is available at any time (when ZK is not available, Storm will not work anyway...). This would no longer be true if we store it only on local disk.

          You would want to think about the case where each Nimbus has its own (out-of-sync) assignments stored locally (only one of them is the leader, though), and all of them shut down and restart at the same time. Which Nimbus should become the leader? How do we ensure the elected Nimbus has the latest assignments?

          Btw, the idea brings up some thoughts: one of Storm's great points is statelessness, which is achieved with stable storage (ZK for Storm), and we claim that Supervisors can keep working even when Nimbus goes down. Now that we have introduced Nimbus H/A, which makes Nimbus no longer a SPOF, some components like the Supervisor may be able to rely on the leader Nimbus instead of communicating with ZK, if that is much faster and doesn't put too much load on the leader Nimbus. (This looks like the same approach as in your idea.)

          And since we only allow the leader Nimbus to handle assignments, while we still need to write assignments to ZK, we can cache them in the leader Nimbus (updating the cache on any assignment change; note that we should still write to ZK first) and avoid reading them from ZK.
          We still need to get heartbeats (and statistics) from ZK, but that can be handled with a similar idea or another approach (like Metrics V2, from the metrics point of view).
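          A minimal write-through sketch of the caching idea described in this comment. The ZkClient interface here is a hypothetical stand-in, not Storm's real ZooKeeper wrapper:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for a ZooKeeper client; illustrative only.
interface ZkClient {
    void setData(String path, String data);
    String getData(String path);
}

// The leader Nimbus always writes assignments to ZooKeeper first (the
// source of truth), then updates an in-memory copy so that reads during
// scheduling never have to hit ZooKeeper.
class CachedAssignments {
    private final ZkClient zk;
    private final Map<String, String> cache = new ConcurrentHashMap<>();

    CachedAssignments(ZkClient zk) { this.zk = zk; }

    void update(String topologyId, String assignment) {
        zk.setData("/assignments/" + topologyId, assignment); // ZK first
        cache.put(topologyId, assignment);                    // then cache
    }

    // Served from memory; on leader failover the new leader would
    // rebuild this cache by reading ZK once.
    String read(String topologyId) {
        return cache.get(topologyId);
    }
}
```

          Writing to ZK before the cache means a crash between the two steps leaves ZK ahead of the cache, never behind it, so a rebuilt cache is always at least as fresh.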

          danny0405 Yuzhao Chen added a comment -

          I will write the assignments to ZK first every time, before putting them on disk; when a Nimbus gains leadership, it will synchronize a copy of the ZK assignments and then start to work. Does this have any issue?

          danny0405 Yuzhao Chen added a comment -

          Another question: what is Metrics V2? Maybe I can make some contribution.

          kabhwan Jungtaek Lim added a comment -

          Yuzhao Chen
          Yes, I think that should work. Then I don't think we should store the cache on disk, because ZK is still the source of truth, and when Nimbus restarts it must read from ZK rather than from disk. Caching in memory looks sufficient.

          PR for Metrics V2 is available: https://github.com/apache/storm/pull/2203


            People

            • Assignee: Unassigned
            • Reporter: danny0405 Yuzhao Chen
            • Votes: 0
            • Watchers: 2

              Dates

              • Created:
                Updated:

                Time Tracking

                Estimated: Not Specified
                Remaining: 0h
                Logged: 5h

                  Development