Apache Sedona / SEDONA-236

Flaky Python tests in tests.serialization.test_[de]serializers


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.1
    • Fix Version/s: 1.4.0

    Description

We found two flaky tests when running CI on a self-hosted AWS EC2 instance; both are related to constructing Spark DataFrames from GeoPandas GeoDataFrames. The failed tests and error messages are as follows.

      TestGeometryConvert.test_from_geopandas_convert

      self = <tests.serialization.test_deserializers.TestGeometryConvert object at 0x7f77a5a84af0>
      
          def test_from_geopandas_convert(self):
              gdf = gpd.read_file(os.path.join(tests_resource, "shapefiles/gis_osm_pois_free_1/"))
          
      >       self.spark.createDataFrame(
                  gdf
              ).show()
      
      tests/serialization/test_deserializers.py:116: 
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:891: in createDataFrame
          return super(SparkSession, self).createDataFrame(  # type: ignore[call-overload]
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/pandas/conversion.py:437: in createDataFrame
          return self._create_dataframe(converted_data, schema, samplingRatio, verifySchema)
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:936: in _create_dataframe
          rdd, struct = self._createFromLocal(map(prepare, data), schema)
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:631: in _createFromLocal
          struct = self._inferSchemaFromList(data, names=schema)
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:517: in _inferSchemaFromList
          schema = reduce(
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/types.py:1383: in _merge_type
          fields = [
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/types.py:1385: in <listcomp>
          f.name, _merge_type(f.dataType, nfs.get(f.name, NullType()), name=new_name(f.name))
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      
      a = StringType(), b = DoubleType(), name = 'field name'
      
          def _merge_type(
              a: Union[StructType, ArrayType, MapType, DataType],
              b: Union[StructType, ArrayType, MapType, DataType],
              name: Optional[str] = None,
          ) -> Union[StructType, ArrayType, MapType, DataType]:
              if name is None:
          
                  def new_msg(msg: str) -> str:
                      return msg
          
                  def new_name(n: str) -> str:
                      return "field %s" % n
          
              else:
          
                  def new_msg(msg: str) -> str:
                      return "%s: %s" % (name, msg)
          
                  def new_name(n: str) -> str:
                      return "field %s in %s" % (n, name)
          
              if isinstance(a, NullType):
                  return b
              elif isinstance(b, NullType):
                  return a
              elif isinstance(a, TimestampType) and isinstance(b, TimestampNTZType):
                  return a
              elif isinstance(a, TimestampNTZType) and isinstance(b, TimestampType):
                  return b
              elif type(a) is not type(b):
                  # TODO: type cast (such as int -> long)
      >           raise TypeError(new_msg("Can not merge type %s and %s" % (type(a), type(b))))
      E           TypeError: field name: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>

      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/types.py:1378: TypeError

      TestsSerializers.test_geopandas_convertion

      self = <tests.serialization.test_serializers.TestsSerializers object at 0x7f77a5a87310>
      
          def test_geopandas_convertion(self):
              gdf = gpd.read_file(os.path.join(tests_resource, "shapefiles/gis_osm_pois_free_1/"))
      >       print(self.spark.createDataFrame(
                  gdf
              ).toPandas())
      
      tests/serialization/test_serializers.py:148: 
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:891: in createDataFrame
          return super(SparkSession, self).createDataFrame(  # type: ignore[call-overload]
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/pandas/conversion.py:437: in createDataFrame
          return self._create_dataframe(converted_data, schema, samplingRatio, verifySchema)
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:936: in _create_dataframe
          rdd, struct = self._createFromLocal(map(prepare, data), schema)
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:631: in _createFromLocal
          struct = self._inferSchemaFromList(data, names=schema)
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/session.py:517: in _inferSchemaFromList
          schema = reduce(
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/types.py:1383: in _merge_type
          fields = [
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/types.py:1385: in <listcomp>
          f.name, _merge_type(f.dataType, nfs.get(f.name, NullType()), name=new_name(f.name))
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      
      a = StringType(), b = DoubleType(), name = 'field name'
      
          def _merge_type(
              a: Union[StructType, ArrayType, MapType, DataType],
              b: Union[StructType, ArrayType, MapType, DataType],
              name: Optional[str] = None,
          ) -> Union[StructType, ArrayType, MapType, DataType]:
              if name is None:
          
                  def new_msg(msg: str) -> str:
                      return msg
          
                  def new_name(n: str) -> str:
                      return "field %s" % n
          
              else:
          
                  def new_msg(msg: str) -> str:
                      return "%s: %s" % (name, msg)
          
                  def new_name(n: str) -> str:
                      return "field %s in %s" % (n, name)
          
              if isinstance(a, NullType):
                  return b
              elif isinstance(b, NullType):
                  return a
              elif isinstance(a, TimestampType) and isinstance(b, TimestampNTZType):
                  return a
              elif isinstance(a, TimestampNTZType) and isinstance(b, TimestampType):
                  return b
              elif type(a) is not type(b):
                  # TODO: type cast (such as int -> long)
      >           raise TypeError(new_msg("Can not merge type %s and %s" % (type(a), type(b))))
      E           TypeError: field name: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
      
      ../spark-3.3.0-bin-hadoop3/python/pyspark/sql/types.py:1378: TypeError
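
      For reference, the failure does not involve Sedona's serializers at all; the same TypeError reproduces with plain pandas. A minimal sketch, assuming Spark 3.3 with Arrow conversion disabled (the default):

          from pyspark.sql import SparkSession

          import numpy as np
          import pandas as pd

          spark = SparkSession.builder.master("local[1]").getOrCreate()

          # pandas stores a missing string as NaN, which is a float, so schema
          # inference sees StringType for the first and last rows but DoubleType
          # for the middle one, and _merge_type cannot reconcile them.
          pdf = pd.DataFrame({"name": ["cafe", np.nan, "kiosk"]})
          spark.createDataFrame(pdf).show()  # raises the TypeError above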
      

We found lots of missing values in the "name" column of the test GeoDataFrame. pandas represents a missing string as NaN, which is a float, so PySpark's schema inference derives StringType from rows that have a name and DoubleType from rows that do not, and _merge_type raises the TypeError shown above. It is a known problem that PySpark sometimes fails to convert pandas DataFrames containing missing values. We'll fill in the missing values before converting the DataFrame to eliminate the problem, as sketched below.
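
      A minimal sketch of the fix, assuming the tests_resource path defined in the test modules and an active SparkSession named spark; the actual patch may differ in detail:

          import os

          import geopandas as gpd

          gdf = gpd.read_file(os.path.join(tests_resource, "shapefiles/gis_osm_pois_free_1/"))

          # Filling the missing names with an empty string keeps the column
          # uniformly StringType, so schema inference no longer hits a
          # StringType/DoubleType conflict.
          gdf["name"] = gdf["name"].fillna("")

          spark.createDataFrame(gdf).show()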


          People

            Unassigned Unassigned
            kontinuation Kristin Cowalcijk
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue


              Time Tracking

                Original Estimate: Not Specified
                Remaining Estimate: 0h
                Time Spent: 20m
