  Spark / SPARK-36950

Normalize semi-structured data into tabular tables.


Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.4.0
    • Fix Version/s: None
    • Component/s: None

    Description

      Many users get semi-structured, nested data from JSON or XML.
      Querying this data is awkward when there are nested fields.
      In pandas there is a json_normalize function that flattens nested dicts.
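      For example, a minimal sketch of json_normalize on a made-up record:

      import pandas as pd

      data = [{"id": 1, "fitness": {"height": 130, "weight": 60}}]
      # nested dict keys become flat, delimiter-separated columns
      pd.json_normalize(data, sep=":")
      #    id  fitness:height  fitness:weight
      # 0   1             130              60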

      Here are some examples of users asking for this:
      "Flatten Complex Nested JSON (PySpark)" and
      "Unable to load jsonl nested file into a flattened dataframe".

      With pandas, users can use this function:

       

      import json

      import pandas as pd


      def flatten_pandas(df_):
          # The same as flatten_test (below), but for pandas
          have_list = df_.columns[df_.applymap(lambda x: isinstance(x, list)).any()].tolist()
          have_dict = df_.columns[df_.applymap(lambda x: isinstance(x, dict)).any()].tolist()
          have_nested = len(have_list) + len(have_dict)

          while have_nested != 0:
              if len(have_list) != 0:
                  # explode list columns into one row per element
                  for _ in have_list:
                      df_ = df_.explode(_)

              elif len(have_dict) != 0:
                  # round-trip through JSON so json_normalize can flatten the dicts
                  df_ = pd.json_normalize(
                      json.loads(df_.to_json(force_ascii=False, orient="records")), sep=":"
                  )

              have_list = df_.columns[df_.applymap(lambda x: isinstance(x, list)).any()].tolist()
              have_dict = df_.columns[df_.applymap(lambda x: isinstance(x, dict)).any()].tolist()
              have_nested = len(have_list) + len(have_dict)

          return df_
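
      A quick usage sketch (records modeled on the examples further down):

      data = [
          {"state": "Florida", "counties": [{"name": "Dade", "population": 12345}]},
          {"state": "Ohio", "counties": [{"name": "Summit", "population": 1234}]},
      ]
      flat = flatten_pandas(pd.DataFrame(data))
      # columns: state, counties:name, counties:population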
      

       

      With PySpark or the pandas API on Spark there is no built-in function for turning dicts/maps into columns.
      These are the functions that I'm using to do the same in PySpark.

      import os
      from copy import copy

      from pyspark.sql import DataFrame
      from pyspark.sql.functions import col, explode_outer, map_keys
      from pyspark.sql.types import (
          ArrayType,
          DataType,
          MapType,
          StringType,
          StructField,
          StructType,
      )
      
      
      def flatten_test(df, sep="_"):
          """Returns a flattened dataframe.
          .. versionadded:: x.X.X

          Parameters
          ----------
          df : DataFrame
              Input dataframe with nested columns.
          sep : str
              Delimiter for the flattened columns. Default `_`

          Notes
          -----
          Don't use `.` as `sep`:
          it won't work on data frames nested more than one level,
          and you would have to reference the result columns as `column.name`.

          Flattening MapTypes has to collect every distinct key in the column.
          This can be slow.

          Examples
          --------
      
          data_mixed = [
              {
                  "state": "Florida",
                  "shortname": "FL",
                  "info": {"governor": "Rick Scott"},
                  "counties": [
                      {"name": "Dade", "population": 12345},
                      {"name": "Broward", "population": 40000},
                      {"name": "Palm Beach", "population": 60000},
                  ],
              },
              {
                  "state": "Ohio",
                  "shortname": "OH",
                  "info": {"governor": "John Kasich"},
                  "counties": [
                      {"name": "Summit", "population": 1234},
                      {"name": "Cuyahoga", "population": 1337},
                  ],
              },
          ]
      
          data_mixed = spark.createDataFrame(data=data_mixed)
      
          data_mixed.printSchema()
      
          root
          |-- counties: array (nullable = true)
          |    |-- element: map (containsNull = true)
          |    |    |-- key: string
          |    |    |-- value: string (valueContainsNull = true)
          |-- info: map (nullable = true)
          |    |-- key: string
          |    |-- value: string (valueContainsNull = true)
          |-- shortname: string (nullable = true)
          |-- state: string (nullable = true)
      
      
          data_mixed_flat = flatten_test(data_mixed, sep=":")
          data_mixed_flat.printSchema()
          root
          |-- shortname: string (nullable = true)
          |-- state: string (nullable = true)
          |-- counties:name: string (nullable = true)
          |-- counties:population: string (nullable = true)
          |-- info:governor: string (nullable = true)
      
      
      
      
          data = [
              {
                  "id": 1,
                  "name": "Cole Volk",
                  "fitness": {"height": 130, "weight": 60},
              },
              {"name": "Mark Reg", "fitness": {"height": 130, "weight": 60}},
              {
                  "id": 2,
                  "name": "Faye Raker",
                  "fitness": {"height": 130, "weight": 60},
              },
          ]
      
      
          df = spark.createDataFrame(data=data)
      
          df.printSchema()
      
          root
          |-- fitness: map (nullable = true)
          |    |-- key: string
          |    |-- value: long (valueContainsNull = true)
          |-- id: long (nullable = true)
          |-- name: string (nullable = true)
      
          df_flat = flatten_test(df, sep=":")
      
          df_flat.printSchema()
      
          root
          |-- id: long (nullable = true)
          |-- name: string (nullable = true)
          |-- fitness:height: long (nullable = true)
          |-- fitness:weight: long (nullable = true)
      
          data_struct = [
                  (("James",None,"Smith"),"OH","M"),
                  (("Anna","Rose",""),"NY","F"),
                  (("Julia","","Williams"),"OH","F"),
                  (("Maria","Anne","Jones"),"NY","M"),
                  (("Jen","Mary","Brown"),"NY","M"),
                  (("Mike","Mary","Williams"),"OH","M")
                  ]
      
      
          schema = StructType([
              StructField('name', StructType([
                  StructField('firstname', StringType(), True),
                  StructField('middlename', StringType(), True),
                  StructField('lastname', StringType(), True)
                  ])),
              StructField('state', StringType(), True),
              StructField('gender', StringType(), True)
              ])
      
          df_struct = spark.createDataFrame(data = data_struct, schema = schema)
      
          df_struct.printSchema()
      
          root
          |-- name: struct (nullable = true)
          |    |-- firstname: string (nullable = true)
          |    |-- middlename: string (nullable = true)
          |    |-- lastname: string (nullable = true)
          |-- state: string (nullable = true)
          |-- gender: string (nullable = true)
      
          df_struct_flat = flatten_test(df_struct, sep=":")
      
          df_struct_flat.printSchema()
      
          root
          |-- state: string (nullable = true)
          |-- gender: string (nullable = true)
          |-- name:firstname: string (nullable = true)
          |-- name:middlename: string (nullable = true)
          |-- name:lastname: string (nullable = true)
          """
          # find the complex fields (arrays, structs and maps) in the schema
          complex_fields = {
              field.name: field.dataType
              for field in df.schema.fields
              if isinstance(field.dataType, (ArrayType, StructType, MapType))
          }
      
          while len(complex_fields) != 0:
              col_name = list(complex_fields.keys())[0]
              # print ("Processing :"+col_name+" Type : "+str(type(complex_fields[col_name])))
      
              # if StructType then convert all sub element to columns.
              # i.e. flatten structs
              if isinstance(complex_fields[col_name], StructType):
                  expanded = [
                      col(col_name + "." + k).alias(col_name + sep + k)
                      for k in [n.name for n in complex_fields[col_name]]
                  ]
                  df = df.select("*", *expanded).drop(col_name)
      
              # if ArrayType then add the Array Elements as Rows using the explode function
              # i.e. explode Arrays
              elif isinstance(complex_fields[col_name], ArrayType):
                  df = df.withColumn(col_name, explode_outer(col_name))
      
              # if MapType then convert all sub element to columns.
              # i.e. flatten
              elif isinstance(complex_fields[col_name], MapType):
                  # collect every distinct key in the map column; drop the
                  # null row that explode_outer emits for null maps
                  keys_df = df.select(explode_outer(map_keys(col(col_name)))).distinct()
                  keys = [row[0] for row in keys_df.collect() if row[0] is not None]
                  key_cols = [
                      col(col_name).getItem(k).alias(col_name + sep + k) for k in keys
                  ]
                  df = df.select(
                      [c for c in df.columns if c != col_name] + key_cols
                  )
      
              # recompute the remaining complex fields in the schema
              complex_fields = {
                  field.name: field.dataType
                  for field in df.schema.fields
                  if isinstance(field.dataType, (ArrayType, StructType, MapType))
              }
      
          return df
      
      # We take a dataframe and return a new one with required changes
      def cleanDataFrame(df: DataFrame) -> DataFrame:
          # Returns a new sanitized field name (this function can be anything really)
          def sanitizeFieldName(s: str) -> str:
              return (
                  s.replace("-", "_")
                  .replace("&", "_")
                  .replace('"', "_")
                  .replace("[", "_")
                  .replace("]", "_")
                  .replace(".", "_")
              )
      
          # We call this on all fields to create a copy and to perform any changes we might
          # want to do to the field.
          def sanitizeField(field: StructField) -> StructField:
              field = copy(field)
              field.name = sanitizeFieldName(field.name)
              # We recursively call cleanSchema on all types
              field.dataType = cleanSchema(field.dataType)
              return field
      
          def cleanSchema(dataType: DataType) -> DataType:
              dataType = copy(dataType)
              # If the type is a StructType we need to recurse otherwise we can return since
              # we've reached the leaf node
              if isinstance(dataType, StructType):
                  # We call our sanitizer for all top level fields
                  dataType.fields = [sanitizeField(f) for f in dataType.fields]
              elif isinstance(dataType, ArrayType):
                  dataType.elementType = cleanSchema(dataType.elementType)
              return dataType
      
          # With the new schema in hand, create a new DataFrame from the old
          # frame's RDD and the sanitized schema.
          return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
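
      # Usage sketch for cleanDataFrame (the field names below are made up).
      # flatten_test can't address field names containing "." or other special
      # characters, so sanitize them first:
      demo_schema = StructType([
          StructField("user.name", StringType(), True),
          StructField("info", StructType([StructField("zip-code", StringType(), True)]), True),
      ])
      demo_df = spark.createDataFrame([("a", ("123",))], demo_schema)
      cleanDataFrame(demo_df).printSchema()
      # root
      #  |-- user_name: string (nullable = true)
      #  |-- info: struct (nullable = true)
      #  |    |-- zip_code: string (nullable = true)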
      
      def json_to_norm_with_null(dir_path, path_to_save):
          for filename in os.listdir(dir_path):
              if not filename.endswith(".json"):
                  continue

              fullname = os.path.join(dir_path, filename)
              df = spark.read.json(fullname)
              df = cleanDataFrame(df)
              df = flatten_test(df, sep=":")
              df.write.mode("append").option("ignoreNullFields", "false").json(path_to_save)
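
      Called like this (the paths here are placeholders): every *.json file under dir_path is cleaned, flattened and appended to path_to_save.

      json_to_norm_with_null("/data/raw_json", "/data/flat_json")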
      

      def cleanDataFrame is taken from a post on Stack Overflow.
      The first two parts of def flatten_test, for arrays and structs, are taken from gnist on GitHub.
      The last part, for maps, is inspired by https://mungingdata.com/pyspark/dict-map-to-multiple-columns/
      The examples in def flatten_test are taken from the pandas json_normalize documentation.
      There is one problem with def flatten_test: dataframes have to be loaded one at a time if their schemas differ.
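
      To illustrate that last point with made-up rows: inputs with different nesting flatten to different column sets, which is why json_to_norm_with_null pushes files through flatten_test one at a time.

      df_a = spark.createDataFrame([{"id": 1, "info": {"a": "x"}}])
      df_b = spark.createDataFrame([{"id": 2, "tags": [{"b": "y"}]}])
      flatten_test(df_a).columns  # ['id', 'info_a']
      flatten_test(df_b).columns  # ['id', 'tags_b']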

      People

        Assignee: Unassigned
        Reporter: Bjørn Jørgensen
