SPARK-16483: Unifying struct fields and columns

Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.3.1
    • Fix Version/s: None
    • Component/s: SQL

    Description

      This issue comes as a result of an exchange with Michael Armbrust outside of the usual JIRA/dev list channels.

      DataFrame provides a full set of manipulation operations for top-level columns. They can be added, removed, modified and renamed. The same is not yet true of fields inside structs. From a logical standpoint, though, Spark users may very well want to perform the same operations on struct fields, especially since automatic schema discovery from JSON input tends to create deeply nested structs.
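      For example, all of the following are single calls on DataFrame today (df and the column names below are made-up, purely illustrative):

      import org.apache.spark.sql.functions._

      val adjusted = df
        .withColumn("price_usd", col("price") * col("fx_rate"))  // add or modify a column
        .withColumnRenamed("ts", "event_time")                   // rename a column
        .drop("fx_rate")                                         // remove a column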

      Common use-cases include:

      • Remove and/or rename struct field(s) to adjust the schema
      • Fix a data quality issue with a struct field (update/rewrite)

      Doing this by hand with the existing API requires calling named_struct and listing every field, including the ones we don't want to touch. This leads to complex, fragile code that cannot survive schema evolution.
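      For example, renaming a single field of a struct column today means rebuilding the entire struct (a sketch; df and the event struct with fields id, ts and payload are made-up names):

      import org.apache.spark.sql.functions._

      val renamed = df.withColumn(
        "event",
        struct(
          col("event.id"),
          col("event.ts").as("event_time"),  // the only field we actually want to change
          col("event.payload")               // every other field must be re-listed by hand
        )
      )
      // If a new field is later added to event, this code silently drops it.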

      It would be far better if the various APIs that can now manipulate top-level columns were extended to handle struct fields at arbitrary locations or, alternatively, if we introduced new APIs for modifying any field in a dataframe, whether it is a top-level one or one nested inside a struct.

      Purely for discussion purposes (overloaded methods are not shown):

      class Column(val expr: Expression) extends Logging {
      
        // ...
      
        // matches Dataset.schema semantics
        def schema: StructType
      
        // matches Dataset.select() semantics
        // '* support allows multiple new fields to be added easily, saving cumbersome repeated withColumn() calls
        def select(cols: Column*): Column
      
        // matches Dataset.withColumn() semantics of add or replace
        def withColumn(colName: String, col: Column): Column
      
        // matches Dataset.drop() semantics
        def drop(colName: String): Column
      
      }
      
      class Dataset[T] ... {
      
        // ...
      
        // Equivalent to sparkSession.createDataFrame(toDF.rdd, newSchema)
        def cast(newSchema: StructType): DataFrame
      
      }
      

      The benefit of the above API is that it unifies manipulating top-level and nested columns. The addition of schema and select() to Column allows for nested field reordering, casting, etc., which is important in data exchange scenarios where field position matters. That's also the reason to add cast to Dataset: it improves consistency and readability (with method chaining). Another way to think of Dataset.cast is as the Spark schema equivalent of Dataset.as: as is to cast as a Scala encodable type is to a StructType instance.
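      To make the chaining concrete, here is a hypothetical usage sketch of the proposed methods (none of this exists today; event, ts and exchangeSchema are made-up names):

      val cleaned = df
        .withColumn("event",
          col("event")
            .withColumn("event_time", col("event.ts").cast("timestamp"))  // add/replace a nested field
            .drop("ts"))                                                  // remove a nested field
        .cast(exchangeSchema)  // proposed Dataset.cast: align nested field order and types with a target schema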

People

    • Assignee: Unassigned
    • Reporter: Simeon Simeonov (simeons)