Spark / SPARK-16483

Unifying struct fields and columns



    • Type: New Feature
    • Status: Reopened
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.3.1
    • Fix Version/s: None
    • Component/s: SQL
    • Labels:


      This issue comes as a result of an exchange with Michael Armbrust outside of the usual JIRA/dev list channels.

      DataFrame provides a full set of manipulation operations for top-level columns: they can be added, removed, modified and renamed. The same is not yet true of fields inside structs. From a logical standpoint, however, Spark users may very well want to perform the same operations on struct fields, especially since automatic schema discovery from JSON input tends to create deeply nested structs.

      Common use-cases include:

      • Remove and/or rename struct field(s) to adjust the schema
      • Fix a data quality issue with a struct field (update/rewrite)

      Doing this by hand with the existing API requires calling named_struct and listing all fields, including the ones we don't want to manipulate. This leads to complex, fragile code that cannot survive schema evolution.
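
      As a rough illustration of the problem, the sketch below renames one field and drops another inside a struct column using only the existing API. The input schema (an address struct with street, city and zip fields) and the names ManualStructRewrite and rewriteAddress are made up for this example; functions.struct is used as the Column-API way of building a struct.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, struct}

object ManualStructRewrite {
  // Assumed (hypothetical) input schema:
  //   id: long
  //   address: struct<street: string, city: string, zip: string>
  // Goal: rename address.city to address.town and drop address.zip.
  def rewriteAddress(df: DataFrame): DataFrame =
    df.withColumn(
      "address",
      struct(
        col("address.street").as("street"), // unchanged, yet it must be listed
        col("address.city").as("town")      // the actual rename
        // address.zip is dropped simply by not listing it
      )
    )
}
```

      Any field that is later added to address upstream is silently dropped by this code, which is the fragility described above.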

      It would be far better if the various APIs that currently manipulate top-level columns were extended to handle struct fields at arbitrary locations or, alternatively, if we introduced new APIs for modifying any field in a DataFrame, whether it is a top-level one or one nested inside a struct.
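
      Until such an API exists, one stopgap is a helper that derives the untouched fields from the schema instead of hard-coding them. The following is only a sketch of that idea, not the API proposed in this issue: NestedFields and dropNested are hypothetical names, the path is assumed to be a plain dot-separated list of field names (no escaping, no arrays or maps along the way), and, as far as I can tell, rebuilding with struct turns a null struct into a non-null struct whose fields are null.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.StructType

object NestedFields {
  // Rebuilds `parent` (a struct column with the given schema) without the
  // field that `path` points to, copying every other field unchanged.
  private def rebuild(parent: Column, schema: StructType, path: List[String]): Column =
    path match {
      case Nil => parent
      case name :: Nil =>
        // Last path segment: keep every field except the one being dropped.
        val kept = schema.fieldNames.filterNot(_ == name)
        struct(kept.map(f => parent.getField(f).as(f)).toSeq: _*)
      case name :: rest =>
        // Intermediate segment: recurse into the matching struct field only.
        val cols = schema.fields.map { f =>
          if (f.name != name) parent.getField(f.name).as(f.name)
          else f.dataType match {
            case st: StructType => rebuild(parent.getField(name), st, rest).as(name)
            case other =>
              throw new IllegalArgumentException(s"$name is not a struct: $other")
          }
        }
        struct(cols.toSeq: _*)
    }

  // Drops the field at a dot-separated path such as "user.address.zip".
  def dropNested(df: DataFrame, path: String): DataFrame =
    path.split('.').toList match {
      case top :: rest if rest.nonEmpty =>
        df.schema(top).dataType match {
          case st: StructType => df.withColumn(top, rebuild(col(top), st, rest))
          case other =>
            throw new IllegalArgumentException(s"$top is not a struct: $other")
        }
      case _ => df.drop(path) // top-level column: the existing API already covers it
    }
}
```

      With this helper, NestedFields.dropNested(df, "user.address.zip") keeps working when new fields appear in user or address, because the field list is read from df.schema at call time. Every similar operation (rename, replace, reorder) would still need its own helper, which is the gap a unified API would close.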

      Purely for discussion purposes (overloaded methods are not shown):

      class Column(val expr: Expression) extends Logging {
        // ...
        // matches Dataset.schema semantics
        def schema: StructType
        // matches Dataset.select() semantics
        // '* support allows multiple new fields to be added easily, saving cumbersome repeated withColumn() calls
        def select(cols: Column*): Column
        // matches Dataset.withColumn() semantics of add or replace
        def withColumn(colName: String, col: Column): Column
        // matches Dataset.drop() semantics
        def drop(colName: String): Column
      }

      class Dataset[T] ... {
        // ...
        // Equivalent to sparkSession.createDataset(toDF.rdd, newSchema)
        def cast(newSchema: StructType): DataFrame
      }

      The benefit of the above API is that it unifies the manipulation of top-level and nested columns. The addition of schema and select() to Column allows for nested field reordering, casting, etc., which is important in data exchange scenarios where field position matters. That is also the reason to add cast to Dataset: it improves consistency and readability (with method chaining). Another way to think of Dataset.cast is as the Spark schema equivalent of Dataset.as: as is to cast as a Scala encodable type is to a StructType instance.
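
      To make the Dataset.cast idea more concrete, the sketch below approximates it with calls that exist today. It assumes the new schema differs from the old one only in field names (same field order and types), since the rows are reinterpreted by position and no values are converted. SchemaCast, castTo and CastOps are hypothetical names, and createDataFrame(rowRDD, schema) stands in for the createDataset call mentioned in the code comment above.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

object SchemaCast {
  // Hypothetical stand-in for the proposed Dataset.cast(newSchema).
  // Reinterprets the existing rows under newSchema by position; nothing is
  // converted, so newSchema must match df.schema in field order and types.
  def castTo(df: DataFrame, newSchema: StructType): DataFrame =
    df.sparkSession.createDataFrame(df.rdd, newSchema)

  // Optional syntax so the call chains like the proposed method: df.castTo(schema)
  implicit class CastOps(df: DataFrame) {
    def castTo(newSchema: StructType): DataFrame = SchemaCast.castTo(df, newSchema)
  }
}
```

      After import SchemaCast._, a pipeline can read df.filter(...).castTo(exchangeSchema).write..., which is the method-chaining readability argument made above. A real Dataset.cast would presumably also convert field types, which this approximation does not attempt.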





              • Assignee:
                simeons Simeon Simeonov
              • Votes:
                1
              • Watchers:
                19


                • Created: