java.util.NoSuchElementException
at java.util.AbstractList$Itr.next(AbstractList.java:364)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
at scala.collection.IterableLike.head(IterableLike.scala:109)
at scala.collection.IterableLike.head$(IterableLike.scala:108)
at scala.collection.AbstractIterable.head(Iterable.scala:56)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteOperation(SparkConnectPlanner.scala:1411)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:1297)
at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handleCommand(SparkConnectStreamHandler.scala:182)
at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handle(SparkConnectStreamHandler.scala:48)
at org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:135)
at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:306)
at org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
at org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352)
at org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
at org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
23/01/12 11:27:45 ERROR SerializingExecutor: Exception while executing runnable org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@6c9d5784
java.lang.NullPointerException
at org.sparkproject.connect.google_protos.rpc.Status$Builder.setMessage(Status.java:783)
at org.apache.spark.sql.connect.service.SparkConnectService$$anonfun$handleError$1.applyOrElse(SparkConnectService.scala:112)
at org.apache.spark.sql.connect.service.SparkConnectService$$anonfun$handleError$1.applyOrElse(SparkConnectService.scala:85)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:136)
at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:306)
at org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
at org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352)
at org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
at org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
pyspark/sql/tests/test_readwriter.py:102 (ReadwriterParityTests.test_bucketed_write)
self = <pyspark.sql.tests.connect.test_parity_readwriter.ReadwriterParityTests testMethod=test_bucketed_write>
def test_bucketed_write(self):
data = [
(1, "foo", 3.0),
(2, "foo", 5.0),
(3, "bar", -1.0),
(4, "bar", 6.0),
]
df = self.spark.createDataFrame(data, ["x", "y", "z"])
def count_bucketed_cols(names, table="pyspark_bucket"):
"""Given a sequence of column names and a table name
query the catalog and return number of columns which are
used for bucketing
"""
cols = self.spark.catalog.listColumns(table)
num = len([c for c in cols if c.name in names and c.isBucket])
return num
with self.table("pyspark_bucket"):
# Test write with one bucketing column
> df.write.bucketBy(3, "x").mode("overwrite").saveAsTable("pyspark_bucket")
../test_readwriter.py:123:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../connect/readwriter.py:381: in saveAsTable
self._spark.client.execute_command(self._write.command(self._spark.client))
../../connect/client.py:478: in execute_command
self._execute(req)
../../connect/client.py:562: in _execute
self._handle_error(rpc_error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <pyspark.sql.connect.client.SparkConnectClient object at 0x7fe0d069b5b0>
rpc_error = <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.UNKNOWN
details = ""
debug_error_string ...ved from peer ipv6:%5B::1%5D:15002 {created_time:"2023-01-12T11:27:45.816172+09:00", grpc_status:2, grpc_message:""}"
>
def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn:
"""
Error handling helper for dealing with GRPC Errors. On the server side, certain
exceptions are enriched with additional RPC Status information. These are
unpacked in this function and put into the exception.
To avoid overloading the user with GRPC errors, this message explicitly
swallows the error context from the call. This GRPC Error is logged however,
and can be enabled.
Parameters
----------
rpc_error : grpc.RpcError
RPC Error containing the details of the exception.
Returns
-------
Throws the appropriate internal Python exception.
"""
logger.exception("GRPC Error received")
# We have to cast the value here because, a RpcError is a Call as well.
# https: [reference URL truncated in this paste]
status = rpc_status.from_call(cast(grpc.Call, rpc_error))
if status:
for d in status.details:
if d.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
info = error_details_pb2.ErrorInfo()
d.Unpack(info)
if info.reason == "org.apache.spark.sql.AnalysisException":
raise SparkConnectAnalysisException(
info.reason, info.metadata["message"], info.metadata["plan"]
) from None
else:
raise SparkConnectException(status.message, info.reason) from None
raise SparkConnectException(status.message) from None
else:
> raise SparkConnectException(str(rpc_error)) from None
E pyspark.sql.connect.client.SparkConnectException: <_MultiThreadedRendezvous of RPC that terminated with:
E status = StatusCode.UNKNOWN
E details = ""
E debug_error_string = "UNKNOWN:Error received from peer ipv6:%5B::1%5D:15002 {created_time:"2023-01-12T11:27:45.816172+09:00", grpc_status:2, grpc_message:""}"
E >
../../connect/client.py:640: SparkConnectException