Spark / SPARK-41282 Feature parity: Column API in Spark Connect / SPARK-42016

Type inconsistency of struct and map when accessing the nested column


Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.4.0
    • Fix Version/s: 3.4.0
    • Component/s: Connect
    • Labels: None

    Description

      org.apache.spark.sql.AnalysisException: [INVALID_COLUMN_OR_FIELD_DATA_TYPE] Column or field `d` is of type "STRUCT<k: STRING>" while it's required to be "MAP<STRING, STRING>".
      	at org.apache.spark.sql.errors.QueryCompilationErrors$.invalidColumnOrFieldDataTypeError(QueryCompilationErrors.scala:3179)
      	at org.apache.spark.sql.catalyst.plans.logical.Project$.reconcileColumnType(basicLogicalOperators.scala:163)
      	at org.apache.spark.sql.catalyst.plans.logical.Project$.$anonfun$reorderFields$1(basicLogicalOperators.scala:203)
      	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
      	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
      	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
      	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
      	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
      	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
      	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
      	at org.apache.spark.sql.catalyst.plans.logical.Project$.reorderFields(basicLogicalOperators.scala:173)
      	at org.apache.spark.sql.catalyst.plans.logical.Project$.matchSchema(basicLogicalOperators.scala:103)
      	at org.apache.spark.sql.Dataset.to(Dataset.scala:485)
      	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformLocalRelation(SparkConnectPlanner.scala:635)
      	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:83)
      	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformProject(SparkConnectPlanner.scala:678)
      	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:70)
      	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformLimit(SparkConnectPlanner.scala:758)
      	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:72)
      	at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handlePlan(SparkConnectStreamHandler.scala:58)
      	at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handle(SparkConnectStreamHandler.scala:49)
      	at org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:135)
      	at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:306)
      	at org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
      	at org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352)
      	at org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
      	at org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
      	at org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
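
      A minimal reproduction mirroring the failing test below; this is a sketch, and the
      Spark Connect remote URL is illustrative (it assumes a server running locally):

          from pyspark.sql import SparkSession, Row

          # Spark Connect session; the remote URL is an assumption, adjust it for your server.
          spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

          # The Python dict for `d` is meant to be inferred as MAP<STRING, STRING>, but the
          # server-side local relation reconciliation sees STRUCT<k: STRING> for `d` and
          # raises the INVALID_COLUMN_OR_FIELD_DATA_TYPE error shown above.
          df = spark.createDataFrame([Row(l=[1], r=Row(a=1, b="b"), d={"k": "v"})])
          df.select(df.l[0]).first()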
      
      pyspark/sql/tests/test_column.py:126 (ColumnParityTests.test_field_accessor)
      self = <pyspark.sql.tests.connect.test_parity_column.ColumnParityTests testMethod=test_field_accessor>
      
          def test_field_accessor(self):
              df = self.spark.createDataFrame([Row(l=[1], r=Row(a=1, b="b"), d={"k": "v"})])
      >       self.assertEqual(1, df.select(df.l[0]).first()[0])
      
      ../test_column.py:129: 
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      ../../connect/dataframe.py:340: in first
          return self.head()
      ../../connect/dataframe.py:407: in head
          rs = self.head(1)
      ../../connect/dataframe.py:409: in head
          return self.take(n)
      ../../connect/dataframe.py:414: in take
          return self.limit(num).collect()
      ../../connect/dataframe.py:1247: in collect
          table = self._session.client.to_table(query)
      ../../connect/client.py:415: in to_table
          table, _ = self._execute_and_fetch(req)
      ../../connect/client.py:593: in _execute_and_fetch
          self._handle_error(rpc_error)
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      
      self = <pyspark.sql.connect.client.SparkConnectClient object at 0x7f97b8eff580>
      rpc_error = <_MultiThreadedRendezvous of RPC that terminated with:
      	status = StatusCode.INTERNAL
      	details = "[INVALID_COLUMN_OR_FI...ATA_TYPE] Column or field `d` is of type \"STRUCT<k: STRING>\" while it\'s required to be \"MAP<STRING, STRING>\"."}"
      >
      
          def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn:
              """
              Error handling helper for dealing with GRPC Errors. On the server side, certain
              exceptions are enriched with additional RPC Status information. These are
              unpacked in this function and put into the exception.
          
              To avoid overloading the user with GRPC errors, this message explicitly
              swallows the error context from the call. This GRPC Error is logged however,
              and can be enabled.
          
              Parameters
              ----------
              rpc_error : grpc.RpcError
                 RPC Error containing the details of the exception.
          
              Returns
              -------
              Throws the appropriate internal Python exception.
              """
              logger.exception("GRPC Error received")
              # We have to cast the value here because, a RpcError is a Call as well.
              # https://grpc.github.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.__call__
              status = rpc_status.from_call(cast(grpc.Call, rpc_error))
              if status:
                  for d in status.details:
                      if d.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
                          info = error_details_pb2.ErrorInfo()
                          d.Unpack(info)
                          if info.reason == "org.apache.spark.sql.AnalysisException":
      >                       raise SparkConnectAnalysisException(
                                  info.reason, info.metadata["message"], info.metadata["plan"]
                              ) from None
      E                       pyspark.sql.connect.client.SparkConnectAnalysisException: [INVALID_COLUMN_OR_FIELD_DATA_TYPE] Column or field `d` is of type "STRUCT<k: STRING>" while it's required to be "MAP<STRING, STRING>".
      E                       Plan:
      
      ../../connect/client.py:632: SparkConnectAnalysisException
      
      
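      For comparison, the same construction under a classic (non-Connect) PySpark session
      infers `d` as a map and the accessors work, which is the parity behavior the test
      expects. A sketch, with the local session setup illustrative and the schema shown as
      expected from standard inference:

          from pyspark.sql import SparkSession, Row

          spark = SparkSession.builder.master("local[1]").getOrCreate()
          df = spark.createDataFrame([Row(l=[1], r=Row(a=1, b="b"), d={"k": "v"})])

          # Expected inferred schema: l: array<bigint>, r: struct<a:bigint, b:string>,
          # d: map<string,string> -- i.e. `d` is a MAP, not a STRUCT.
          df.printSchema()

          df.select(df.l[0]).first()[0]    # 1, as asserted in test_field_accessor
          df.select(df.d["k"]).first()[0]  # expected "v" once `d` is typed as a map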


          People

            Assignee: Ruifeng Zheng (podongfeng)
            Reporter: Hyukjin Kwon (gurwls223)
            Votes: 0
            Watchers: 3

            Dates

              Created:
              Updated:
              Resolved: