Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.6.0
    • Fix Version/s: 2.0.0
    • Component/s: SparkR
    • Labels:
      None

      Description

      gapply() applies an R function to groups of a DataFrame formed by grouping on one or more columns, and returns a DataFrame. It is analogous to GroupedDataset.flatMapGroups() in the Dataset API.

      Two API styles are supported:
      1.

      gd <- groupBy(df, col1, ...)
      gapply(gd, function(grouping_key, group) {}, schema)
      

      2.

      gapply(df, grouping_columns, function(grouping_key, group) {}, schema) 
      

      R function input: the grouping key value(s) and a local data.frame containing the rows of that group
      R function output: a local data.frame

      The schema specifies the row format of the R function's output and must match the data.frame that the function returns.

      Note that map-side combination (partial aggregation) is not supported; users can perform map-side combination via dapply().
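
      A minimal usage sketch of the first style (the data, column names, schema, and key handling below are illustrative assumptions, not part of this proposal):

      df <- createDataFrame(sqlContext, mtcars)   # sqlContext assumed to exist

      # The schema must match the data.frame returned by the R function
      schema <- structType(structField("cyl", "double"),
                           structField("avg_mpg", "double"))

      gd <- groupBy(df, "cyl")
      result <- gapply(gd, function(key, group) {
        # key: grouping key value(s); group: local data.frame with this group's rows
        data.frame(cyl = key[[1]], avg_mpg = mean(group$mpg))
      }, schema)

      head(collect(result))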

        Issue Links

          Activity

          Narine Narine Kokhlikyan added a comment - - edited

          Thanks for creating this JIRA, Sun Rui.
          Have you already started to work on this? This most probably depends on https://issues.apache.org/jira/browse/SPARK-12792.
          We need this as soon as possible, and I might start working on it.
          Do you have an estimate of how long it will take to get https://issues.apache.org/jira/browse/SPARK-12792 reviewed?

          cc: Shivaram Venkataraman

          Thanks,
          Narine

          sunrui Sun Rui added a comment - - edited

          Narine Kokhlikyan, yes, this depends on https://issues.apache.org/jira/browse/SPARK-12792. I will do dapply(), and you can feel free to work on this one by creating a working branch based on the PR for SPARK-12792. Could you review the implementation design doc before starting?

          Shivaram Venkataraman, could you help review the PR for SPARK-12792 and merge it ASAP? We need to get SparkR UDF support done in Spark 2.0.

          Narine Narine Kokhlikyan added a comment -

          Thanks for your quick response, Sun Rui. I'll try to review it in detail.

          Narine Narine Kokhlikyan added a comment - - edited

          Hi Sun Rui,

          I looked at the implementation proposal, and it looks good to me. However, I think it would be good to add some details about the aggregation of the data frames we receive from the workers.

          I've tried to draw a diagram for the group-apply example in order to understand the bigger picture:
          https://docs.google.com/document/d/1z-sghU8wYKW-oNOajzFH02X0CP9Vd67cuJ085e93vZ8/edit
          Please let me know if I've misunderstood something.

          Thanks,
          Narine

          Narine Narine Kokhlikyan added a comment -

          Started working on this!

          sunrui Sun Rui added a comment -

          cool

          Narine Narine Kokhlikyan added a comment - - edited

          Hi Sun Rui,

          I have a question regarding your suggestion to add a new "GroupedData.flatMapRGroups" function, as described in the following document:
          https://docs.google.com/presentation/d/1oj17N5JaE8JDjT2as_DUI6LKutLcEHNZB29HsRGL_dM/edit#slide=id.p9

          It seems that some changes have happened in Spark SQL. In 1.6.1 there was a Scala class GroupedData:
          https://github.com/apache/spark/blob/v1.6.1/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala

          This class doesn't seem to exist in 2.0.0.

          I was thinking of adding the flatMapRGroups helper function to org.apache.spark.sql.KeyValueGroupedDataset or org.apache.spark.sql.RelationalGroupedDataset. What do you think?

          Thank you,
          Narine

          sunrui Sun Rui added a comment -

          Narine Kokhlikyan, yes, https://issues.apache.org/jira/browse/SPARK-13897 changed it. I think we can add a method in KeyValueGroupedDataset, as gapply() is a key-value-style group by rather than a relational-style group by.

          Narine Narine Kokhlikyan added a comment -

          Thanks for the quick response, Sun Rui.

          I was playing with KeyValueGroupedDataset and noticed that it works only for Datasets. When I try groupByKey on a DataFrame, it fails.
          This succeeds:
          val grouped = ds.groupByKey(v => (v._1, "word"))

          But the following fails:
          val grouped = df.groupByKey(v => (v._1, "word"))

          As far as I know, in SparkR we are working with DataFrames, so does this mean that I need to convert the DataFrame to a Dataset and work on Datasets on the Scala side?

          Thanks,
          Narine

          sunrui Sun Rui added a comment -

          Narine Kokhlikyan, DataFrame and Dataset are now converged: a DataFrame is just a different view of a Dataset, namely Dataset<Row>. So groupByKey is the same method on both Dataset and DataFrame, but the `func` differs because the element type differs. For example:

          val ds = Seq((1,2), (3,4)).toDS
          val gd = ds.groupByKey(v=>v._1)
          val df = ds.toDF
          val gd1 = df.groupByKey(r=>r.getInt(0))
          
          Narine Narine Kokhlikyan added a comment -

          Sun Rui, thank you very much for the explanation!
          Now I get it!

          Narine Narine Kokhlikyan added a comment -

          Hi Sun Rui,

          I've made some progress in putting the logical and physical plans together and calling the R workers; however, I still have some questions.
          1. I'm still not quite sure about the number of partitions. As you wrote in https://issues.apache.org/jira/browse/SPARK-6817, we need to tune the number of partitions based on "spark.sql.shuffle.partitions". What exactly do you mean by tuning? Repartitioning?
          2. I have another question about grouping by keys: groupByKey with one key is fine, but if we have more than one key we probably need to introduce a case class. With a case class it looks okay too, but I'm not sure how convenient it is. Any ideas?

          case class KeyData(a: Int, b: Int)
          val gd1 = df.groupByKey(r => KeyData(r.getInt(0), r.getInt(1)))

          Thanks,
          Narine

          sunrui Sun Rui added a comment -

          Narine Kokhlikyan,
          1. Typically users don't care about the number of partitions in Spark SQL. If they do, they can tune it by setting "spark.sql.shuffle.partitions". It doesn't seem related to the implementation of gapply, does it?
          2. I think we need to support groupBy instead of groupByKey for DataFrame. With groupBy, users can specify multiple key columns at once, so a list should be used to hold the key columns.

          FYI, I have basically implemented dapply() and am debugging it.

          Narine Narine Kokhlikyan added a comment -

          Good job on dapply, Sun Rui!
          I'll open a pull request for this soon!

          sunrui Sun Rui added a comment -

          Cool. If possible, could you make it a WIP PR so that I can take a look earlier?

          shivaram Shivaram Venkataraman added a comment -

          Narine Kokhlikyan, any update on this? It would be great to have this in Spark 2.0.

          Narine Narine Kokhlikyan added a comment -

          Hi Shivaram Venkataraman,

          Thanks for asking! I'm trying my best to finish this as soon as possible.

          There is an issue when it later calls mapPartitions in the doExecute method: it seems that for gapply we need to append the grouping columns at the end of each row, similar to https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L1260.

          I've also tried to implement my own column appender, but I'm not sure it is the right way to go. Do you have any ideas, Sun Rui?

          Thank you,
          Narine

          shivaram Shivaram Venkataraman added a comment -

          Could you post a WIP pull request using your own column appender? I am not too familiar with the Spark SQL internals, but I think Reynold Xin or Davies Liu will be able to provide feedback if we have a PR up.

          sunrui Sun Rui added a comment -

          Narine Kokhlikyan, does the AppendColumns logical operator (https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala#L150) help?
          Narine Narine Kokhlikyan added a comment -

          Thank you for the quick responses, Shivaram Venkataraman and Sun Rui!
          Sun Rui, I could have used it, but my concern is the Encoder for the keys. I have one implementation where I represent the keys as a Row and try to use RowEncoder. Something like:

          val gfunc = (r: Row) => convertKeysToRow(r, colNames)

          val withGroupingKey = AppendColumns(gfunc, inputPlan)

          But this doesn't really work...
          I'll push all my changes today and at least post the link to my changeset.

          Thank you!

          Narine Narine Kokhlikyan added a comment -

          Hi Sun Rui,

          I've pushed my changes. Here is the link:
          https://github.com/apache/spark/compare/master...NarineK:gapply

          There are some things I can reuse from dapply; I've copied those in for now but will remove them after merging with dapply.

          I think we can use AppendColumnsWithObject, but it fails at line 76 of sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala:
          assert(child.output.length == 1)
          I'm not quite sure why.

          Could you please verify the part that serializes and deserializes the rows?

          Thank you,
          Narine

          Narine Narine Kokhlikyan added a comment - - edited

          I think it would be better to use typed columns.

          Something similar to: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala#L264
          I don't think there is support for typed columns in SparkR, is there?

          In that case we could create an encoder similar to:
          ExpressionEncoder.tuple(ExpressionEncoder[String], ExpressionEncoder[Int], ExpressionEncoder[Double])

          Is there a way to access the mapping between Spark and Scala types? Like:
          IntegerType (Spark) -> Int (Scala)

          Thank you!

          apachespark Apache Spark added a comment -

          User 'NarineK' has created a pull request for this issue:
          https://github.com/apache/spark/pull/12836

          shivaram Shivaram Venkataraman added a comment -

          Resolved by https://github.com/apache/spark/pull/12836
          Narine Narine Kokhlikyan added a comment - - edited

          FYI, Oscar D. Lara Yejas, Alok Singh, Vijay Bommireddipalli
          timhunter Timothy Hunter added a comment -

          Narine Kokhlikyan, while working on a similar function for Python [1], we found it easier to make the following changes:

          • the keys are appended by default to the Spark DataFrame being returned
          • the output schema that the user provides is the schema of the R data.frame and does not include the keys

          Here were our reasons to depart from the R implementation of gapply:

          • in most cases, users will want to know the key associated with a result, so appending the key is the sensible default
          • most functions in the SQL interface and in MLlib append columns, and gapply departs from this philosophy
          • for the cases when users do not need the key, adding it costs only a fraction of the computation time and output size
          • from a formal perspective, it makes calling gapply fully transparent to the type of the key: it is easier to build a function with gapply because it does not need to know anything about the key

          I think it would make sense to make this change to R's gapply implementation. Let me know what you think about it.

          [1] https://github.com/databricks/spark-sklearn/blob/master/python/spark_sklearn/group_apply.py
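
          For illustration only, a minimal sketch of what this proposal might look like on the SparkR side (the data, column names, schema, and the automatic key handling are assumptions about the proposal, not the implemented API):

          df <- createDataFrame(sqlContext, mtcars)   # sqlContext assumed to exist

          # Under the proposal, the schema describes only the R function's output;
          # the grouping key column(s) would be appended to the result automatically.
          schema <- structType(structField("avg_mpg", "double"))

          result <- gapply(df, "cyl", function(key, group) {
            data.frame(avg_mpg = mean(group$mpg))     # no key columns returned here
          }, schema)

          # The returned DataFrame would then contain both "cyl" and "avg_mpg".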

          Narine Narine Kokhlikyan added a comment - - edited

          Thank you, Timothy Hunter, for sharing this information with us.
          It is a nice idea. I think it could be seen as an extension of the current gapply implementation.

          In general, whether the keys are useful depends on the use case. Most probably the user would naturally like to see the matching key for each group's output, so it would make sense to append the keys by default.
          If the user doesn't need the keys, he or she can easily drop those columns.

          timhunter Timothy Hunter added a comment -

          I opened a separate JIRA for that issue: SPARK-16258


            People

            • Assignee:
              Narine Narine Kokhlikyan
              Reporter:
              sunrui Sun Rui
            • Votes:
              0
              Watchers:
              9
