Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
1.3.1
Description
We found 2 flaky tests when running CI on a self-hosted AWS EC2 instance, both are related to constructing Spark DataFrames from GeoPandas DataFrames. The failed tests and error messages are as follows.
TestGeometryConvert.test_from_geopandas_convert
self = <tests.serialization.test_deserializers.TestGeometryConvert object at 0x7f77a5a84af0> def test_from_geopandas_convert(self): gdf = gpd.read_file(os.path.join(tests_resource, "shapefiles/gis_osm_pois_free_1/")) > self.spark.createDataFrame( gdf ).show() tests/serialization/test_deserializers.py:116: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:891: in createDataFrame return super(SparkSession, self).createDataFrame( # type: ignore[call-overload] ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/pandas/conversion.py:437: in createDataFrame return self._create_dataframe(converted_data, schema, samplingRatio, verifySchema) ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:936: in _create_dataframe rdd, struct = self._createFromLocal(map(prepare, data), schema) ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:631: in _createFromLocal struct = self._inferSchemaFromList(data, names=schema) ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:517: in _inferSchemaFromList schema = reduce( ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/types.py:1383: in _merge_type fields = [ ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/types.py:1385: in <listcomp> f.name, _merge_type(f.dataType, nfs.get(f.name, NullType()), name=new_name(f.name)) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ a = StringType(), b = DoubleType(), name = 'field name' def _merge_type( a: Union[StructType, ArrayType, MapType, DataType], b: Union[StructType, ArrayType, MapType, DataType], name: Optional[str] = None, ) -> Union[StructType, ArrayType, MapType, DataType]: if name is None: def new_msg(msg: str) -> str: return msg def new_name(n: str) -> str: return "field %s" % n else: def new_msg(msg: str) -> str: return "%s: %s" % (name, msg) def new_name(n: str) -> str: return "field %s in %s" % (n, name) if isinstance(a, NullType): return b elif isinstance(b, NullType): return a elif isinstance(a, TimestampType) and isinstance(b, TimestampNTZType): return a elif isinstance(a, TimestampNTZType) and isinstance(b, TimestampType): return b elif type(a) is not type(b): # TODO: type cast (such as int -> long) > raise TypeError(new_msg("Can not merge type %s and %s" % (type(a), type(b)))) E TypeError: field name: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
TestsSerializers.test_geopandas_convertion
self = <tests.serialization.test_serializers.TestsSerializers object at 0x7f77a5a87310> def test_geopandas_convertion(self): gdf = gpd.read_file(os.path.join(tests_resource, "shapefiles/gis_osm_pois_free_1/")) > print(self.spark.createDataFrame( gdf ).toPandas()) tests/serialization/test_serializers.py:148: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:891: in createDataFrame return super(SparkSession, self).createDataFrame( # type: ignore[call-overload] ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/pandas/conversion.py:437: in createDataFrame return self._create_dataframe(converted_data, schema, samplingRatio, verifySchema) ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:936: in _create_dataframe rdd, struct = self._createFromLocal(map(prepare, data), schema) ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:631: in _createFromLocal struct = self._inferSchemaFromList(data, names=schema) ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:517: in _inferSchemaFromList schema = reduce( ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/types.py:1383: in _merge_type fields = [ ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/types.py:1385: in <listcomp> f.name, _merge_type(f.dataType, nfs.get(f.name, NullType()), name=new_name(f.name)) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ a = StringType(), b = DoubleType(), name = 'field name' def _merge_type( a: Union[StructType, ArrayType, MapType, DataType], b: Union[StructType, ArrayType, MapType, DataType], name: Optional[str] = None, ) -> Union[StructType, ArrayType, MapType, DataType]: if name is None: def new_msg(msg: str) -> str: return msg def new_name(n: str) -> str: return "field %s" % n else: def new_msg(msg: str) -> str: return "%s: %s" % (name, msg) def new_name(n: str) -> str: return "field %s in %s" % (n, name) if isinstance(a, NullType): return b elif isinstance(b, NullType): return a elif isinstance(a, TimestampType) and isinstance(b, TimestampNTZType): return a elif isinstance(a, TimestampNTZType) and isinstance(b, TimestampType): return b elif type(a) is not type(b): # TODO: type cast (such as int -> long) > raise TypeError(new_msg("Can not merge type %s and %s" % (type(a), type(b)))) E TypeError: field name: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'> ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/types.py:1378: TypeError
We found lots of missing values in the "name" column in the test Pandas DataFrame, and it is a known problem that PySpark sometimes fails to convert Pandas dataframes containing missing values. We'll replace the missing values before converting the DataFrame to eliminate this problem.
Attachments
Issue Links
- links to