Description
toPandas supports duplicate top-level column names, but for a struct column it does not support duplicate field names: with 'spark.sql.execution.arrow.pyspark.enabled' set to true the conversion fails, while the non-Arrow path handles it.
{code:python}
In [27]: spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", False)

In [28]: spark.sql("select 1 v, 1 v").toPandas()
Out[28]:
   v  v
0  1  1

In [29]: spark.sql("select struct(1 v, 1 v)").toPandas()
Out[29]:
  struct(1 AS v, 1 AS v)
0                 (1, 1)

In [30]: spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

In [31]: spark.sql("select 1 v, 1 v").toPandas()
Out[31]:
   v  v
0  1  1

In [32]: spark.sql("select struct(1 v, 1 v)").toPandas()
/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/pandas/conversion.py:204: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.pyspark.fallback.enabled' does not have an effect on failures in the middle of computation.
  Ran out of field metadata, likely malformed
  warn(msg)
---------------------------------------------------------------------------
ArrowInvalid                              Traceback (most recent call last)
Cell In[32], line 1
----> 1 spark.sql("select struct(1 v, 1 v)").toPandas()

File ~/Dev/spark/python/pyspark/sql/pandas/conversion.py:143, in PandasConversionMixin.toPandas(self)
    141 tmp_column_names = ["col_{}".format(i) for i in range(len(self.columns))]
    142 self_destruct = jconf.arrowPySparkSelfDestructEnabled()
--> 143 batches = self.toDF(*tmp_column_names)._collect_as_arrow(
    144     split_batches=self_destruct
    145 )
    146 if len(batches) > 0:
    147     table = pyarrow.Table.from_batches(batches)

File ~/Dev/spark/python/pyspark/sql/pandas/conversion.py:358, in PandasConversionMixin._collect_as_arrow(self, split_batches)
    356     results.append(batch_or_indices)
    357 else:
--> 358     results = list(batch_stream)
    359 finally:
    360     # Join serving thread and raise any exceptions from collectAsArrowToPython
    361     jsocket_auth_server.getResult()

File ~/Dev/spark/python/pyspark/sql/pandas/serializers.py:55, in ArrowCollectSerializer.load_stream(self, stream)
     50 """
     51 Load a stream of un-ordered Arrow RecordBatches, where the last iteration yields
     52 a list of indices that can be used to put the RecordBatches in the correct order.
     53 """
     54 # load the batches
---> 55 for batch in self.serializer.load_stream(stream):
     56     yield batch
     58 # load the batch order indices or propagate any error that occurred in the JVM

File ~/Dev/spark/python/pyspark/sql/pandas/serializers.py:98, in ArrowStreamSerializer.load_stream(self, stream)
     95 import pyarrow as pa
     97 reader = pa.ipc.open_stream(stream)
---> 98 for batch in reader:
     99     yield batch

File ~/.dev/miniconda3/envs/spark_dev/lib/python3.9/site-packages/pyarrow/ipc.pxi:638, in __iter__()

File ~/.dev/miniconda3/envs/spark_dev/lib/python3.9/site-packages/pyarrow/ipc.pxi:674, in pyarrow.lib.RecordBatchReader.read_next_batch()

File ~/.dev/miniconda3/envs/spark_dev/lib/python3.9/site-packages/pyarrow/error.pxi:100, in pyarrow.lib.check_status()

ArrowInvalid: Ran out of field metadata, likely malformed
{code}
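Until this is fixed, the repro above already suggests two workarounds (a sketch only, not part of any fix): fall back to the non-Arrow conversion, which tolerates duplicate struct field names (see Out[29]), or give the struct fields unique names up front. The aliases v1/v2 below are illustrative choices, not names from the original query.

{code:python}
# Option 1: disable the Arrow path for this conversion; the non-Arrow
# path handles duplicate field names inside a struct (matches In [27]-[29]).
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", False)
pdf = spark.sql("select struct(1 v, 1 v)").toPandas()

# Option 2: keep Arrow enabled but avoid duplicate field names in the
# struct to begin with ("v1"/"v2" are hypothetical aliases).
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
pdf = spark.sql("select struct(1 v1, 1 v2)").toPandas()
{code}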