Apache Sedona / SEDONA-236

Flaky Python tests in tests.serialization.test_[de]serializers


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.1
    • Fix Version/s: 1.4.0

    Description

      We found two flaky tests when running CI on a self-hosted AWS EC2 instance; both are related to constructing Spark DataFrames from GeoPandas GeoDataFrames. The failed tests and error messages are as follows (a minimal standalone reproduction follows the traces).

      TestGeometryConvert.test_from_geopandas_convert

      self = <tests.serialization.test_deserializers.TestGeometryConvert object at 0x7f77a5a84af0>
      
          def test_from_geopandas_convert(self):
              gdf = gpd.read_file(os.path.join(tests_resource, "shapefiles/gis_osm_pois_free_1/"))
          
      >       self.spark.createDataFrame(
                  gdf
              ).show()
      
      tests/serialization/test_deserializers.py:116: 
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:891: in createDataFrame
          return super(SparkSession, self).createDataFrame(  # type: ignore[call-overload]
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/pandas/conversion.py:437: in createDataFrame
          return self._create_dataframe(converted_data, schema, samplingRatio, verifySchema)
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:936: in _create_dataframe
          rdd, struct = self._createFromLocal(map(prepare, data), schema)
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:631: in _createFromLocal
          struct = self._inferSchemaFromList(data, names=schema)
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:517: in _inferSchemaFromList
          schema = reduce(
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/types.py:1383: in _merge_type
          fields = [
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/types.py:1385: in <listcomp>
          f.name, _merge_type(f.dataType, nfs.get(f.name, NullType()), name=new_name(f.name))
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      
      a = StringType(), b = DoubleType(), name = 'field name'
      
          def _merge_type(
              a: Union[StructType, ArrayType, MapType, DataType],
              b: Union[StructType, ArrayType, MapType, DataType],
              name: Optional[str] = None,
          ) -> Union[StructType, ArrayType, MapType, DataType]:
              if name is None:
          
                  def new_msg(msg: str) -> str:
                      return msg
          
                  def new_name(n: str) -> str:
                      return "field %s" % n
          
              else:
          
                  def new_msg(msg: str) -> str:
                      return "%s: %s" % (name, msg)
          
                  def new_name(n: str) -> str:
                      return "field %s in %s" % (n, name)
          
              if isinstance(a, NullType):
                  return b
              elif isinstance(b, NullType):
                  return a
              elif isinstance(a, TimestampType) and isinstance(b, TimestampNTZType):
                  return a
              elif isinstance(a, TimestampNTZType) and isinstance(b, TimestampType):
                  return b
              elif type(a) is not type(b):
                  # TODO: type cast (such as int -> long)
      >           raise TypeError(new_msg("Can not merge type %s and %s" % (type(a), type(b))))
      E           TypeError: field name: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
      
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/types.py:1378: TypeError
      

      TestsSerializers.test_geopandas_convertion

      self = <tests.serialization.test_serializers.TestsSerializers object at 0x7f77a5a87310>
      
          def test_geopandas_convertion(self):
              gdf = gpd.read_file(os.path.join(tests_resource, "shapefiles/gis_osm_pois_free_1/"))
      >       print(self.spark.createDataFrame(
                  gdf
              ).toPandas())
      
      tests/serialization/test_serializers.py:148: 
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:891: in createDataFrame
          return super(SparkSession, self).createDataFrame(  # type: ignore[call-overload]
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/pandas/conversion.py:437: in createDataFrame
          return self._create_dataframe(converted_data, schema, samplingRatio, verifySchema)
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:936: in _create_dataframe
          rdd, struct = self._createFromLocal(map(prepare, data), schema)
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:631: in _createFromLocal
          struct = self._inferSchemaFromList(data, names=schema)
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:517: in _inferSchemaFromList
          schema = reduce(
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/types.py:1383: in _merge_type
          fields = [
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/types.py:1385: in <listcomp>
          f.name, _merge_type(f.dataType, nfs.get(f.name, NullType()), name=new_name(f.name))
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      
      a = StringType(), b = DoubleType(), name = 'field name'
      
          def _merge_type(
              a: Union[StructType, ArrayType, MapType, DataType],
              b: Union[StructType, ArrayType, MapType, DataType],
              name: Optional[str] = None,
          ) -> Union[StructType, ArrayType, MapType, DataType]:
              if name is None:
          
                  def new_msg(msg: str) -> str:
                      return msg
          
                  def new_name(n: str) -> str:
                      return "field %s" % n
          
              else:
          
                  def new_msg(msg: str) -> str:
                      return "%s: %s" % (name, msg)
          
                  def new_name(n: str) -> str:
                      return "field %s in %s" % (n, name)
          
              if isinstance(a, NullType):
                  return b
              elif isinstance(b, NullType):
                  return a
              elif isinstance(a, TimestampType) and isinstance(b, TimestampNTZType):
                  return a
              elif isinstance(a, TimestampNTZType) and isinstance(b, TimestampType):
                  return b
              elif type(a) is not type(b):
                  # TODO: type cast (such as int -> long)
      >           raise TypeError(new_msg("Can not merge type %s and %s" % (type(a), type(b))))
      E           TypeError: field name: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
      
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/types.py:1378: TypeError
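
      Both traces boil down to the same schema-inference failure, which can be reproduced without Sedona or the test shapefile. The following is a minimal sketch (an illustration, not part of the test suite), assuming plain PySpark 3.3 with Arrow-based conversion disabled (spark.sql.execution.arrow.pyspark.enabled=false, its default); with Arrow enabled the conversion may take a different code path:

          import pandas as pd
          from pyspark.sql import SparkSession

          spark = SparkSession.builder.master("local[1]").getOrCreate()

          # pandas stores a missing value in an object column as NaN, which is a
          # Python float. Schema inference therefore derives StringType from the
          # rows holding strings and DoubleType from the row holding NaN, and the
          # two types cannot be merged.
          pdf = pd.DataFrame({"name": ["cafe", float("nan"), "fountain"]})
          spark.createDataFrame(pdf).show()
          # TypeError: field name: Can not merge type
          # <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>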
      

      We found many missing values in the "name" column of the test GeoDataFrame. pandas stores these as NaN, a float, and it is a known problem that PySpark can fail to convert pandas DataFrames containing missing values, because schema inference derives conflicting types for the affected column (as reproduced above). We will replace the missing values before converting the DataFrame to eliminate this problem.
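
      A sketch of that fix applied to the test setup from the traces (illustrative; the committed patch may differ in detail, e.g. by filling every affected column rather than only "name", and tests_resource and spark come from the test fixtures):

          import os
          import geopandas as gpd

          gdf = gpd.read_file(os.path.join(tests_resource, "shapefiles/gis_osm_pois_free_1/"))
          # Replace missing values with empty strings so that schema inference sees
          # a single type (StringType) for every row of the "name" column.
          gdf["name"] = gdf["name"].fillna("")
          spark.createDataFrame(gdf).show()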


      People

          Assignee: Unassigned
          Reporter: Kristin Cowalcijk (kontinuation)


      Time Tracking

          Original Estimate: Not Specified
          Remaining Estimate: 0h
          Time Spent: 20m