Details

    • Improvement
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 1.2.0
    • None

    Description

      This is a proposal to provide a new metrics reporting API based on Coda Hale's metrics library (AKA Dropwizard/Yammer metrics).

      Background

      In a discussion on the dev@ mailing list a number of community and PMC members recommended replacing Storm’s metrics system with a new API as opposed to enhancing the existing metrics system. Some of the objections to the existing metrics API include:

      1. Metrics are reported as an untyped Java object, making it very difficult to reason about how to report it (e.g. is it a gauge, a counter, etc.?)
      2. It is difficult to determine if metrics coming into the consumer are pre-aggregated or not.
      3. Storm’s metrics collection occurs through a specialized bolt, which in addition to potentially affecting system performance, complicates certain types of aggregation when the parallelism of that bolt is greater than one.

      In the discussion on the developer mailing list, there is growing consensus for replacing Storm’s metrics API with a new API based on Coda Hale’s metrics library. This approach has the following benefits:

      1. Coda Hale’s metrics library is very stable, performant, well thought out, and widely adopted among open source projects (e.g. Kafka).
      2. The metrics library provides many existing metric types: Meters, Gauges, Counters, Histograms, and more.
      3. The library has a pluggable “reporter” API for publishing metrics to various systems, with existing implementations for: JMX, console, CSV, SLF4J, Graphite, Ganglia.
      4. Reporters are straightforward to implement, and can be reused by any project that uses the metrics library (i.e. would have broader application outside of Storm)

      As noted earlier, the metrics library supports pluggable reporters for sending metrics data to other systems, and implementing a reporter is fairly straightforward (an example reporter implementation can be found here). For example if someone develops a reporter based on Coda Hale’s metrics, it could not only be used for pushing Storm metrics, but also for any system that used the metrics library, such as Kafka.

      Scope of Effort

      The effort to implement a new metrics API for Storm can be broken down into the following development areas:

      1. Implement API for Storms internal worker metrics: latencies, queue sizes, capacity, etc.
      2. Implement API for user defined, topology-specific metrics (exposed via the org.apache.storm.task.TopologyContext class)
      3. Implement API for storm daemons: nimbus, supervisor, etc.

      Relationship to Existing Metrics

      This would be a new API that would not affect the existing metrics API. Upon completion, the old metrics API would presumably be deprecated, but kept in place for backward compatibility.

      Internally the current metrics API uses Storm bolts for the reporting mechanism. The proposed metrics API would not depend on any of Storm's messaging capabilities and instead use the metrics library's built-in reporter mechanism . This would allow users to use existing Reporter implementations which are not Storm-specific, and would simplify the process of collecting metrics. Compared to Storm's IMetricCollector interface, implementing a reporter for the metrics library is much more straightforward (an example can be found here .

      The new metrics capability would not use or affect the ZooKeeper-based metrics used by Storm UI.

      Relationship to JStorm Metrics

      [TBD]

      Target Branches

      [TBD]

      Performance Implications

      [TBD]

      Metrics Namespaces

      [TBD]

      Metrics Collected

      Worker

      Namespace Metric Type Description

      Nimbus

      Namespace Metric Type Description

      Supervisor

      Namespace Metric Type Description

      User-Defined Metrics

      [TBD]

      Attachments

        Issue Links

          Activity

            ptgoetz,

            Just a small correction here: "The proposed metrics API would depend on any of Storm's messaging capabilities...": I think you mean "would NOT depend".

            I am not sure codahale answers one part that I am unclear about, and that is how workers will send their metrics over. If they interact with codahale directly, or whether there is some collection/aggregation in the middle.

            I'll read more on how Kafka uses it to see if that helps.

            abellina Alessandro Bellina added a comment - ptgoetz , Just a small correction here: "The proposed metrics API would depend on any of Storm's messaging capabilities...": I think you mean "would NOT depend". I am not sure codahale answers one part that I am unclear about, and that is how workers will send their metrics over. If they interact with codahale directly, or whether there is some collection/aggregation in the middle. I'll read more on how Kafka uses it to see if that helps.

            I've been looking some at JStorm's approach.

            They are using wrappers of codahale meters (e.g. AsmGauge, AsmCounter, etc) and that's what they use at all levels of the hierarchy (worker, topology, cluster, etc.). Both built in and user metrics use these.

            Then they have their own implementation of a registry, but it doesn't seem to be related to the codahale MetricRegistry. They send metrics via netty or thrift by querying their registry. If from worker, they send via netty to topology master. If from topology master, they send to Nimbus via thrift.

            From thrift it goes to the rocksdb cache (there's an interface around this). I don't see a codahale-based reporter here.

            That's where my current question is. Do we think we need to have some step between workers and centralized collection s.t. reporters can live at a higher level (one reporter per cluster for example)? Or, do we want to report from each worker (not sure how this would work)?

            abellina Alessandro Bellina added a comment - I've been looking some at JStorm's approach. They are using wrappers of codahale meters (e.g. AsmGauge, AsmCounter, etc) and that's what they use at all levels of the hierarchy (worker, topology, cluster, etc.). Both built in and user metrics use these. Then they have their own implementation of a registry, but it doesn't seem to be related to the codahale MetricRegistry. They send metrics via netty or thrift by querying their registry. If from worker, they send via netty to topology master. If from topology master, they send to Nimbus via thrift. From thrift it goes to the rocksdb cache (there's an interface around this). I don't see a codahale-based reporter here. That's where my current question is. Do we think we need to have some step between workers and centralized collection s.t. reporters can live at a higher level (one reporter per cluster for example)? Or, do we want to report from each worker (not sure how this would work)?
            kabhwan Jungtaek Lim added a comment - - edited

            If worker reports metrics directly there is going to be no room for aggregation. That is fine for external time-series DB (since they are able to aggregate before showing), but sometimes they're still too many metrics points which slow down aggregation.

            Btw, I don't intend to put more pressures to anyone, but only addressing reporter doesn't resolve the issues behind current metrics.
            I really would like to emphasize that there're design issues on metrics, not just API itself. I just waited JStorm merger phase 2 to address this, but this will not be going to happen even this year, so would just shout out again.

            Please refer below links:

            While I don't think we should address whole kinds of wishlist, I strongly think we should get rid of current limitations of metrics if we really want to touch the thing.
            And JStorm approach seems to be able to address all kinds of current limitations. Centralize (to Nimbus) + internal fast storage (RocksDB) is the key, and TopologyMaster sends workers metrics to Nimbus efficiently. (TM seems not aggregate the value of metric itself, but maximize the throughput between TM and Nimbus)

            I haven't look at metrics system on Kafka Streams and Flink (they seemed to renew the metrics) but having a look should help to set direction right.

            kabhwan Jungtaek Lim added a comment - - edited If worker reports metrics directly there is going to be no room for aggregation. That is fine for external time-series DB (since they are able to aggregate before showing), but sometimes they're still too many metrics points which slow down aggregation. Btw, I don't intend to put more pressures to anyone, but only addressing reporter doesn't resolve the issues behind current metrics. I really would like to emphasize that there're design issues on metrics, not just API itself. I just waited JStorm merger phase 2 to address this, but this will not be going to happen even this year, so would just shout out again. Please refer below links: https://cwiki.apache.org/confluence/display/STORM/Limitations+of+current+metrics+feature https://cwiki.apache.org/confluence/display/STORM/Wishlist+for+new+metrics+feature While I don't think we should address whole kinds of wishlist, I strongly think we should get rid of current limitations of metrics if we really want to touch the thing. And JStorm approach seems to be able to address all kinds of current limitations. Centralize (to Nimbus) + internal fast storage (RocksDB) is the key, and TopologyMaster sends workers metrics to Nimbus efficiently. (TM seems not aggregate the value of metric itself, but maximize the throughput between TM and Nimbus) I haven't look at metrics system on Kafka Streams and Flink (they seemed to renew the metrics) but having a look should help to set direction right.

            I agree with all that has been said here. There really are several smaller pieces of this project that each need to be addressed separately.

            1) End User API
            2) Reporting API
            3) Default reporting implementation
            4) Query API
            5) Default Query Implementation
            6) UI Updates

            Most of these pieces can be worked on separately and some what independently.

            We all seem to agree on 1 and 2 being stock Codahale. I know others have moved away form Codahale in the past, but if we run into issues I would rather try to work them out with the Codahale community rather then go it on our own.

            For 3 The default reporting implementation I think the simplest approach to start out with is to have a default reporter periodically write metrics to the local file system, and have the supervisor pick them up and report them through thrift to nimbus. This allows us to not have to worry about security. The supervisor will be able to authenticate with nimbus no problem as it already does.

            For 5 I think using the JStorm rocksdb implementation as a starting point is great, and others seem to agree.

            4 and 6 are things that we have not really addressed here. We probably should look at others and see what they are doing here. and possibly copy a striped down version of OpenTSDB or Druid as an initial starting point.

            My proposal would be to file separate JIRA for each of these pieces. Many of these pieces can provide values without the others fully in place. Having a new metrics/reporter API based on Codahale that is parallel to the IMetricsConsumer we have right now would be a good start. It would fix a lot of the issues with IMetricsConsumer but we wouldn't have to tie it into an internal reporting system yet. We could even implement it in 1.x and deprecate the older API there as well.

            Having a time series database that all it does is store metrics that are currently reported through ZK would be a great step too. It would not be prefect, but at least we could have a history for the metrics and they would not reset every time a worker crashes.

            Once those two are in place we can glue the different pieces together.

            It feels like 3 phases each independent and each fairly manageable.

            revans2 Robert Joseph Evans added a comment - I agree with all that has been said here. There really are several smaller pieces of this project that each need to be addressed separately. 1) End User API 2) Reporting API 3) Default reporting implementation 4) Query API 5) Default Query Implementation 6) UI Updates Most of these pieces can be worked on separately and some what independently. We all seem to agree on 1 and 2 being stock Codahale. I know others have moved away form Codahale in the past, but if we run into issues I would rather try to work them out with the Codahale community rather then go it on our own. For 3 The default reporting implementation I think the simplest approach to start out with is to have a default reporter periodically write metrics to the local file system, and have the supervisor pick them up and report them through thrift to nimbus. This allows us to not have to worry about security. The supervisor will be able to authenticate with nimbus no problem as it already does. For 5 I think using the JStorm rocksdb implementation as a starting point is great, and others seem to agree. 4 and 6 are things that we have not really addressed here. We probably should look at others and see what they are doing here. and possibly copy a striped down version of OpenTSDB or Druid as an initial starting point. My proposal would be to file separate JIRA for each of these pieces. Many of these pieces can provide values without the others fully in place. Having a new metrics/reporter API based on Codahale that is parallel to the IMetricsConsumer we have right now would be a good start. It would fix a lot of the issues with IMetricsConsumer but we wouldn't have to tie it into an internal reporting system yet. We could even implement it in 1.x and deprecate the older API there as well. Having a time series database that all it does is store metrics that are currently reported through ZK would be a great step too. It would not be prefect, but at least we could have a history for the metrics and they would not reset every time a worker crashes. Once those two are in place we can glue the different pieces together. It feels like 3 phases each independent and each fairly manageable.

            Thanks for the correction.

            Components that generate metrics would update an instance of a metric. So for example say we had a Counter for something like tuples processed. In the worker, we would call counter.inc(). That would not cause the metric to be published anywhere, it would just update the state of that metric instance.

            Reporters run on a independent thread, usually running at a configurable interval like one minute, and publish the current state of all the registered metrics. That decouples reporting (the expensive part) from metrics updates (the inexpensive part), minimizing the impact to the performance-critical path.

            ptgoetz P. Taylor Goetz added a comment - Thanks for the correction. Components that generate metrics would update an instance of a metric. So for example say we had a Counter for something like tuples processed. In the worker, we would call counter.inc() . That would not cause the metric to be published anywhere, it would just update the state of that metric instance. Reporters run on a independent thread, usually running at a configurable interval like one minute, and publish the current state of all the registered metrics. That decouples reporting (the expensive part) from metrics updates (the inexpensive part), minimizing the impact to the performance-critical path.

            One thing that I would probably do differently from JStorm's approach is not to use wrappers around metrics but rather expose them directly.

            I was hoping to avoid have some sort of centralized "metrics collector" service, etc., but the more I think it may be necessary, particularly wrt aggregated metrics. From a protocol perspective, I'd lean more towards netty than Thrift.

            ptgoetz P. Taylor Goetz added a comment - One thing that I would probably do differently from JStorm's approach is not to use wrappers around metrics but rather expose them directly. I was hoping to avoid have some sort of centralized "metrics collector" service, etc., but the more I think it may be necessary, particularly wrt aggregated metrics. From a protocol perspective, I'd lean more towards netty than Thrift.

            Makes sense to me. For point 3 specifically, we'd create a custom codahale reporter @worker level that writes to disk. From that moment on it is thrift to Nimbus to out-of-the-box store (RocksDB).

            abellina Alessandro Bellina added a comment - Makes sense to me. For point 3 specifically, we'd create a custom codahale reporter @worker level that writes to disk. From that moment on it is thrift to Nimbus to out-of-the-box store (RocksDB).

            And to address kabhwan's concern about aggregation. We can over time add in aggregation at the supervisor level, and then also aggregation in the DB itself. Most TSDB systems act like an OLAP cube and pre-aggregate within the DB on common dimensions (topology, host, etc...). We could add that in too without too much effort.

            revans2 Robert Joseph Evans added a comment - And to address kabhwan 's concern about aggregation. We can over time add in aggregation at the supervisor level, and then also aggregation in the DB itself. Most TSDB systems act like an OLAP cube and pre-aggregate within the DB on common dimensions (topology, host, etc...). We could add that in too without too much effort.

            Copying your wish list here for easy reference:

            1. Aggregation at component level (Average, Sum etc)

            As I mentioned in an earlier comment, this is likely going to require something centralized in order to aggregate cross-worker metrics. We'll need more discussion to work this out.

            2. Blacklist/whitelist

            Metrics supports the concept of filtering: https://github.com/dropwizard/metrics/blob/3.2-development/metrics-core/src/main/java/com/codahale/metrics/MetricFilter.java

            3. Allow only numbers for values

            All the metric types in the library are pretty well typed to only number values. Gauges support non-number types, but I don't see us using many non-number types.

            4. Efficient routing of built-in metrics to UI (current they get tagged

            along with executor heartbeat which puts pressure on zookeeper)

            I feel UI integration is out of scope for the time being, but can be a follow on effort.

            5. Worker/JVM level metrics which are not owned by a particular component

            Metrics JVM support: http://metrics.dropwizard.io/3.1.0/manual/jvm/

            6. Percentiles for latency metrics such as p99, p95 etc

            This is handled automatically for a number of metrics: https://github.com/dropwizard/metrics/blob/3.2-development/metrics-core/src/main/java/com/codahale/metrics/Snapshot.java

            7. Aggregation at stream level, and machine level

            This goes back to point #1. We'll have to plan carefully how we support aggregation.

            8. way to subscribe cluster metrics

            I think this is covered by the metrics reporter mechanism.

            9. counter stats as non-sampled if it doesn't hurt performance

            We should be able to determine that with some micro-benchmarking.

            10. more metrics like serialization/deserialization latency, queue status

            Agreed. It's actually very easy to add new metrics. We do however want to be careful to not over-instrument and affect performance.

            11. Dynamically turning on/off specific metrics

            That I haven't thought about too much, but we could probably do something like a dynamic filter (see #2).

            ptgoetz P. Taylor Goetz added a comment - Copying your wish list here for easy reference: 1. Aggregation at component level (Average, Sum etc) As I mentioned in an earlier comment, this is likely going to require something centralized in order to aggregate cross-worker metrics. We'll need more discussion to work this out. 2. Blacklist/whitelist Metrics supports the concept of filtering: https://github.com/dropwizard/metrics/blob/3.2-development/metrics-core/src/main/java/com/codahale/metrics/MetricFilter.java 3. Allow only numbers for values All the metric types in the library are pretty well typed to only number values. Gauges support non-number types, but I don't see us using many non-number types. 4. Efficient routing of built-in metrics to UI (current they get tagged along with executor heartbeat which puts pressure on zookeeper) I feel UI integration is out of scope for the time being, but can be a follow on effort. 5. Worker/JVM level metrics which are not owned by a particular component Metrics JVM support: http://metrics.dropwizard.io/3.1.0/manual/jvm/ 6. Percentiles for latency metrics such as p99, p95 etc This is handled automatically for a number of metrics: https://github.com/dropwizard/metrics/blob/3.2-development/metrics-core/src/main/java/com/codahale/metrics/Snapshot.java 7. Aggregation at stream level, and machine level This goes back to point #1. We'll have to plan carefully how we support aggregation. 8. way to subscribe cluster metrics I think this is covered by the metrics reporter mechanism. 9. counter stats as non-sampled if it doesn't hurt performance We should be able to determine that with some micro-benchmarking. 10. more metrics like serialization/deserialization latency, queue status Agreed. It's actually very easy to add new metrics. We do however want to be careful to not over-instrument and affect performance. 11. Dynamically turning on/off specific metrics That I haven't thought about too much, but we could probably do something like a dynamic filter (see #2).

            There really are several smaller pieces of this project that each need to be addressed separately.

            I'd say lets make the broken-down taks subtasks of this JIRA.

            ptgoetz P. Taylor Goetz added a comment - There really are several smaller pieces of this project that each need to be addressed separately. I'd say lets make the broken-down taks subtasks of this JIRA.

            One concern I have with having nimbus as the destination of aggregated metrics, is nimbus HA. If a new nimbus becomes leader, presumably all metrics will be lost or reset to stale values, unless there is some sort of replication.

            Would a "metrics store" need to be HA? I would probably argue that it isn't, provided the reporters handle a metrics store outage gracefully without affecting performance (since the reporter threads are not on the critical path this wouldn't be too hard).

            I also like the idea of doing the aggregation at the "metrics store" level. That would simplify things since all metrics reporters could push values to that store without doing any intermediate aggregation.

            (I'm using scare quotes around "metrics store" because it is starting to sound like we are talking about a new service/daemon.)

            ptgoetz P. Taylor Goetz added a comment - One concern I have with having nimbus as the destination of aggregated metrics, is nimbus HA. If a new nimbus becomes leader, presumably all metrics will be lost or reset to stale values, unless there is some sort of replication. Would a "metrics store" need to be HA? I would probably argue that it isn't, provided the reporters handle a metrics store outage gracefully without affecting performance (since the reporter threads are not on the critical path this wouldn't be too hard). I also like the idea of doing the aggregation at the "metrics store" level. That would simplify things since all metrics reporters could push values to that store without doing any intermediate aggregation. (I'm using scare quotes around "metrics store" because it is starting to sound like we are talking about a new service/daemon.)

            I agree that we eventually want the metrics to be HA. It should be fairly simple to set up a periodic sync to the other nimbus instances, (and I think rocks DB supports versioning so we might be able to get a lot of it for free. In the worst case we would lose some metrics which I don't see as being that critical.

            I would say lets start out with no HA for metrics and then add it in afterwards. It is no worse then what we have now where we can lose metrics when a worker goes down.

            revans2 Robert Joseph Evans added a comment - I agree that we eventually want the metrics to be HA. It should be fairly simple to set up a periodic sync to the other nimbus instances, (and I think rocks DB supports versioning so we might be able to get a lot of it for free. In the worst case we would lose some metrics which I don't see as being that critical. I would say lets start out with no HA for metrics and then add it in afterwards. It is no worse then what we have now where we can lose metrics when a worker goes down.

            I added JIRAs for parts I thought were somewhat independent and agreed upon.

            I think if the interface to RocksDB is architected correctly, we could eventually spawn that off to its own process (hopefully without changing the world) if we want to have a common store shared by the Nimbus cluster.

            abellina Alessandro Bellina added a comment - I added JIRAs for parts I thought were somewhat independent and agreed upon. I think if the interface to RocksDB is architected correctly, we could eventually spawn that off to its own process (hopefully without changing the world) if we want to have a common store shared by the Nimbus cluster.

            I am fine with writing it do it has it's own thrift port/etc. But I don't really want it as a separate process unless we have a very good reason to do so. Having more things to launch and manage cause a lot of headaches for people running the cluster.

            revans2 Robert Joseph Evans added a comment - I am fine with writing it do it has it's own thrift port/etc. But I don't really want it as a separate process unless we have a very good reason to do so. Having more things to launch and manage cause a lot of headaches for people running the cluster.
            kabhwan Jungtaek Lim added a comment -

            AFAIK, other streaming frameworks chose RocksDB because of embeddable characteristic. If RocksDB requires launching another process manually, they should have to choose another solution. Let's not think about H/A. If we don't have a great idea till then, we could even choose aggressive approach, sending metrics to all of Nimbuses including standby.

            kabhwan Jungtaek Lim added a comment - AFAIK, other streaming frameworks chose RocksDB because of embeddable characteristic. If RocksDB requires launching another process manually, they should have to choose another solution. Let's not think about H/A. If we don't have a great idea till then, we could even choose aggressive approach, sending metrics to all of Nimbuses including standby.

            What if we initially couple it with the Storm UI process and try to keep it contained enough so it would be easy to move it to the Nimbus process when HA features are implemented?

            ptgoetz P. Taylor Goetz added a comment - What if we initially couple it with the Storm UI process and try to keep it contained enough so it would be easy to move it to the Nimbus process when HA features are implemented?

            I would rather keep it in nimbus. I know the primary use case is for the UI, but I also want to be abel to query it from nimbus so the scheduler can use the metrics for elasticity. Architecturally and from a security standpoint I would rather keep it with nimbus. The UI could be running on a separate box from nimbus that is more open in some ways and less open in others. I could easily see a setup where the UI is not directly accessible from the worker nodes. But I really don't care where it is so long as we have it be separately configurable by both host and port.

            revans2 Robert Joseph Evans added a comment - I would rather keep it in nimbus. I know the primary use case is for the UI, but I also want to be abel to query it from nimbus so the scheduler can use the metrics for elasticity. Architecturally and from a security standpoint I would rather keep it with nimbus. The UI could be running on a separate box from nimbus that is more open in some ways and less open in others. I could easily see a setup where the UI is not directly accessible from the worker nodes. But I really don't care where it is so long as we have it be separately configurable by both host and port.

            One of the things we're going to have to work out is how to properly aggregate different metric types, if we can at all. For simple metric types like Counters, it is straightforward. But things get more complicated with metrics like Meters and Histograms – there's no way to really aggregate percentiles.

            The problem is that metrics maintain a certain amount of state necessary to calculate certain values like percentiles. When a reporter reports those values, it takes a point-in-time snapshot of those calculations, so all the data used to calculate those values is no longer available.

            As an example lets say we want to have a Histogram for execute latency (so we can report various percentiles), and we want to aggregate those statistics across a topology. So worker A and worker B report their histogram snapshot to nimbus. How does nimbus aggregate those values without the datasets the calculations were based on?

            ptgoetz P. Taylor Goetz added a comment - One of the things we're going to have to work out is how to properly aggregate different metric types, if we can at all. For simple metric types like Counters, it is straightforward. But things get more complicated with metrics like Meters and Histograms – there's no way to really aggregate percentiles. The problem is that metrics maintain a certain amount of state necessary to calculate certain values like percentiles. When a reporter reports those values, it takes a point-in-time snapshot of those calculations, so all the data used to calculate those values is no longer available. As an example lets say we want to have a Histogram for execute latency (so we can report various percentiles), and we want to aggregate those statistics across a topology. So worker A and worker B report their histogram snapshot to nimbus. How does nimbus aggregate those values without the datasets the calculations were based on?

            I have not dug into the internals of how dropwizard represents a histogram. But all of the histogram implementations that I know of allow for simple aggregations over time. They usually play games with how to reduce the size by bucketizing values. Look at HDR histogram and how it lets you merge multiple sub-histograms together to pull out percentiles after they are merged. Meters typically use different aggregations from counters, because a meter is a set value, I don't have to worry about how much it has changed over time. Counters can roll over/reset and detecting which one happened can sometimes be difficult to get right.

            revans2 Robert Joseph Evans added a comment - I have not dug into the internals of how dropwizard represents a histogram. But all of the histogram implementations that I know of allow for simple aggregations over time. They usually play games with how to reduce the size by bucketizing values. Look at HDR histogram and how it lets you merge multiple sub-histograms together to pull out percentiles after they are merged. Meters typically use different aggregations from counters, because a meter is a set value, I don't have to worry about how much it has changed over time. Counters can roll over/reset and detecting which one happened can sometimes be difficult to get right.

            From the GraphiteReported in codahale, they just send a snapshot of each of of the percentiles for each Histogram, so that isn't something users should aggregate on. See reportHistogram in: http://grepcode.com/file/repo1.maven.org/maven2/com.codahale.metrics/metrics-graphite/3.0.2/com/codahale/metrics/graphite/GraphiteReporter.java#GraphiteReporter.report%28java.util.SortedMap%2Cjava.util.SortedMap%2Cjava.util.SortedMap%2Cjava.util.SortedMap%2Cjava.util.SortedMap%29

            For the storm-facing metrics, we would figure out a way to roll our own histogram that sends raw values s.t. we can rollup first, then compute the statistic.

            abellina Alessandro Bellina added a comment - From the GraphiteReported in codahale, they just send a snapshot of each of of the percentiles for each Histogram, so that isn't something users should aggregate on. See reportHistogram in: http://grepcode.com/file/repo1.maven.org/maven2/com.codahale.metrics/metrics-graphite/3.0.2/com/codahale/metrics/graphite/GraphiteReporter.java#GraphiteReporter.report%28java.util.SortedMap%2Cjava.util.SortedMap%2Cjava.util.SortedMap%2Cjava.util.SortedMap%2Cjava.util.SortedMap%29 For the storm-facing metrics, we would figure out a way to roll our own histogram that sends raw values s.t. we can rollup first, then compute the statistic.

            ptgoetz I added a diagram of what I am working on now here: https://github.com/abellina/storm/blob/new_metrics_diagram/docs/new_metrics_phase_1.png

            This is a bit different than previous discussion but here is how it works, let me know what you think (this is separate than reporter configuration, it is just the built in reporter), and it is a phased approach. Current phase would put still pipe metrics through StatsUtil, although I haven't written that part yet (they get to nimbus right now only).

            1. Each executor registers metrics against the metric registry running in the worker.
            2. With the default reporter configured, we'd instantiate that. This is a codahale ScheduledReporter that runs once/min? Currently I go every 5 seconds but that's just for testing.
            3. The stats are then written to disk on a per component basis, exactly like versioned store. We could swap this part with a better way to store, but that's what I have so far. I call it TimeseriesStore to distinguish, but we could merge the two.
            4. The supervisor has a timer, I call it WorkerStatsTimer that picks up stats from the stats directory on disk using its instance of TimeseriesStore. This is very much like the heartbeat stuff in the supervisor, except the Supervisor itself doesn't care much for them, it just shuttles them to the thrift connection. The stats are then deleted when pushed, nothing is kept.
            5. With each iteration of WorkerStatsTimer, we send the metrics for all workers to Nimbus via Thrift.
            6. For phase I, I was just thinking to take these and make them look like heartbeats stats s.t. the stats code and the UI can show them. Eventually, once the RocksDB stuff is ready we can store there.
            7. I am looking to publish all data available for Timers and Histograms to see if I can reconstruct later the stats it computes, so that we can look at how to aggregate them.

            Thoughts? I am going to take a crack at trying to make this work with our current UI this week, and I should be able to share the code next week. I have to finish up what I doing, write tests and document/clean up quite a bit, but can share before.

            abellina Alessandro Bellina added a comment - ptgoetz I added a diagram of what I am working on now here: https://github.com/abellina/storm/blob/new_metrics_diagram/docs/new_metrics_phase_1.png This is a bit different than previous discussion but here is how it works, let me know what you think (this is separate than reporter configuration, it is just the built in reporter), and it is a phased approach. Current phase would put still pipe metrics through StatsUtil, although I haven't written that part yet (they get to nimbus right now only). 1. Each executor registers metrics against the metric registry running in the worker. 2. With the default reporter configured, we'd instantiate that. This is a codahale ScheduledReporter that runs once/min? Currently I go every 5 seconds but that's just for testing. 3. The stats are then written to disk on a per component basis, exactly like versioned store. We could swap this part with a better way to store, but that's what I have so far. I call it TimeseriesStore to distinguish, but we could merge the two. 4. The supervisor has a timer, I call it WorkerStatsTimer that picks up stats from the stats directory on disk using its instance of TimeseriesStore. This is very much like the heartbeat stuff in the supervisor, except the Supervisor itself doesn't care much for them, it just shuttles them to the thrift connection. The stats are then deleted when pushed, nothing is kept. 5. With each iteration of WorkerStatsTimer, we send the metrics for all workers to Nimbus via Thrift. 6. For phase I, I was just thinking to take these and make them look like heartbeats stats s.t. the stats code and the UI can show them. Eventually, once the RocksDB stuff is ready we can store there. 7. I am looking to publish all data available for Timers and Histograms to see if I can reconstruct later the stats it computes, so that we can look at how to aggregate them. Thoughts? I am going to take a crack at trying to make this work with our current UI this week, and I should be able to share the code next week. I have to finish up what I doing, write tests and document/clean up quite a bit, but can share before.

            Feature branch can be found here:

            https://github.com/apache/storm/tree/metrics_v2

            ptgoetz P. Taylor Goetz added a comment - Feature branch can be found here: https://github.com/apache/storm/tree/metrics_v2
            srishtyagrawal9@gmail.com Srishty Agrawal added a comment - - edited

            We are planning to upgrade the Storm version from v0.9.6 to v1.1+ and were wondering if there are chances of new metrics framework being backported to v1.1.x in the future?

            srishtyagrawal9@gmail.com Srishty Agrawal added a comment - - edited We are planning to upgrade the Storm version from v0.9.6 to v1.1+ and were wondering if there are chances of new metrics framework being backported to v1.1.x in the future?
            kabhwan Jungtaek Lim added a comment -

            I'm expecting current patch will be finally reviewed and merged in early Jan so adding this to epic for Storm release 1.2.0, but if we can't make it in time, we might want to move out of.

            kabhwan Jungtaek Lim added a comment - I'm expecting current patch will be finally reviewed and merged in early Jan so adding this to epic for Storm release 1.2.0, but if we can't make it in time, we might want to move out of.
            kabhwan Jungtaek Lim added a comment -

            srishtyagrawal9@gmail.com
            I'm sorry I missed the comment. New feature is unlikely to be ported back to bugfix release, so it is merely no chance for this to be included to 1.1.x. We will be focusing to move toward to 2.0.0 after releasing 1.2.0 and then 1.x version will be going to be mainternance mode (most likely allowing only bugfixes) afterwards.

            kabhwan Jungtaek Lim added a comment - srishtyagrawal9@gmail.com I'm sorry I missed the comment. New feature is unlikely to be ported back to bugfix release, so it is merely no chance for this to be included to 1.1.x. We will be focusing to move toward to 2.0.0 after releasing 1.2.0 and then 1.x version will be going to be mainternance mode (most likely allowing only bugfixes) afterwards.
            kabhwan Jungtaek Lim added a comment -

            Cloning issue for 2.0.0 and resolving this one.

            kabhwan Jungtaek Lim added a comment - Cloning issue for 2.0.0 and resolving this one.

            People

              ptgoetz P. Taylor Goetz
              ptgoetz P. Taylor Goetz
              Votes:
              0 Vote for this issue
              Watchers:
              9 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 34h 40m
                  34h 40m