org.apache.spark.sql.AnalysisException: [INVALID_COLUMN_OR_FIELD_DATA_TYPE] Column or field `d` is of type "STRUCT<k: STRING>" while it's required to be "MAP<STRING, STRING>".
at org.apache.spark.sql.errors.QueryCompilationErrors$.invalidColumnOrFieldDataTypeError(QueryCompilationErrors.scala:3179)
at org.apache.spark.sql.catalyst.plans.logical.Project$.reconcileColumnType(basicLogicalOperators.scala:163)
at org.apache.spark.sql.catalyst.plans.logical.Project$.$anonfun$reorderFields$1(basicLogicalOperators.scala:203)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.catalyst.plans.logical.Project$.reorderFields(basicLogicalOperators.scala:173)
at org.apache.spark.sql.catalyst.plans.logical.Project$.matchSchema(basicLogicalOperators.scala:103)
at org.apache.spark.sql.Dataset.to(Dataset.scala:485)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformLocalRelation(SparkConnectPlanner.scala:635)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:83)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformProject(SparkConnectPlanner.scala:678)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:70)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformLimit(SparkConnectPlanner.scala:758)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:72)
at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handlePlan(SparkConnectStreamHandler.scala:58)
at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handle(SparkConnectStreamHandler.scala:49)
at org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:135)
at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:306)
at org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
at org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352)
at org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
at org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
pyspark/sql/tests/test_column.py:126 (ColumnParityTests.test_field_accessor)
self = <pyspark.sql.tests.connect.test_parity_column.ColumnParityTests testMethod=test_field_accessor>
    def test_field_accessor(self):
        df = self.spark.createDataFrame([Row(l=[1], r=Row(a=1, b="b"), d={"k": "v"})])
>       self.assertEqual(1, df.select(df.l[0]).first()[0])
../test_column.py:129:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../connect/dataframe.py:340: in first
    return self.head()
../../connect/dataframe.py:407: in head
    rs = self.head(1)
../../connect/dataframe.py:409: in head
    return self.take(n)
../../connect/dataframe.py:414: in take
    return self.limit(num).collect()
../../connect/dataframe.py:1247: in collect
    table = self._session.client.to_table(query)
../../connect/client.py:415: in to_table
    table, _ = self._execute_and_fetch(req)
../../connect/client.py:593: in _execute_and_fetch
    self._handle_error(rpc_error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <pyspark.sql.connect.client.SparkConnectClient object at 0x7f97b8eff580>
rpc_error = <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.INTERNAL
details = "[INVALID_COLUMN_OR_FI...ATA_TYPE] Column or field `d` is of type \"STRUCT<k: STRING>\" while it\'s required to be \"MAP<STRING, STRING>\"."}"
>
    def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn:
        """
        Error handling helper for dealing with GRPC Errors. On the server side, certain
        exceptions are enriched with additional RPC Status information. These are
        unpacked in this function and put into the exception.

        To avoid overloading the user with GRPC errors, this message explicitly
        swallows the error context from the call. This GRPC Error is logged however,
        and can be enabled.

        Parameters
        ----------
        rpc_error : grpc.RpcError
            RPC Error containing the details of the exception.

        Returns
        -------
        Throws the appropriate internal Python exception.
        """
        logger.exception("GRPC Error received")
        # We have to cast the value here because, a RpcError is a Call as well.
        # https:
        status = rpc_status.from_call(cast(grpc.Call, rpc_error))
        if status:
            for d in status.details:
                if d.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
                    info = error_details_pb2.ErrorInfo()
                    d.Unpack(info)
                    if info.reason == "org.apache.spark.sql.AnalysisException":
>                       raise SparkConnectAnalysisException(
                            info.reason, info.metadata["message"], info.metadata["plan"]
                        ) from None
E                       pyspark.sql.connect.client.SparkConnectAnalysisException: [INVALID_COLUMN_OR_FIELD_DATA_TYPE] Column or field `d` is of type "STRUCT<k: STRING>" while it's required to be "MAP<STRING, STRING>".
E                       Plan:
../../connect/client.py:632: SparkConnectAnalysisException
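
For context, a minimal sketch of how the failure might be reproduced outside the test suite, assuming a PySpark build where SparkSession.builder.remote() is available and a local Spark Connect server on the default port; the connection string is illustrative only.

    from pyspark.sql import Row, SparkSession

    # Illustrative address; assumes a Spark Connect server is already running locally.
    spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

    # Same data as the failing test: `d` is a Python dict, which the classic API
    # types as MAP<STRING, STRING>. Per the error above, the Connect local relation
    # apparently carries `d` as STRUCT<k: STRING>, so the projection below fails
    # with INVALID_COLUMN_OR_FIELD_DATA_TYPE even though it only touches column `l`.
    df = spark.createDataFrame([Row(l=[1], r=Row(a=1, b="b"), d={"k": "v"})])
    df.select(df.l[0]).first()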