Details
-
Improvement
-
Status: Open
-
Major
-
Resolution: Unresolved
-
3.4.0
-
None
-
None
Description
Many users get semi-nested data from JSON or XML.
There are some problems with querying this data, where there are nested fields.
In pandas there is a json_normalize function that flattens nested dicts.
Here are some example discussions of this use case: "Flatten Complex Nested JSON (PySpark)" and
Unable to load jsonl nested file into a flattened dataframe
With pandas users can use this function
def flatten_pandas(df_):
    """Return a fully flattened copy of *df_* (the pandas analogue of flatten_test).

    Repeatedly explodes list-valued columns into rows and normalizes
    dict-valued columns into ``parent:child`` columns (via
    ``pd.json_normalize`` with ``sep=":"``) until no nested values remain.

    Parameters
    ----------
    df_ : pandas.DataFrame
        Frame possibly containing list- or dict-valued cells.

    Returns
    -------
    pandas.DataFrame
        Frame with lists exploded and dicts expanded.
    """
    # Columns whose cells contain lists / dicts anywhere.
    have_list = df_.columns[df_.applymap(lambda x: isinstance(x, list)).any()].tolist()
    have_dict = df_.columns[df_.applymap(lambda x: isinstance(x, dict)).any()].tolist()
    while have_list or have_dict:
        if have_list:
            # Explode each list column into one row per element.
            for col_name in have_list:
                df_ = df_.explode(col_name)
        else:
            # Bug fix: the original tested ``have_dict != 0`` — a list
            # compared with an int, which is always True; it only worked
            # because the loop condition already guaranteed have_dict was
            # non-empty here. Test the list's truthiness (via else) instead.
            df_ = pd.json_normalize(
                json.loads(df_.to_json(force_ascii=False, orient="records")),
                sep=":",
            )
        # Re-scan: one pass of flattening may expose deeper nesting.
        have_list = df_.columns[df_.applymap(lambda x: isinstance(x, list)).any()].tolist()
        have_dict = df_.columns[df_.applymap(lambda x: isinstance(x, dict)).any()].tolist()
    return df_
With pyspark or pandas_api we don't have a function for getting dict to columns implemented.
These are the functions that I'm using to do the same in pyspark.
from pyspark.sql.functions import *
from pyspark.sql.types import *


def flatten_test(df, sep="_"):
    """Return a flattened copy of *df*.

    Structs are expanded into ``parent<sep>child`` columns, arrays are
    exploded into rows (``explode_outer``, so null/empty arrays keep their
    row), and maps are expanded into one column per distinct key.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Frame possibly containing Struct/Array/Map columns.
    sep : str
        Delimiter for flattened column names. Default ``_``.
        Don't use ``.`` — dotted names clash with Spark's nested-field
        syntax and would need backtick quoting everywhere.

    Notes
    -----
    Flattening MapType columns collects every distinct key of the column
    to the driver, which can be slow on large data.
    """

    def _complex_fields(frame):
        # Name -> type for every column that still needs flattening.
        return {
            field.name: field.dataType
            for field in frame.schema.fields
            if isinstance(field.dataType, (ArrayType, StructType, MapType))
        }

    complex_fields = _complex_fields(df)
    while complex_fields:
        col_name = next(iter(complex_fields))
        dtype = complex_fields[col_name]

        if isinstance(dtype, StructType):
            # Promote each struct field to a top-level column.
            expanded = [
                col(col_name + "." + field.name).alias(col_name + sep + field.name)
                for field in dtype
            ]
            df = df.select("*", *expanded).drop(col_name)
        elif isinstance(dtype, ArrayType):
            # One row per array element; null/empty arrays keep the row.
            df = df.withColumn(col_name, explode_outer(col_name))
        elif isinstance(dtype, MapType):
            # Collect the distinct keys, then project one column per key.
            keys_df = df.select(explode_outer(map_keys(col(col_name)))).distinct()
            keys = [row[0] for row in keys_df.collect()]
            # Bug fix: explode_outer yields a null key for a null map, and
            # map keys need not be strings; the original crashed building
            # ``col_name + sep + f`` in either case. Skip None keys and
            # stringify the rest.
            key_cols = [
                col(col_name).getItem(k).alias(str(col_name) + sep + str(k))
                for k in keys
                if k is not None
            ]
            remaining = [c for c in df.columns if c != col_name]
            df = df.select(remaining + key_cols)

        # Recompute: expanding one level may have exposed deeper nesting.
        complex_fields = _complex_fields(df)
    return df
# We take a dataframe and return a new one with required changes
def cleanDataFrame(df: DataFrame) -> DataFrame:
    """Return a copy of *df* whose field names are safe to reference.

    Characters Spark chokes on in column names (``- & " [ ] .``) are
    replaced with ``_`` at every nesting level of the schema.
    """

    # Returns a new sanitized field name (this function can be anything really)
    def sanitizeFieldName(s: str) -> str:
        return (
            s.replace("-", "_")
            .replace("&", "_")
            .replace('"', "_")
            .replace("[", "_")
            .replace("]", "_")
            .replace(".", "_")
        )

    # Copy the field, rename it, and recurse into its type.
    def sanitizeField(field: StructField) -> StructField:
        field = copy(field)
        field.name = sanitizeFieldName(field.name)
        field.dataType = cleanSchema(field.dataType)
        return field

    # Bug fix: the original annotation was ``(dataType: [DataType]) ->
    # [DateType]`` — an invalid list-literal annotation plus a
    # DateType/DataType typo.
    def cleanSchema(dataType: DataType) -> DataType:
        dataType = copy(dataType)
        # Recurse into container types; leaf types are returned unchanged.
        if isinstance(dataType, StructType):
            # We call our sanitizer for all top level fields
            dataType.fields = [sanitizeField(f) for f in dataType.fields]
        elif isinstance(dataType, ArrayType):
            dataType.elementType = cleanSchema(dataType.elementType)
        elif isinstance(dataType, MapType):
            # Generalization: structs nested inside map values were
            # previously left unsanitized.
            dataType.valueType = cleanSchema(dataType.valueType)
        return dataType

    # Rebuild the frame from the old frame's RDD with the sanitized schema.
    return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
def json_to_norm_with_null(dir_path, path_to_save):
    """Flatten every ``*.json`` file in *dir_path* and append the result
    to *path_to_save* as JSON, keeping null fields.

    Parameters
    ----------
    dir_path : str
        Directory scanned (non-recursively) for ``.json`` files.
    path_to_save : str
        Output path passed to ``DataFrame.write``; output is appended.
    """
    for filename in os.listdir(dir_path):
        if not filename.endswith(".json"):
            continue
        fullname = os.path.join(dir_path, filename)
        # Bug fix: the original also json.load()-ed each file into an
        # unused variable, reading every file twice for nothing.
        df = spark.read.json(fullname)
        df = cleanDataFrame(df)
        df = flatten_test(df, sep=":")
        # ignoreNullFields=false keeps explicit nulls in the output JSON.
        df.write.mode("append").option("ignoreNullFields", "false").json(path_to_save)
def cleanDataFrame is taken from a post at stackoverflow
def flatten_test the first 2 parts for array and struct are taken from gnist github
The last part with map are inspired from https://mungingdata.com/pyspark/dict-map-to-multiple-columns/
The examples in the flatten_test docstring are taken from the pandas json_normalize documentation.
There is one problem with flatten_test: dataframes must be loaded and flattened one at a time if their schemas differ.