SPARK-22532: Spark SQL function 'drop_duplicates' throws error when passing in a column that is an element of a struct


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.1.0, 2.2.0
    • Fix Version/s: None
    • Component/s: SQL

    Description

      When attempting to use drop_duplicates with a subset of columns that exist within a struct, the following error is raised:

      AnalysisException: u'Cannot resolve column name "header.eventId.lo" among (header);'
      

      A complete example (using the old sqlContext syntax so the same code can be run on Spark 1.x as well):

      from pyspark.sql import Row
      from pyspark.sql.functions import *
      
      data = [
          Row(header=Row(eventId=Row(lo=0, hi=1))),
          Row(header=Row(eventId=Row(lo=0, hi=1))),
          Row(header=Row(eventId=Row(lo=1, hi=2))),
          Row(header=Row(eventId=Row(lo=2, hi=3))),
      ]
      
      # `sqlContext` is the SQLContext predefined in the PySpark shell
      df = sqlContext.createDataFrame(data)
      
      # fails: the subset refers to fields nested inside the `header` struct
      df.drop_duplicates(['header.eventId.lo', 'header.eventId.hi']).show()
      

      produces the following stack trace:

      ---------------------------------------------------------------------------
      AnalysisException                         Traceback (most recent call last)
      <ipython-input-1-d44c25c1919c> in <module>()
           11 df = sqlContext.createDataFrame(data)
           12
      ---> 13 df.drop_duplicates(['header.eventId.lo', 'header.eventId.hi']).show()
      
      /usr/local/Cellar/apache-spark/2.2.0/libexec/python/pyspark/sql/dataframe.py in dropDuplicates(self, subset)
         1243             jdf = self._jdf.dropDuplicates()
         1244         else:
      -> 1245             jdf = self._jdf.dropDuplicates(self._jseq(subset))
         1246         return DataFrame(jdf, self.sql_ctx)
         1247
      
      /usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
         1131         answer = self.gateway_client.send_command(command)
         1132         return_value = get_return_value(
      -> 1133             answer, self.gateway_client, self.target_id, self.name)
         1134
         1135         for temp_arg in temp_args:
      
      /usr/local/Cellar/apache-spark/2.2.0/libexec/python/pyspark/sql/utils.py in deco(*a, **kw)
           67                                              e.java_exception.getStackTrace()))
           68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
      ---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
           70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
           71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
      
      AnalysisException: u'Cannot resolve column name "header.eventId.lo" among (header);'
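
      The "among (header)" in the message hints at the cause: the subset names appear to be resolved only against the DataFrame's top-level columns, of which there is just one here. For contrast, the same nested references resolve fine in select(); a quick check (reusing the df from the example above):

      print(df.columns)  # only the top-level struct: ['header']
      
      # select() resolves the nested fields without complaint
      df.select('header.eventId.lo', 'header.eventId.hi').show()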
      

      This works correctly in Spark 1.6, but fails in 2.1 (via Homebrew and CDH) and 2.2 (via Homebrew).

      An inconvenient (but working) workaround is the following:

      # promote the nested fields to top-level columns, dedupe on those,
      # then drop the temporary columns again
      (
          df
          .withColumn('lo', col('header.eventId.lo'))
          .withColumn('hi', col('header.eventId.hi'))
          .drop_duplicates(['lo', 'hi'])
          .drop('lo')
          .drop('hi')
          .show()
      )
      
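      Another possible workaround, sketched here and untested on the affected versions: groupBy accepts Column expressions, which do resolve nested fields, so grouping on the struct fields and keeping one row per key achieves the same deduplication (assuming it does not matter which of the duplicate rows survives):

      from pyspark.sql.functions import col, first
      
      deduped = (
          df
          .groupBy(col('header.eventId.lo'), col('header.eventId.hi'))
          .agg(first('header').alias('header'))  # keep one header per (lo, hi)
          .select('header')  # drop the temporary grouping columns
      )
      deduped.show()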


          People

            Assignee: Unassigned
            Reporter: Nicholas Hakobian (nhakobian)
            Votes: 0
            Watchers: 3
