  Spark / SPARK-9999

Dataset API on top of Catalyst/DataFrame

    Details

    • Type: Story
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.0.0
    • Component/s: SQL
    • Labels:
      None
    • Target Version/s:

      Description

      The RDD API is very flexible, but as a result its execution is harder to optimize in some cases. The DataFrame API, on the other hand, is much easier to optimize, but lacks some of the nice perks of the RDD API (e.g., it is harder to use UDFs, and there are no strong types in Scala/Java).

      The goal of Spark Datasets is to provide an API that allows users to easily express transformations on domain objects, while also providing the performance and robustness advantages of the Spark SQL execution engine.
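
      As an illustration of that goal, here is a minimal sketch written against the API that eventually shipped (Spark 1.6/2.0); the SparkSession entry point, the Person class, and the sample data are assumptions for the example, not part of this proposal.

      import org.apache.spark.sql.SparkSession

      case class Person(name: String, age: Long)

      object DatasetSketch {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().appName("dataset-sketch").getOrCreate()
          import spark.implicits._  // implicit encoders for primitives, case classes, tuples

          // Typed transformations on domain objects...
          val people = Seq(Person("Ann", 30L), Person("Bob", 17L)).toDS()
          val adultNames = people.filter(_.age >= 18).map(_.name.toUpperCase)

          // ...that still run through the Spark SQL (Catalyst/Tungsten) engine.
          adultNames.explain()
          adultNames.show()
          spark.stop()
        }
      }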

      Requirements

      • Fast - In most cases, the performance of Datasets should be equal to or better than working with RDDs. Encoders should be as fast or faster than Kryo and Java serialization, and unnecessary conversion should be avoided.
      • Typesafe - Similar to RDDs, objects and functions that operate on those objects should provide compile-time safety where possible. When converting from data whose schema is not known at compile time (for example, data read from an external source such as JSON), the conversion function should fail fast if there is a schema mismatch.
      • Support for a variety of object models - Default encoders should be provided for a variety of object models: primitive types, case classes, tuples, POJOs, JavaBeans, etc. Ideally, objects that follow standard conventions, such as Avro SpecificRecords, should also work out of the box.
      • Java Compatible - Datasets should provide a single API that works in both Scala and Java. Where possible, shared types like Array will be used in the API. Where not possible, overloaded functions should be provided for both languages. Scala concepts, such as ClassTags, should not be required in the user-facing API.
      • Interoperates with DataFrames - Users should be able to seamlessly transition between Datasets and DataFrames, without writing conversion boilerplate. When names used in the input schema line up with fields in the given class, no extra mapping should be necessary (see the sketch just below this list). Libraries like MLlib should not need to provide different interfaces for accepting DataFrames and Datasets as input.
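
      A minimal sketch of the DataFrame/Dataset round trip described in the last bullet, assuming the Spark 2.x API; the Event class and its column names are illustrative, not from this proposal.

      import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

      case class Event(userId: Long, action: String)

      object InteropSketch {
        // Columns whose names line up with the case class fields need no extra mapping;
        // a schema mismatch fails fast at analysis time instead of deep inside a job.
        def toEvents(spark: SparkSession, df: DataFrame): Dataset[Event] = {
          import spark.implicits._
          df.as[Event]
        }

        // No conversion boilerplate in the other direction either.
        def backToDataFrame(events: Dataset[Event]): DataFrame = events.toDF()
      }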

      For a detailed outline of the complete proposed API: marmbrus/dataset-api
      For an initial discussion of the design considerations in this API: design doc

      The initial version of the Dataset API was merged in Spark 1.6. However, it will take a few more releases to flesh everything out.

        Issue Links

        1.
        Initial code generated encoder for product types Sub-task Resolved Michael Armbrust
         
        2.
        Initial code generated construction of Product classes from InternalRow Sub-task Resolved Michael Armbrust
         
        3.
        Initial API Draft Sub-task Resolved Michael Armbrust
         
        4.
        add encoder/decoder for external row Sub-task Resolved Wenchen Fan
         
        5.
        Java API support & test cases Sub-task Resolved Wenchen Fan
         
        6.
        Implement cogroup Sub-task Resolved Wenchen Fan
         
        7.
        Support for joining two datasets, returning a tuple of objects Sub-task Resolved Michael Armbrust
         
        8.
        GroupedIterator's hasNext is not idempotent Sub-task Resolved Unassigned
         
        9.
        groupBy on column expressions Sub-task Resolved Michael Armbrust
         
        10.
        Type-safe aggregations Sub-task Resolved Michael Armbrust
         
        11.
        add map/flatMap to GroupedDataset Sub-task Resolved Wenchen Fan
         
        12.
        User facing api for typed aggregation Sub-task Resolved Michael Armbrust
         
        13.
        Improve toString Function Sub-task Resolved Michael Armbrust
         
        14.
        add java test for typed aggregate Sub-task Resolved Wenchen Fan
         
        15.
        Support as on Classes defined in the REPL Sub-task Resolved Michael Armbrust
         
        16.
        add reduce to GroupedDataset Sub-task Resolved Wenchen Fan
         
        17.
        support typed aggregate in project list Sub-task Resolved Wenchen Fan
         
        18.
        split ExpressionEncoder into FlatEncoder and ProductEncoder Sub-task Resolved Wenchen Fan
         
        19.
        org.apache.spark.sql.AnalysisException: Can't extract value from a#12 Sub-task Resolved Wenchen Fan
         
        20.
        collect, first, and take should use encoders for serialization Sub-task Resolved Reynold Xin
         
        21.
        Kryo-based encoder for opaque types Sub-task Resolved Reynold Xin
         
        22.
        Dataset self join returns incorrect result Sub-task Resolved Wenchen Fan
         
        23.
        Java-based encoder for opaque types Sub-task Resolved Reynold Xin
         
        24.
        nice error message for missing encoder Sub-task Resolved Wenchen Fan
         
        25.
        Add Java tests for Kryo/Java encoders Sub-task Resolved Reynold Xin
         
        26.
        add type cast if the real type is different but compatible with encoder schema Sub-task Resolved Wenchen Fan
         
        27.
        Incorrect results are returned when using null Sub-task Resolved Wenchen Fan
         
        28.
        support typed aggregate for complex buffer schema Sub-task Resolved Wenchen Fan
         
        29.
        fix `nullable` of encoder schema Sub-task Resolved Wenchen Fan
         
        30.
        fix encoder life cycle for CoGroup Sub-task Resolved Wenchen Fan
         
        31.
        Encoder for JavaBeans / POJOs Sub-task Resolved Wenchen Fan
         
        32.
        [SQL] Adding joinType into joinWith Sub-task Closed Unassigned
         
        33.
        Add missing APIs in Dataset Sub-task Resolved Xiao Li
         
        34.
        [SQL] Support Persist/Cache and Unpersist in Dataset APIs Sub-task Resolved Xiao Li
         
        35.
        refactor MapObjects to make it less hacky Sub-task Resolved Wenchen Fan
         
        36.
        WrapOption should not have type constraint for child Sub-task Resolved Apache Spark
         
        37.
        throw exception if the number of fields does not line up for Tuple encoder Sub-task Resolved Wenchen Fan
         
        38.
        use true as default value for propagateNull in NewInstance Sub-task Resolved Apache Spark
         
        39.
        Add BINARY to Encoders Sub-task Resolved Apache Spark
         
        40.
        Eliminate serialization for back to back operations Sub-task Resolved Michael Armbrust
         
        41.
        Move encoder definition into Aggregator interface Sub-task Resolved Reynold Xin
         
        42.
        Explicit APIs in Scala for specifying encoders Sub-task Resolved Reynold Xin
         

          Activity

          hvanhovell Herman van Hovell added a comment -

          This sounds interesting.

          In order to get this working, we need more information on the (black-box) operators used. So some analysis capability, or some predefined building blocks (SQL-lite, if you will), are probably needed. Apache Flink uses static code analysis and annotations to achieve a similar goal:
          http://flink.apache.org/news/2015/06/24/announcing-apache-flink-0.9.0-release.html
          https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/programming_guide.html#semantic-annotations

          Any other ideas?

          rxin Reynold Xin added a comment -

          This needs to be designed first. I'm not sure static code analysis is a great idea, since it often fails. I'm open to ideas though.

          saurfang Sen Fang added a comment -

          Another idea is to do something similar to the F# TypeProvider approach: http://fsharp.github.io/FSharp.Data/
          I haven't looked into this extensively just yet, but as far as I understand it, this uses compile-time macros to generate classes based on data sources. In that sense, it is slightly similar to protobuf, where you generate a Java class based on a schema definition. This makes the DataFrame type-safe at the very upstream. With a bit of IDE plugin support, you would even be able to have autocomplete and type checking as you write code, which would be very nice. I'm not sure whether it would scale to propagate this type information downstream (in an aggregation or a transformed DataFrame), though. As I understand it, macros and type providers in Scala provide similar capabilities.

          sandyr Sandy Ryza added a comment -

          To ask the obvious question: what are the reasons that the RDD API couldn't be adapted to these purposes? If I understand correctly, a summary of the differences is that Datasets:
          1. Support encoders for conversion to schema'd / efficiently serializable data
          2. Have a GroupedDataset concept
          3. Execute on Catalyst instead of directly on top of the DAGScheduler

          How difficult would it be to add encoders on top of RDDs, as well as a GroupedRDD? Is there anything in the RDD API contract that says RDDs can't be executed on top of Catalyst?

          Surely this creates some dependency hell as well given that SQL depends on core, but surely that's better than exposing an entirely new API that looks almost like the original one.

          srowen Sean Owen added a comment -

          I had a similar question about how much more this is than the current RDD API. For example, is the idea that, with the help of caller-provided annotations and/or some code analysis, you could deduce more about operations and optimize them further? A lot of the API already covers the basics, like assuming reduce functions are associative, etc.

          I get transformations on domain objects in the style of Spark SQL but I can already "groupBy(customer.name)" in a normal RDD.
          I can also go sorta easily from DataFrames to RDDs and back.

          So I assume it's about static analysis of user functions, in the main?
          Or about getting to/from a Row faster?

          rxin Reynold Xin added a comment -

          Sandy Ryza I thought a lot about doing this on top of the existing RDD API for a while, and that was my preference. However, we would need to break the RDD API, which breaks all existing applications.

          sandyr Sandy Ryza added a comment -

          Reynold Xin where are the places where the API would need to break?

          rxin Reynold Xin added a comment -

          The big ones are:
          1. encoders (which breaks almost every function that has a type parameter that's not T)
          2. "partitions" (partitioning is a physical concept, and shouldn't be required as part of API semantics)
          3. groupBy ...
          ...

          marmbrus Michael Armbrust added a comment -

          Other compatibility-breaking things include getting rid of ClassTags from the public API (a common complaint from Java users) and not using a separate class for Java users (JavaRDD).

          sandyr Sandy Ryza added a comment -

          If I understand correctly, it seems like there are ways to work around each of these issues that, necessarily, make the API dirtier, but avoid the need for a whole new public API.

          • groupBy: deprecate the old groupBy and add a groupWith or groupby method that returns a GroupedRDD.
          • partitions: have -1 be a special value that means "determined by the planner"
          • encoders: what are the main obstacles to addressing this with an EncodedRDD that extends RDD?

          Regarding the issues Michael brought up:
          I'd love to get rid of class tags from the public API as well as take out JavaRDD, but these seem more like "nice to have" than core to the proposal. Am I misunderstanding?

          All of these of course add ugliness, but I think it's really easy to underestimate the cost of introducing a new API. Applications everywhere become legacy and need to be rewritten to take advantage of new features. Code examples and training materials everywhere become invalidated. Can we point to systems that have successfully made a transition like this at this point in their maturity?

          rxin Reynold Xin added a comment - - edited

          Sandy Ryza Your concern is absolutely valid, but I don't think your EncodedRDD proposal works. For one, the map function (and every other function that returns a type different from the RDD's own T) will break. For two, the whole concept of PairRDDFunctions should go away with this new API.

          As I said, it's actually my preference to just use the RDD API. But if you take a look at what's needed here, it'd break too many functions. So we have the following choices:

          1. Don't create a new API, and break the RDD API. People then can't update to newer versions of Spark unless they rewrite their apps. We did this with the SchemaRDD -> DataFrame change, which went well – but SchemaRDD wasn't really an advertised API back then.

          2. Create a new API, and keep RDD API intact. People can update to new versions of Spark, but they can't take full advantage of all the Tungsten/DataFrame work immediately unless they rewrite their apps. Maybe we can implement the RDD API later in some cases using the new API so legacy apps can still take advantage whenever possible (e.g. inferring encoder based on classtags when possible).

          Also the RDD API as I see it today is actually a pretty good way for developers to provide data (i.e. used for data sources). If we break it, we'd still need to come up with a new data input API.

          rxin Reynold Xin added a comment -

          BTW another possible approach that we haven't discussed is that we can start with an experimental new API, and in Spark 2.0 rename it to RDD. I'm less in favor of this because it still means applications can't update to Spark 2.0 without rewriting.

          marmbrus Michael Armbrust added a comment -

          I think improving Java compatibility and getting rid of the ClassTags is more than a nice to have. Having a separate class hierarchy for Java/Scala makes it very hard for people to build higher level libraries that work with both Scala and Java. As a result, I think Java adoption suffers. ClassTags are burdensome for both Scala and Java users.

          In order to make encoders work the way we want, nearly every function that takes a ClassTag today will need to be changed to take an encoder. As Reynold Xin points out, I think that kind of compatibility breaking is actually more damaging for a project of Spark's maturity than providing a higher-level API parallel to RDDs.

          That said, I think source compatibility for common code between RDDs -> Datasets would be great to make sure users can make the transition with as little pain as possible.

          sandyr Sandy Ryza added a comment -

          Thanks for the explanation Reynold Xin and Michael Armbrust. I understand the problem and don't have any great ideas for an alternative workable solution.

          sandyr Sandy Ryza added a comment -

          Maybe you all have thought through this as well, but I had some more thoughts on the proposed API.

          Fundamentally, it seems weird to me that the user is responsible for having a matching Encoder around every time they want to map to a class of a particular type. In 99% of cases, the Encoder used to encode any given type will be the same, and it seems more intuitive to me to specify this up front.

          To be more concrete, suppose I want to use case classes in my app and have a function that can auto-generate an Encoder from a class object (though this might be a little bit time consuming because it needs to use reflection). With the current proposal, any time I want to map my Dataset to a Dataset of some case class, I need to either have a line of code that generates an Encoder for that case class, or have an Encoder already lying around. If I perform this operation within a method, I need to pass the Encoder down to the method and include it in the signature.

          Ideally I would be able to register an EncoderSystem up front that caches Encoders and generates new Encoders whenever it sees a new class used. This still of course requires the user to pass in type information when they call map, but it's easier for them to get this information than an actual encoder. If there's not some principled way to get this working implicitly with ClassTags, the user could just pass in classOf[MyCaseClass] as the second argument to map.

          marmbrus Michael Armbrust added a comment -

          Sandy Ryza did you look at the test cases in Scala and Java linked from the attached design doc?

          In Scala, users should never have to think about Encoders as long as their data can be represented as primitives, case classes, tuples, or collections. Implicits (provided by sqlContext.implicits._) automatically pass the required information to the function.

          In Java, the compiler is not helping us out as much, so the user must do as you suggest. The prototype shows ProductEncoder.tuple(Long.class, Long.class), but we will have a similar interface that works for class objects for POJOs / JavaBeans. The problem with doing this using a registry (like kryo in RDDs today) is that then you aren't finding out the object type until you have an example object from realizing the computation. That is often too late to do the kinds of optimizations that we are trying to enable. Instead we'd like to statically realize the schema at Dataset construction time.

          Encoders are just an encapsulation of the required information and provide an interface if we ever want to allow someone to specify a custom encoder.

          Regarding the performance concerns with reflection, the implementation that is already present in Spark master (SPARK-10993 and SPARK-11090) is based on catalyst expressions. Reflection is done once on the driver, and the existing code generation caching framework is taking care of caching generated encoder bytecode on the executors.
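
          (For illustration, a short sketch of the two ways an encoder gets supplied, written against the Encoders factory in the released API rather than the ProductEncoder prototype mentioned above; the Point class is an assumption for the example.)

          import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

          case class Point(x: Double, y: Double)

          object EncoderSketch {
            def run(spark: SparkSession): Unit = {
              import spark.implicits._

              // Scala: an Encoder[Point] is derived implicitly, so the schema is
              // known when the Dataset is constructed, not when data is first seen.
              val points = Seq(Point(1.0, 2.0), Point(3.0, 4.0)).toDS()

              // Explicit construction, which is what Java callers (and any future
              // custom encoders) would plug into.
              val pointEncoder: Encoder[Point] = Encoders.product[Point]
              val pairEncoder: Encoder[(Long, Long)] =
                Encoders.tuple(Encoders.scalaLong, Encoders.scalaLong)

              println(pointEncoder.schema)  // schema is available before any data is realized
              println(pairEncoder.schema)
              points.show()
            }
          }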

          sandyr Sandy Ryza added a comment -

          The problem with doing this using a registry (like kryo in RDDs today) is that then you aren't finding out the object type until you have an example object from realizing the computation.

          My suggestion was that the user would still need to pass the class object, so this shouldn't be a problem, unless I'm misunderstanding.

          Thanks for the pointer to the test suite. So am I to understand correctly that with Scala implicits magic I can do the following without any additional boilerplate?

          import <some basic sql stuff>
          
          case class MyClass1(<some fields>)
          case class MyClass2(<some fields>)
          
          val ds : Dataset[MyClass1] = ...
          val ds2: Dataset[MyClass2] = ds.map(funcThatConvertsFromMyClass1ToMyClass2)
          

          and in Java, imagining those case classes above were POJOs, we'd be able to support the following?

          Dataset<MyClass2> ds2 = ds1.map(funcThatConvertsFromMyClass1ToMyClass2, MyClass2.class);
          

          If that's the case, then that resolves my concerns above.

          Lastly, though, IIUC, it seems like for all the common cases, we could register an object with the SparkContext that converts from ClassTag to Encoder, and the RDD API would work. Where does that break down?

          marmbrus Michael Armbrust added a comment -

          Yeah, that Scala code should work. Regarding the Java version, the only difference is that the API I have in mind would be Encoder.for(MyClass2.class). Passing in an encoder instead of a raw Class[_] gives us some extra indirection in case we want to support custom encoders some day.

          I'll add that we can also play reflection tricks in cases where things are not erased for Java, and this is the part of the proposal that is the least thought out at the moment. Any help making this part as powerful/robust as possible would be greatly appreciated.

          I think it is possible that in the long term we will do as you propose and remake the RDD API as a compatibility layer, with the option to infer the encoder based on the ClassTag. The problem with this being the primary implementation is erasure.

          scala> import scala.reflect._
          
          scala> classTag[(Int, Int)].erasure.getTypeParameters
          res0: Array[java.lang.reflect.TypeVariable[Class[_$1]]] forSome { type _$1 } = Array(T1, T2)
          

          We've lost the types of _1 and _2, so we are going to have to fall back on runtime reflection again, per tuple. By contrast, the encoders that are checked into master can extract primitive ints without any additional boxing and encode them directly into Tungsten buffers.
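
          (By way of contrast, a sketch of what the encoder path keeps, assuming the shipped spark.implicits._ is in scope; this is an editorial illustration, not part of the original comment.)

          import org.apache.spark.sql.{Encoder, SparkSession}

          object ErasureVsEncoder {
            def demo(spark: SparkSession): Unit = {
              import spark.implicits._
              // Unlike a ClassTag, the implicitly derived encoder still carries the
              // element types of the tuple, so _1 and _2 can be written as primitive
              // ints without per-tuple reflection or boxing.
              val enc: Encoder[(Int, Int)] = implicitly[Encoder[(Int, Int)]]
              println(enc.schema)  // two int fields, resolved at compile time
            }
          }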

          sandyr Sandy Ryza added a comment -

          So ClassTags would work for case classes and Avro specific records, but wouldn't work for tuples (or anywhere else types get erased). Blrgh. I wonder if the former is enough? Tuples are pretty useful though.

          marmbrus Michael Armbrust added a comment -

          Yeah, I think tuples are a pretty important use case. Perhaps more importantly, though, I think having a concept of encoders instead of relying on JVM types future-proofs the API by giving us more control. If you look closely at the test case examples, there are some pretty crazy macro examples (e.g., R(a = 1, b = 2L)) where we actually create something like named tuples that codegen, at compile time, the logic required to encode the user's results directly into Tungsten format without needing to allocate an intermediate object.

          matei Matei Zaharia added a comment -

          Beyond tuples, you'll also want encoders for other generic classes, such as Seq[T]. They're the cleanest mechanism to get the most type info. Also, from a software engineering point of view it's nice to avoid a central object where you register stuff to allow composition between libraries (basically, see the problems that the Kryo registry creates today).
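
          (A sketch of that composition under the same assumptions as the earlier examples: encoders for nested and generic types are resolved implicitly at each call site, with no global registry; the Order class is illustrative.)

          import org.apache.spark.sql.SparkSession

          case class Order(id: Long, items: Seq[String])

          object GenericEncoderSketch {
            def demo(spark: SparkSession): Unit = {
              import spark.implicits._
              // The derived Encoder[Order] also knows how to encode the Seq[String] field.
              val orders = Seq(Order(1L, Seq("book", "pen")), Order(2L, Seq("lamp"))).toDS()
              orders.printSchema()  // items shows up as an array column
            }
          }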

          nchammas Nicholas Chammas added a comment -

          Arriving a little late to this discussion. Quick question for Reynold/Michael:

          Will Python (and R) get this API in time for 1.6, or is that planned for a later release? Once the Scala API is ready, I'm guessing that the Python version will mostly be a lightweight wrapper around that API.

          sandyr Sandy Ryza added a comment -

          Nicholas Chammas it's not clear that it makes sense to add a similar API for Python and R. The main point of the Dataset API, as I understand it, is to extend DataFrames to take advantage of Java / Scala's static typing systems. This means recovering compile-time type safety, integration with existing Java / Scala object frameworks, and Scala syntactic sugar like pattern matching. Python and R are dynamically typed, so they can't take advantage of these.

          nchammas Nicholas Chammas added a comment - - edited

          Sandy Ryza - Hmm, so are you saying that, generally speaking, Datasets will provide no performance advantages over DataFrames, and that they will just help in terms of catching type errors early?

          Python and R are dynamically typed so can't take advantage of these.

          I can't speak for R, but Python has supported type hints since 3.0. More recently, Python 3.5 introduced a typing module to standardize how type hints are specified, which facilitates the use of static type checkers like mypy. PySpark could definitely offer a statically type checked API, but practically speaking it would have to be limited to Python 3+.

          I suppose people don't generally expect static type checking when they use Python, so perhaps it makes sense not to support Datasets in PySpark.

          maver1ck Maciej Bryński added a comment - - edited

          I think we could also check types in Python.
          As I understand it, Dataset should have a performance advantage over RDD. Am I right?

          smilegator Xiao Li added a comment -

          Agree. The major performance gain of Dataset should come from the Catalyst optimizer.

          nchammas Nicholas Chammas added a comment -

          If you are referring to my comment, note that I am asking about Dataset vs. DataFrame, not Dataset vs. RDD.

          rxin Reynold Xin added a comment -

          Nicholas Chammas Dataset actually will be slightly slower than DataFrame due to the conversion necessary from/to user-defined types. We do codegen all the conversions, but they are still conversions.
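
          (To make that conversion cost concrete, a hypothetical comparison assuming the shipped API; Person and the column name are illustrative, and this is an editorial sketch rather than part of the original comment.)

          import org.apache.spark.sql.{Dataset, SparkSession}
          import org.apache.spark.sql.functions.upper

          case class Person(name: String, age: Long)

          object ConversionCost {
            def demo(spark: SparkSession, people: Dataset[Person]): Unit = {
              import spark.implicits._

              // DataFrame-style: stays in the optimized binary representation.
              val viaColumns = people.select(upper($"name"))

              // Dataset-style: each row is decoded into a Person for the lambda,
              // then the result is re-encoded.
              val viaLambda = people.map(_.name.toUpperCase)

              viaColumns.explain()
              viaLambda.explain()  // compare the physical plans
            }
          }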

          smilegator Xiao Li added a comment -

          Will you publish the exact performance penalty? Is it obvious? For example, larger than 1% in general workloads? I know it depends on the workloads.

          rxin Reynold Xin added a comment -

          We haven't measured it yet, and as you said it is highly workload dependent.

          smilegator Xiao Li added a comment -

          Thank you!

          I think users might need to understand the potential performance difference when deciding between DataFrame and Dataset. I will suggest that my performance team measure it. Let me know if you have any opinions. Thanks again!

          maver1ck Maciej Bryński added a comment -

          Reynold Xin
          What about the Python API? What's the target version?
          https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html

          Quote:
          "As we look forward to Spark 2.0, we plan some exciting improvements to Datasets, specifically:
          Python Support."

          rxin Reynold Xin added a comment - - edited

          After thinking about that more, I don't think it will happen any time soon. We simply don't see a strong benefit in Python to having a type-safe way to work with data. After all, Python itself has no compile-time type safety. And many of the runtime benefits of Dataset are already available in Python via Row's dictionary-like interface.

          maver1ck Maciej Bryński added a comment -

          OK.
          So what about this patch?
          https://issues.apache.org/jira/browse/SPARK-13594

          Should we break backward compatibility in the Python DataFrame API?

          rxin Reynold Xin added a comment -

          Unfortunately I think that's still necessary. The problem is that "map" should return the same type (i.e. DataFrame), not a different type.

          We could certainly use monkey patching to add a compatibility package, though. That might be worth doing.

          nchammas Nicholas Chammas added a comment -

          Python itself has no compile time type safety.

          Practically speaking, this is no longer true. You can get a decent measure of "compile" time type safety using recent additions to Python (both the language itself and the ecosystem).

          Specifically, optional static type checking has been a focus in Python since 3.5+, and according to Python's BDFL both Google and Dropbox are updating large parts of their codebases to use Python's new typing features. Static type checkers for Python like mypy are already in use and are backed by several core Python developers, including Guido van Rossum (Python's creator/BDFL).

          So I don't think Datasets are a critical feature for PySpark just yet, and it will take some time for the general Python community to learn and take advantage of Python's new optional static typing features and tools, but I would keep this on the radar.


            People

            • Assignee:
              marmbrus Michael Armbrust
              Reporter:
              rxin Reynold Xin
            • Votes:
              4
              Watchers:
              69
