Details

    • Type: New Feature
    • Status: Closed
    • Priority: Minor
    • Resolution: Won't Fix
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None

      Description

      Users need a very simple way to create counters in their jobs. Accumulators provide a way to do this, but are a little clunky, for two reasons:

      1) the setup is a nuisance
      2) with delayed evaluation, you don't know when it will actually run, so it's hard to look at the values

      Consider this code:

      def filterBogus(rdd: RDD[MyCustomClass], sc: SparkContext) = {
        val filterCount = sc.accumulator(0)
        val filtered = rdd.filter { r =>
          if (isOK(r)) true else { filterCount += 1; false }
        }
        println("removed " + filterCount.value + " records")
        filtered
      }
      

      The println will always say 0 records were filtered, because it's printed before anything has actually run. I could print out the value later on, but note that it would destroy the modularity of the method – kinda ugly to return the accumulator just so that it can get printed later on. (And of course, the caller in turn might not know when the filter is going to get applied, and would have to pass the accumulator up even further ...)
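
      Spelled out, the "return the accumulator so it can get printed later" workaround looks roughly like this (just a sketch; isOK and MyCustomClass are the same placeholders as above):

      import org.apache.spark.{Accumulator, SparkContext}
      import org.apache.spark.rdd.RDD

      def filterBogus(rdd: RDD[MyCustomClass], sc: SparkContext): (RDD[MyCustomClass], Accumulator[Int]) = {
        val filterCount = sc.accumulator(0)
        val filtered = rdd.filter { r =>
          if (isOK(r)) true else { filterCount += 1; false }
        }
        (filtered, filterCount)  // the accumulator leaks into the return type
      }

      // the caller (or the caller's caller...) has to run an action before the value is real:
      val (filtered, filterCount) = filterBogus(rdd, sc)
      filtered.count()
      println("removed " + filterCount.value + " records")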

      I'd like to have Counters which just automatically get printed out whenever a stage has been run, and also some API to get them back. I realize this is tricky b/c a stage can get re-computed, so maybe you should only increment the counters once.

      Maybe a more general way to do this is to provide some callback for whenever an RDD is computed – by default, you would just print the counters, but the user could replace it with a custom handler.
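
      Very roughly, the callback idea could look something like this (purely a hypothetical sketch; none of these names exist in the Spark API):

      // hypothetical sketch only: a hook invoked after a stage finishes computing,
      // defaulting to printing whatever counters were incremented in that stage
      trait StageCallback {
        def onStageComputed(stageName: String, counters: Map[String, Long]): Unit
      }

      object PrintCounters extends StageCallback {
        def onStageComputed(stageName: String, counters: Map[String, Long]): Unit =
          counters.foreach { case (name, count) => println(s"$stageName: $name = $count") }
      }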

        Activity

        aash Andrew Ash added a comment -

        Hi Anonymous (not sure how you can read this but...),

        I'm not sure how a counters API could solve the delayed evaluation problem and be fundamentally different from the existing Accumulator feature. You'd have to make accessing a counter force evaluation of every RDD it's accessed from, and you'd have issues with recomputing the RDD later on when it's actually needed, causing a duplicate access.

        I can see some kind of API that offers a hook into the RDD evaluation DAG, but that operates at a stage level rather than an operation level (for example multiple maps are pipelined together) so the mapping for available hook points wouldn't be 1:1 with RDD operations, which would be quite tricky.

        I don't see a way to implement what you propose with Counters without compromising major parts of the Spark API contract (RDD laziness), so I propose to close. Especially since I have no way to contact you: your information doesn't appear on the prior Atlassian Jira either: https://spark-project.atlassian.net/browse/SPARK-603

        Reynold Xin are you ok with closing this?

        rxin Reynold Xin added a comment -

        Closing this one as part of Andrew Ash's cleanup. I think this problem is being fixed as we add accumulator / metrics values to the web ui.

        imranr Imran Rashid added a comment -

        Hey, this was originally reported by me too (probably I messed up when creating it on the old Jira, not sure if there is a way to change the reporter now?)

        I think perhaps the original issue was a little unclear; I'll try to clarify a little bit:

        I do not think we need to support something at the "operation" level – having it work at the stage level (or even job level) is fine. I'm not even sure what it would mean to work at the operation level, since individual records are pushed through all the operations of a stage in one go.

        But the operation level is still a useful abstraction for the developer. It's nice for them to be able to write methods which are, e.g., just a filter. For normal RDD operations, this works just fine of course – you can have a bunch of util methods that take in an RDD and output an RDD, maybe some filter, some map, etc., they can get combined however you like, everything remains lazy until there is some action. All wonderful.

        Things get messy as soon as you start to include accumulators, however – you've got to include them in your return values and then the outside logic has to know when they actually contain valid data. Rather than trying to solve this problem in general, I'm proposing that we do something dead-simple for basic counters, which might even live outside of accumulators completely.

        Putting accumulator values in the web UI is not bad for just this purpose, but overall I don't think it's the right solution:

        1. It limits what we can do with accumulators (see my comments on SPARK-664)
        2. The API is more complicated than it needs to be. If the only point of accumulators is counters, then we can get away with something as simple as:

        rdd.map{x =>
          if (isFoobar(x)) {
            Counters("foobar") += 1
          }
          ...
        }
        

        (e.g., no need to even declare the counter up front.)

        3. Having the value in the UI is nice, but it's not the same as programmatic access. E.g., it can be useful to have them in the job logs, the actual values might be used in other computation (e.g., gives the size of a data structure for a later step), etc.
        Even with the simpler counter API, this is tricky b/c of lazy evaluation. But maybe that is a reason to create a callback up front:

        Counters.addCallback("foobar"){counts => ...}
        rdd.map{x =>
          if (isFoobar(x)) {
            Counters("foobar") += 1
          }
          ...
        }
        

        4. If you have long-running tasks, it might be nice to get incremental feedback from counters during the task. (There was a real need for long-running tasks before sort-based shuffle, when you couldn't have too many tasks in a shuffle ... perhaps it's not anymore, I'm not sure.)

        We can get a little further with accumulators, e.g., a SparkListener could do something with accumulator values when the stages finish. But I think we're stuck on the other points. I feel like right now accumulators are trapped between just being counters, and being a more general method of computation, and not quite doing either one very well.
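
        E.g., a minimal sketch of that listener idea (assuming StageInfo exposes the stage's accumulable values; the exact field names vary a bit across Spark versions):

        import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

        // sketch: print whatever accumulable values a stage reported once it finishes
        class CounterPrintingListener extends SparkListener {
          override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
            val info = stageCompleted.stageInfo
            info.accumulables.values.foreach { acc =>
              println(s"stage ${info.stageId} (${info.name}): ${acc.name} = ${acc.value}")
            }
          }
        }

        // registered once on the driver, e.g.:
        // sc.addSparkListener(new CounterPrintingListener)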

        srowen Sean Owen added a comment -

        Is this still live? I also kind of think this is accomplishable pretty easily with accumulators, and adding another abstraction on top with different semantics might be more confusing than it's worth. FWIW.

        irashid Imran Rashid added a comment -

        Hi Sean Owen

        I don't think anyone is actively working on this, and probably won't for a while – I suppose that means it should be closed for now.

        I disagree that it's easy to do this with accumulators. It's certainly possible, but it makes it quite complicated to do something that is very common and should be dead-simple. (Or at least, it's harder than most people realize to use accumulators to do this correctly.) I guess it will be confusing to have counters & accumulators in the API, but it might only serve to highlight some of the intricacies of the accumulator API which aren't obvious (and can't be fixed w/out breaking changes).

        koert koert kuipers added a comment -

        We use counters a lot in Scalding (mostly to verify record counts at different stages, for certain criteria).

        I do not think it is easy at all to recreate counters with accumulators. In fact, with the current behavior of accumulators (they do not account for task failure, leading to double counting) I think it's nearly impossible to implement counters.


          People

          • Assignee: Unassigned
          • Reporter: imranr Imran Rashid
          • Votes: 0
          • Watchers: 14
