Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Not A Problem
Affects Version/s: 3.0.1
Description
I tried to create a Python UDF that sorts a `MapType` column by its keys, but the final result does not appear to be sorted as expected.
Initialization
# assumes an active SparkSession named `spark` (e.g. the pyspark shell)
from pyspark.sql.types import MapType, StructType, StructField, StringType, DoubleType, IntegerType
from pyspark.sql import Row

schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Map_Value", MapType(IntegerType(), DoubleType(), False), False)])

# Row field names match the schema column names
test_data = [
    Row(Name='sample_1', Map_Value={10: 0.20, 5: 0.30, 17: 0.25}),
    Row(Name='sample_2', Map_Value={11: 0.40, 6: 0.67, 24: 0.45})
]
base = spark.createDataFrame(test_data, schema)

# define a Python UDF for sorting map-type columns
def sort_maptype_function(col, reverse=False):
    sorted_col = dict(sorted(col.items(), key=lambda item: item[0], reverse=reverse))
    print(sorted_col)
    return sorted_col

sort_map = spark.udf.register(
    "SORT_MAPTYPE_FUNCTION", sort_maptype_function, MapType(IntegerType(), DoubleType()))

# apply the UDF to the Map_Value column (the column referenced in the query plan)
result = base.selectExpr("Name", "Map_Value", "SORT_MAPTYPE_FUNCTION(Map_Value) AS Sorted_Map_Value")
Display Result
>>> result.show(truncate=100)
{5: 0.3, 10: 0.2, 17: 0.25}
{6: 0.67, 11: 0.4, 24: 0.45}
+--------+----------------------------------+----------------------------------+
|    Name|                         Map_Value|                  Sorted_Map_Value|
+--------+----------------------------------+----------------------------------+
|sample_1| [17 -> 0.25, 10 -> 0.2, 5 -> 0.3]| [17 -> 0.25, 10 -> 0.2, 5 -> 0.3]|
|sample_2|[24 -> 0.45, 11 -> 0.4, 6 -> 0.67]|[24 -> 0.45, 11 -> 0.4, 6 -> 0.67]|
+--------+----------------------------------+----------------------------------+
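The dictionaries printed before the table are already in key order, which suggests the Python function itself behaves as intended and the order changes only after the value is handed back to Spark. A minimal check of the same function in plain Python, without Spark, assuming Python 3.7+ (where `dict` preserves insertion order):

```python
def sort_maptype_function(col, reverse=False):
    """Return a new dict whose items are inserted in key order."""
    return dict(sorted(col.items(), key=lambda item: item[0], reverse=reverse))


# Same sample values as the first row of the test data.
sample = {10: 0.20, 5: 0.30, 17: 0.25}

ascending = sort_maptype_function(sample)
descending = sort_maptype_function(sample, reverse=True)

# Listing a dict yields its keys in iteration (insertion) order.
print(list(ascending))
print(list(descending))
```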
Extended Explanation
== Parsed Logical Plan ==
'Project [unresolvedalias('Name, None), unresolvedalias('Map_Value, None), 'SORT_MAPTYPE_FUNCTION('Map_Value) AS Sorted_Map_Value#192]
+- LogicalRDD [Name#169, Map_Value#170], false

== Analyzed Logical Plan ==
Name: string, Map_Value: map<int,double>, Sorted_Map_Value: map<int,double>
Project [Name#169, Map_Value#170, SORT_MAPTYPE_FUNCTION(Map_Value#170) AS Sorted_Map_Value#192]
+- LogicalRDD [Name#169, Map_Value#170], false

== Optimized Logical Plan ==
Project [Name#169, Map_Value#170, pythonUDF0#225 AS Sorted_Map_Value#192]
+- BatchEvalPython [SORT_MAPTYPE_FUNCTION(Map_Value#170)], [pythonUDF0#225]
   +- LogicalRDD [Name#169, Map_Value#170], false

== Physical Plan ==
*(2) Project [Name#169, Map_Value#170, pythonUDF0#225 AS Sorted_Map_Value#192]
+- BatchEvalPython [SORT_MAPTYPE_FUNCTION(Map_Value#170)], [pythonUDF0#225]
   +- *(1) Scan ExistingRDD[Name#169,Map_Value#170]
Why is the output of the UDF that sorts a map-type column by key still in the same order as the original column in PySpark?
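This report does not state the cause, but a likely reading is that a map column carries no guarantee about key order, so the order produced in Python need not survive the conversion back into a Spark map. If an ordered result is what is needed, one possible workaround is to return the entries as an array of key/value structs, since arrays keep element order. The sketch below is an assumption, not a fix confirmed in this ticket; the names `sort_map_entries`, `register_sort_map_entries`, and `SORT_MAP_ENTRIES` are hypothetical.

```python
def sort_map_entries(col, reverse=False):
    """Return the map's (key, value) pairs as a list ordered by key."""
    if col is None:
        return None
    return sorted(col.items(), key=lambda item: item[0], reverse=reverse)


def register_sort_map_entries(spark):
    """Register the function as a SQL UDF returning array<struct<key,value>>.

    `spark` is assumed to be an active SparkSession.
    """
    # Imported lazily so the pure-Python function above works without Spark.
    from pyspark.sql.types import (
        ArrayType, DoubleType, IntegerType, StructField, StructType)

    # Each (key, value) tuple is converted to one struct in the array.
    entry_type = StructType([
        StructField("key", IntegerType(), False),
        StructField("value", DoubleType(), False),
    ])
    return spark.udf.register(
        "SORT_MAP_ENTRIES", sort_map_entries, ArrayType(entry_type))
```

With a session available, this could be used as `register_sort_map_entries(spark)` followed by `base.selectExpr("Name", "SORT_MAP_ENTRIES(Map_Value) AS Sorted_Entries")`. The trade-off is that the result is an array of structs rather than a map, so key lookups on it are no longer direct.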