[SPARK-41999] NPE for bucketed write (ReadwriterTests.test_bucketed_write)

Parent: SPARK-41284 Feature parity: I/O in Spark Connect


Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.4.0
    • Fix Version/s: 3.4.0
    • Component/s: Connect
    • Labels: None

    Description

      java.util.NoSuchElementException
      	at java.util.AbstractList$Itr.next(AbstractList.java:364)
      	at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
      	at scala.collection.IterableLike.head(IterableLike.scala:109)
      	at scala.collection.IterableLike.head$(IterableLike.scala:108)
      	at scala.collection.AbstractIterable.head(Iterable.scala:56)
      	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteOperation(SparkConnectPlanner.scala:1411)
      	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:1297)
      	at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handleCommand(SparkConnectStreamHandler.scala:182)
      	at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handle(SparkConnectStreamHandler.scala:48)
      	at org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:135)
      	at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:306)
      	at org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
      	at org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352)
      	at org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
      	at org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
      	at org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
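
      The NoSuchElementException above is .head called on an empty Scala-wrapped
      Java list: the JIteratorWrapper / IterableLike.head frames are exactly what
      that produces, presumably because the planner reads the first element of a
      column-name list that this test leaves empty (bucketBy with no sortBy).
      A minimal sketch of that failure mode, with hypothetical names rather than
      the planner's actual code:

      import scala.collection.JavaConverters._

      object HeadOnEmptyWrappedList {
        def main(args: Array[String]): Unit = {
          // Empty list, e.g. no sort columns were set on the write command.
          val sortColumnNames = new java.util.ArrayList[String]()
          // IterableLike.head delegates to iterator.next(), which throws
          // java.util.NoSuchElementException via JIteratorWrapper.next,
          // matching the top frames of the trace above.
          sortColumnNames.asScala.head
        }
      }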
      23/01/12 11:27:45 ERROR SerializingExecutor: Exception while executing runnable org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@6c9d5784
      java.lang.NullPointerException
      	at org.sparkproject.connect.google_protos.rpc.Status$Builder.setMessage(Status.java:783)
      	at org.apache.spark.sql.connect.service.SparkConnectService$$anonfun$handleError$1.applyOrElse(SparkConnectService.scala:112)
      	at org.apache.spark.sql.connect.service.SparkConnectService$$anonfun$handleError$1.applyOrElse(SparkConnectService.scala:85)
      	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
      	at org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:136)
      	at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:306)
      	at org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
      	at org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352)
      	at org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
      	at org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
      	at org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
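
      The NullPointerException that follows is the error handler itself failing:
      generated protobuf setters reject null, and NoSuchElementException is
      normally constructed without a message, so forwarding e.getMessage into
      Status.Builder.setMessage throws. A sketch against the unshaded
      com.google.rpc.Status (the trace shows Spark's shaded copy,
      org.sparkproject.connect.google_protos.rpc.Status):

      import com.google.rpc.Status

      object NullMessageIntoProto {
        def main(args: Array[String]): Unit = {
          val cause = new java.util.NoSuchElementException() // getMessage == null
          // Generated protobuf setters throw NullPointerException on null input,
          // mirroring the second stack trace.
          Status.newBuilder().setMessage(cause.getMessage)
        }
      }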
      
      pyspark/sql/tests/test_readwriter.py:102 (ReadwriterParityTests.test_bucketed_write)
      self = <pyspark.sql.tests.connect.test_parity_readwriter.ReadwriterParityTests testMethod=test_bucketed_write>
      
          def test_bucketed_write(self):
              data = [
                  (1, "foo", 3.0),
                  (2, "foo", 5.0),
                  (3, "bar", -1.0),
                  (4, "bar", 6.0),
              ]
              df = self.spark.createDataFrame(data, ["x", "y", "z"])
          
              def count_bucketed_cols(names, table="pyspark_bucket"):
                  """Given a sequence of column names and a table name
                  query the catalog and return the number of columns which are
                  used for bucketing
                  """
                  cols = self.spark.catalog.listColumns(table)
                  num = len([c for c in cols if c.name in names and c.isBucket])
                  return num
          
              with self.table("pyspark_bucket"):
                  # Test write with one bucketing column
      >           df.write.bucketBy(3, "x").mode("overwrite").saveAsTable("pyspark_bucket")
      
      ../test_readwriter.py:123: 
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      ../../connect/readwriter.py:381: in saveAsTable
          self._spark.client.execute_command(self._write.command(self._spark.client))
      ../../connect/client.py:478: in execute_command
          self._execute(req)
      ../../connect/client.py:562: in _execute
          self._handle_error(rpc_error)
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      
      self = <pyspark.sql.connect.client.SparkConnectClient object at 0x7fe0d069b5b0>
      rpc_error = <_MultiThreadedRendezvous of RPC that terminated with:
      	status = StatusCode.UNKNOWN
      	details = ""
      	debug_error_string ...ved from peer ipv6:%5B::1%5D:15002 {created_time:"2023-01-12T11:27:45.816172+09:00", grpc_status:2, grpc_message:""}"
      >
      
          def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn:
              """
              Error handling helper for dealing with GRPC Errors. On the server side, certain
              exceptions are enriched with additional RPC Status information. These are
              unpacked in this function and put into the exception.
          
              To avoid overloading the user with GRPC errors, this method explicitly
              swallows the error context from the call. The GRPC error is still
              logged, however, and that logging can be enabled.
          
              Parameters
              ----------
              rpc_error : grpc.RpcError
                 RPC Error containing the details of the exception.
          
              Returns
              -------
              Throws the appropriate internal Python exception.
              """
              logger.exception("GRPC Error received")
              # We have to cast the value here because an RpcError is a Call as well.
              # https://grpc.github.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.__call__
              status = rpc_status.from_call(cast(grpc.Call, rpc_error))
              if status:
                  for d in status.details:
                      if d.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
                          info = error_details_pb2.ErrorInfo()
                          d.Unpack(info)
                          if info.reason == "org.apache.spark.sql.AnalysisException":
                              raise SparkConnectAnalysisException(
                                  info.reason, info.metadata["message"], info.metadata["plan"]
                              ) from None
                          else:
                              raise SparkConnectException(status.message, info.reason) from None
          
                  raise SparkConnectException(status.message) from None
              else:
      >           raise SparkConnectException(str(rpc_error)) from None
      E           pyspark.sql.connect.client.SparkConnectException: <_MultiThreadedRendezvous of RPC that terminated with:
      E           	status = StatusCode.UNKNOWN
      E           	details = ""
      E           	debug_error_string = "UNKNOWN:Error received from peer ipv6:%5B::1%5D:15002 {created_time:"2023-01-12T11:27:45.816172+09:00", grpc_status:2, grpc_message:""}"
      E           >
      
      ../../connect/client.py:640: SparkConnectException
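
      A null-safe lookup on the server side would avoid both traces: headOption
      returns None instead of throwing on an empty list, so the sort step can
      simply be skipped when the client set no sort columns. A minimal sketch of
      that pattern, with hypothetical names (not the actual patch):

      import scala.collection.JavaConverters._

      object SafeFirstSortColumn {
        // None instead of NoSuchElementException when the list is empty.
        def firstSortColumn(sortColumnNames: java.util.List[String]): Option[String] =
          sortColumnNames.asScala.headOption

        def main(args: Array[String]): Unit = {
          assert(firstSortColumn(new java.util.ArrayList[String]()).isEmpty)
        }
      }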
      
      


People

    • Assignee: Takuya Ueshin (ueshin)
    • Reporter: Hyukjin Kwon (gurwls223)
    • Votes: 0
    • Watchers: 2
