  SPARK-36950

Normalize semi-structured data into flat tables.



      Many users get semi-structured (nested) data from JSON or XML.
      This data is awkward to query when it contains nested fields.
      In pandas there is a json_normalize function that flattens nested dicts into columns.

      Here are some examples of users asking for this: "Flatten Complex Nested JSON (PYSPARK)"
      and "Unable to load jsonl nested file into a flattened dataframe".

      With pandas, users can use this function:


      def flatten_pandas(df_, sep=":"):
          """Fully flatten a pandas DataFrame whose cells hold nested lists and dicts.

          This is the pandas counterpart of ``flatten_test``. List-valued columns
          are exploded into one row per element. Dict-valued columns are expanded
          into one column per key via ``pd.json_normalize``. Both steps repeat
          until no cell holds a list or a dict.

          Parameters
          ----------
          df_ : pandas.DataFrame
              The frame to flatten. It is not modified; a new frame is returned.
          sep : str
              Separator placed between parent and child names of expanded dict
              columns, e.g. ``"info:governor"``. Default ``":"``.

          Returns
          -------
          pandas.DataFrame
              The flattened frame.
          """

          def columns_holding(frame, kind):
              # Labels of the columns in which at least one cell is an instance of
              # ``kind``. Series.map replaces DataFrame.applymap, which is deprecated
              # since pandas 2.1; items() also copes with duplicate column labels.
              return [
                  label
                  for label, column in frame.items()
                  if column.map(lambda value: isinstance(value, kind)).any()
              ]

          have_list = columns_holding(df_, list)
          have_dict = columns_holding(df_, dict)
          while have_list or have_dict:
              if have_list:
                  # Lists are exploded first; dicts exposed by this are handled
                  # on a later pass.
                  for label in have_list:
                      df_ = df_.explode(label)
              else:
                  # Round-trip through JSON records so json_normalize sees plain
                  # Python dicts. orient="records" drops the index, which explode
                  # has filled with duplicates.
                  records = json.loads(df_.to_json(force_ascii=False, orient="records"))
                  df_ = pd.json_normalize(records, sep=sep)
              have_list = columns_holding(df_, list)
              have_dict = columns_holding(df_, dict)
          return df_


      PySpark and the pandas API on Spark have no built-in function that expands dicts into columns.
      These are the functions I use to do the same in PySpark:

      from pyspark.sql.functions import *
      from pyspark.sql.types import *
      def flatten_test(df, sep="_"):
          """Return a flattened copy of a DataFrame.

          Struct columns are expanded into one column per field. Array columns
          are exploded into one row per element; ``explode_outer`` keeps rows
          whose array is null or empty. Map columns are expanded into one column
          per distinct key found in the data. These steps repeat until no struct,
          array or map column is left.

          .. versionadded:: x.X.X

          Parameters
          ----------
          df : pyspark.sql.DataFrame
              The DataFrame to flatten.
          sep : str
              Delimiter between a parent column name and a child field/key name
              in the generated column names. Default ``_``.

          Notes
          -----
          Don't use ``.`` as ``sep``. It won't work on data frames nested more
          than one level deep, and you would have to reference the columns as
          ``columns.name``.
          Flattening map types has to find every key in the column, which runs a
          Spark job and can be slow. Null or empty maps contribute no keys. A map
          column whose values are all null or empty is therefore dropped.

          Examples
          --------
          data_mixed = [
              {
                  "state": "Florida",
                  "shortname": "FL",
                  "info": {"governor": "Rick Scott"},
                  "counties": [
                      {"name": "Dade", "population": 12345},
                      {"name": "Broward", "population": 40000},
                      {"name": "Palm Beach", "population": 60000},
                  ],
              },
              {
                  "state": "Ohio",
                  "shortname": "OH",
                  "info": {"governor": "John Kasich"},
                  "counties": [
                      {"name": "Summit", "population": 1234},
                      {"name": "Cuyahoga", "population": 1337},
                  ],
              },
          ]
          data_mixed = spark.createDataFrame(data=data_mixed)
          data_mixed.printSchema()
          root
           |-- counties: array (nullable = true)
           |    |-- element: map (containsNull = true)
           |    |    |-- key: string
           |    |    |-- value: string (valueContainsNull = true)
           |-- info: map (nullable = true)
           |    |-- key: string
           |    |-- value: string (valueContainsNull = true)
           |-- shortname: string (nullable = true)
           |-- state: string (nullable = true)

          data_mixed_flat = flatten_test(data_mixed, sep=":")
          data_mixed_flat.printSchema()
          root
           |-- shortname: string (nullable = true)
           |-- state: string (nullable = true)
           |-- counties:name: string (nullable = true)
           |-- counties:population: string (nullable = true)
           |-- info:governor: string (nullable = true)

          data = [
              {
                  "id": 1,
                  "name": "Cole Volk",
                  "fitness": {"height": 130, "weight": 60},
              },
              {"name": "Mark Reg", "fitness": {"height": 130, "weight": 60}},
              {
                  "id": 2,
                  "name": "Faye Raker",
                  "fitness": {"height": 130, "weight": 60},
              },
          ]
          df = spark.createDataFrame(data=data)
          df.printSchema()
          root
           |-- fitness: map (nullable = true)
           |    |-- key: string
           |    |-- value: long (valueContainsNull = true)
           |-- id: long (nullable = true)
           |-- name: string (nullable = true)

          df_flat = flatten_test(df, sep=":")
          df_flat.printSchema()
          root
           |-- id: long (nullable = true)
           |-- name: string (nullable = true)
           |-- fitness:height: long (nullable = true)
           |-- fitness:weight: long (nullable = true)

          data_struct = [
              (("James", None, "Smith"), "OH", "M"),
              (("Anna", "Rose", ""), "NY", "F"),
          ]
          schema = StructType([
              StructField('name', StructType([
                  StructField('firstname', StringType(), True),
                  StructField('middlename', StringType(), True),
                  StructField('lastname', StringType(), True)
              ])),
              StructField('state', StringType(), True),
              StructField('gender', StringType(), True)
          ])
          df_struct = spark.createDataFrame(data = data_struct, schema = schema)
          df_struct.printSchema()
          root
           |-- name: struct (nullable = true)
           |    |-- firstname: string (nullable = true)
           |    |-- middlename: string (nullable = true)
           |    |-- lastname: string (nullable = true)
           |-- state: string (nullable = true)
           |-- gender: string (nullable = true)

          df_struct_flat = flatten_test(df_struct, sep=":")
          df_struct_flat.printSchema()
          root
           |-- state: string (nullable = true)
           |-- gender: string (nullable = true)
           |-- name:firstname: string (nullable = true)
           |-- name:middlename: string (nullable = true)
           |-- name:lastname: string (nullable = true)
          """

          def _complex_fields(frame):
              # Column name -> type for every column that still needs flattening
              # (arrays, structs and maps), in schema order.
              return {
                  field.name: field.dataType
                  for field in frame.schema.fields
                  if isinstance(field.dataType, (ArrayType, StructType, MapType))
              }

          complex_fields = _complex_fields(df)
          while complex_fields:
              # Flatten one column per pass, then re-inspect the schema. A step can
              # expose new complex columns, e.g. exploding array<struct> yields a struct.
              col_name, data_type = next(iter(complex_fields.items()))
              if isinstance(data_type, StructType):
                  # One new column per struct field: "name" -> "name<sep>firstname".
                  expanded = [
                      col(col_name + "." + field.name).alias(col_name + sep + field.name)
                      for field in data_type.fields
                  ]
                  df = df.select("*", *expanded).drop(col_name)
              elif isinstance(data_type, ArrayType):
                  # One row per element; rows with a null/empty array are kept.
                  df = df.withColumn(col_name, explode_outer(col_name))
              elif isinstance(data_type, MapType):
                  # Collect every distinct key. ``explode`` (not ``explode_outer``)
                  # skips null/empty maps. Otherwise they would produce a None key
                  # and building the column name would fail.
                  keys_df = df.select(explode(map_keys(col(col_name)))).distinct()
                  # Sorted so the generated column order is deterministic.
                  keys = sorted((row[0] for row in keys_df.collect()), key=str)
                  key_cols = [
                      # f-string so non-string map keys (e.g. ints) also work.
                      col(col_name).getItem(key).alias(f"{col_name}{sep}{key}")
                      for key in keys
                  ]
                  other_cols = [name for name in df.columns if name != col_name]
                  df = df.select(other_cols + key_cols)
              complex_fields = _complex_fields(df)
          return df
      def cleanDataFrame(df: DataFrame) -> DataFrame:
          """Return a new DataFrame with sanitized (possibly nested) field names.

          In every struct field name, at any depth (inside structs, array
          elements, and map keys/values), each of the characters
          ``-``, ``&``, ``"``, ``[``, ``]`` and ``.`` is replaced by ``_``.
          Field order, types, nullability and metadata are kept. The data is
          re-read from ``df.rdd`` under the rewritten schema.

          Relies on a global ``spark`` session and on the ``pyspark.sql.types``
          names imported at the top of this snippet.
          """
          # One translation table, built once, replaces six chained str.replace calls.
          unsafe_chars = str.maketrans({char: "_" for char in '-&"[].'})

          def sanitizeFieldName(s: str) -> str:
              # Returns a new sanitized field name (this function can be anything really).
              return s.translate(unsafe_chars)

          def sanitizeField(field: StructField) -> StructField:
              # Build a new StructField rather than mutating a shallow copy.
              return StructField(
                  sanitizeFieldName(field.name),
                  cleanSchema(field.dataType),
                  field.nullable,
                  field.metadata,
              )

          def cleanSchema(dataType: DataType) -> DataType:
              # Constructing fresh type objects keeps derived state consistent.
              # Assigning to ``.fields`` on a copied StructType would leave its
              # cached ``names`` list pointing at the old names.
              if isinstance(dataType, StructType):
                  return StructType([sanitizeField(f) for f in dataType.fields])
              if isinstance(dataType, ArrayType):
                  return ArrayType(cleanSchema(dataType.elementType), dataType.containsNull)
              if isinstance(dataType, MapType):
                  # Map keys/values can themselves contain structs with unsafe names.
                  return MapType(
                      cleanSchema(dataType.keyType),
                      cleanSchema(dataType.valueType),
                      dataType.valueContainsNull,
                  )
              # Leaf (atomic) types carry no field names.
              return dataType

          # Create a new DataFrame from the old frame's RDD as data, with the new
          # schema as its schema.
          return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
      def json_to_norm_with_null(dir_path, path_to_save):
          """Flatten every ``*.json`` file in ``dir_path`` and append it to ``path_to_save``.

          Each file is processed on its own, because the files' schemas may differ:
          1. Read it with ``spark.read.json``.
          2. Sanitize its field names with ``cleanDataFrame``.
          3. Flatten it with ``flatten_test`` using ``:`` as separator.
          4. Append the result as JSON with the ``ignoreNullFields`` option set to
             ``"false"``, so null fields are written instead of omitted.

          Parameters
          ----------
          dir_path : str
              Directory whose entries (not recursive) ending in ``.json`` are read.
          path_to_save : str
              Output location passed to ``DataFrameWriter.json`` in append mode.
          """
          # sorted() makes the order in which files are appended deterministic.
          for filename in sorted(os.listdir(dir_path)):
              if not filename.endswith(".json"):
                  continue
              fullname = os.path.join(dir_path, filename)
              # Spark parses the file itself. Loading it first with json.load would
              # be wasted I/O and would fail on JSON Lines files, which are
              # spark.read.json's default input format.
              df = spark.read.json(fullname)
              df = cleanDataFrame(df)
              df = flatten_test(df, sep=":")
              df.write.mode("append").option("ignoreNullFields", "false").json(path_to_save)

      cleanDataFrame is taken from a post on Stack Overflow.
      The first two parts of flatten_test (for arrays and structs) are taken from a GitHub gist.

      The last part of flatten_test, for maps, is inspired by https://mungingdata.com/pyspark/dict-map-to-multiple-columns/
      The examples in flatten_test are taken from the documentation of the pandas json_normalize function.
      There is one problem with flatten_test: when the input files have different schemas, they have to be loaded and flattened one DataFrame at a time.




