Spark / SPARK-18084

write.partitionBy() does not recognize nested columns that select() can access


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Incomplete
    • Affects Version/s: 2.0.0, 2.0.1, 2.4.3
    • Fix Version/s: None
    • Component/s: SQL

    Description

      Here's a simple repro in the PySpark shell:

      from pyspark.sql import Row
      rdd = spark.sparkContext.parallelize([Row(a=Row(b=5))])
      df = spark.createDataFrame(rdd)
      df.printSchema()
      df.select('a.b').show()  # works
      df.write.partitionBy('a.b').text('/tmp/test')  # doesn't work
      

      Here's what I see when I run this:

      >>> from pyspark.sql import Row
      >>> rdd = spark.sparkContext.parallelize([Row(a=Row(b=5))])
      >>> df = spark.createDataFrame(rdd)
      >>> df.printSchema()
      root
       |-- a: struct (nullable = true)
       |    |-- b: long (nullable = true)
      
      >>> df.show()
      +---+
      |  a|
      +---+
      |[5]|
      +---+
      
      >>> df.select('a.b').show()
      +---+
      |  b|
      +---+
      |  5|
      +---+
      
      >>> df.write.partitionBy('a.b').text('/tmp/test')
      Traceback (most recent call last):
        File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/pyspark/sql/utils.py", line 63, in deco
          return f(*a, **kw)
        File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
      py4j.protocol.Py4JJavaError: An error occurred while calling o233.text.
      : org.apache.spark.sql.AnalysisException: Partition column a.b not found in schema StructType(StructField(a,StructType(StructField(b,LongType,true)),true));
      	at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1$$anonfun$apply$10.apply(PartitioningUtils.scala:368)
      	at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1$$anonfun$apply$10.apply(PartitioningUtils.scala:368)
      	at scala.Option.getOrElse(Option.scala:121)
      	at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1.apply(PartitioningUtils.scala:367)
      	at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1.apply(PartitioningUtils.scala:366)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
      	at org.apache.spark.sql.execution.datasources.PartitioningUtils$.partitionColumnsSchema(PartitioningUtils.scala:366)
      	at org.apache.spark.sql.execution.datasources.PartitioningUtils$.validatePartitionColumn(PartitioningUtils.scala:349)
      	at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:458)
      	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
      	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
      	at org.apache.spark.sql.DataFrameWriter.text(DataFrameWriter.scala:534)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
      	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      	at py4j.Gateway.invoke(Gateway.java:280)
      	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      	at py4j.commands.CallCommand.execute(CallCommand.java:79)
      	at py4j.GatewayConnection.run(GatewayConnection.java:214)
      	at java.lang.Thread.run(Thread.java:745)
      
      
      During handling of the above exception, another exception occurred:
      
      Traceback (most recent call last):
        File "<stdin>", line 1, in <module>
        File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/pyspark/sql/readwriter.py", line 656, in text
          self._jwrite.text(path)
        File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
        File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/pyspark/sql/utils.py", line 69, in deco
          raise AnalysisException(s.split(': ', 1)[1], stackTrace)
      pyspark.sql.utils.AnalysisException: 'Partition column a.b not found in schema StructType(StructField(a,StructType(StructField(b,LongType,true)),true));'
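The error message and the `partitionColumnsSchema` frames in the trace are consistent with the partition column being matched as a whole string against the top-level field names of the schema, whereas `select()` resolves the dotted path through the struct. The following is a toy model of that difference in plain Python, not Spark's actual code; the dictionary schema and both function names are made up for illustration.

```python
# Toy schema mirroring the repro: a top-level struct `a` with one nested field `b`.
# A nested dict stands in for Spark's StructType (an assumption of this sketch).
SCHEMA = {"a": {"b": "long"}}


def resolve_like_select(schema, path):
    """Walk a dotted path through nested structs, one segment at a time."""
    node = schema
    for part in path.split("."):
        if not isinstance(node, dict) or part not in node:
            raise KeyError(f"cannot resolve {path!r}")
        node = node[part]
    return node


def resolve_like_partition_by(schema, name):
    """Compare the whole string against top-level field names only."""
    if name not in schema:
        raise KeyError(f"Partition column {name} not found in schema")
    return schema[name]
```

Under this model, `resolve_like_select(SCHEMA, "a.b")` finds the nested field, while `resolve_like_partition_by(SCHEMA, "a.b")` raises because no top-level field is literally named `a.b`.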
      

      I don't understand why there is an AnalysisException when referring to 'a.b' in the write.partitionBy() operation, but not when we do a select().

      Is this expected behavior somehow?
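A possible workaround, sketched here under assumptions rather than taken from the issue, is to copy the nested field into a top-level column before writing and partition by that column instead. The helper names `flat_name` and `write_partitioned_by_nested` are hypothetical, and Parquet is chosen as the output format because the text source is limited to a single string column.

```python
def flat_name(path):
    """Turn a dotted nested path such as 'a.b' into a top-level name 'a_b'."""
    return path.replace(".", "_")


def write_partitioned_by_nested(df, nested_path, out_path):
    """Write `df` as Parquet, partitioned by the value of a nested field.

    Hypothetical helper: `df` is a PySpark DataFrame, `nested_path` is a
    dotted path such as 'a.b', and `out_path` is the output directory.
    """
    # Imported lazily so the helper can be defined without Spark installed.
    from pyspark.sql import functions as F

    part_col = flat_name(nested_path)
    # Copy the nested field into a top-level column that partitionBy() can find.
    flat_df = df.withColumn(part_col, F.col(nested_path))
    # The struct column is kept as-is; only the new column drives partitioning.
    flat_df.write.partitionBy(part_col).parquet(out_path)
```

With the DataFrame from the repro, this would be called as `write_partitioned_by_nested(df, 'a.b', '/tmp/test')`, which partitions the output by a new top-level column `a_b`.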

            People

              Assignee: Unassigned
              Reporter: Nicholas Chammas (nchammas)
              Votes: 1
              Watchers: 13
