Spark: SPARK-24324, a sub-task of SPARK-22216 (Improving PySpark/Pandas interoperability)

Pandas Grouped Map UserDefinedFunction mixes column labels


    Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 2.4.0
    • Component/s: PySpark
    • Labels:
      None
    • Environment:

    Description

      I am working on Wikipedia page views (see task T188041 on Wikimedia's Phabricator). For simplicity, let's say that the data look like this:

      <lang> <page> <timestamp> <views>
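Each input line is a space-separated record. The following is a minimal plain-Python sketch of parsing one line, assuming the timestamp always follows the yyyyMMdd-HHmmss pattern that the Spark CSV reader below is configured with. PageView and parse_record are hypothetical names used only for illustration.

```python
from datetime import datetime
from typing import NamedTuple


class PageView(NamedTuple):
    lang: str
    page: str
    timestamp: datetime
    views: int


def parse_record(line: str) -> PageView:
    """Parse '<lang> <page> <timestamp> <views>' into a PageView."""
    lang, page, ts, views = line.split(" ")
    # "%Y%m%d-%H%M%S" is the strptime equivalent of Spark's "yyyyMMdd-HHmmss"
    return PageView(lang, page, datetime.strptime(ts, "%Y%m%d-%H%M%S"), int(views))


rec = parse_record("en Albert_Camus 20071210-010000 148")
print(rec.timestamp.hour, rec.views)  # prints: 1 148
```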
      

      For each combination of (lang, page, day(timestamp)), I need to encode each hour as a letter:

      00:00 -> A
      01:00 -> B
      ...
      

      and concatenate each letter with the number of views in that hour. So, if a page got 5 views at 00:00 and 7 views at 01:00, it would become:

      A5B7
      

      I have written a Pandas grouped-map UDF called concat_hours to do this.

      However, the output has its column labels mixed up, and I am not sure what is going on. Below is a minimal, complete example that reproduces the issue on my system (the details of my environment are above).

      #!/usr/bin/env python3
      # coding: utf-8
      
      input_data = b"""en Albert_Camus 20071210-000000 150
      en Albert_Camus 20071210-010000 148
      en Albert_Camus 20071210-020000 197
      en Albert_Camus 20071211-200000 145
      en Albert_Camus 20071211-210000 131
      en Albert_Camus 20071211-220000 154
      en Albert_Camus 20071211-230001 142
      en Albert_Caquot 20071210-020000 1
      en Albert_Caquot 20071210-020000 1
      en Albert_Caquot 20071210-040001 1
      en Albert_Caquot 20071211-060000 1
      en Albert_Caquot 20071211-080000 1
      en Albert_Caquot 20071211-150000 3
      en Albert_Caquot 20071211-210000 1"""
      
      import tempfile
      
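      # Write the sample data to a temporary file that Spark can read by name
      fp = tempfile.NamedTemporaryFile()
      fp.write(input_data)
      fp.flush()  # make sure the bytes are on disk before Spark opens the file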
      
      # findspark locates the local Spark installation and makes pyspark importable
      import findspark
      findspark.init()
      
      import pyspark
      from pyspark.sql.types import StructType, StructField
      from pyspark.sql.types import StringType, IntegerType, TimestampType
      from pyspark.sql import functions
      
      sc = pyspark.SparkContext(appName="udf_example")
      sqlctx = pyspark.SQLContext(sc)
      
      schema = StructType([StructField("lang", StringType(), False),
                           StructField("page", StringType(), False),
                           StructField("timestamp", TimestampType(), False),
                           StructField("views", IntegerType(), False)])
      
      df = sqlctx.read.csv(fp.name,
                           header=False,
                           schema=schema,
                           timestampFormat="yyyyMMdd-HHmmss",
                           sep=' ')
      
      df.count()
      df.dtypes
      df.show()
      
      new_schema = StructType([StructField("lang", StringType(), False),
                               StructField("page", StringType(), False),
                               StructField("day", StringType(), False),
                               StructField("enc", StringType(), False)])
      
      from pyspark.sql.functions import pandas_udf, PandasUDFType
      import pandas as pd
      
      hour_to_letter = ['A','B','C','D','E','F','G','H','I','J','K','L','M','N','O',
                        'P','Q','R','S','T','U','V','W','X']
      
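      @pandas_udf(new_schema, PandasUDFType.GROUPED_MAP)
      def concat_hours(x):
          # x holds all rows of one (lang, page, day) group as a pandas DataFrame
          view_hours = x['hour'].tolist()
          view_views = x['views'].tolist()
      
          view_hours_letters = [hour_to_letter[h] for h in view_hours]
      
          # Pair each hour letter with its view count, sort by letter, and concatenate
          encoded_views = [letter + str(views)
                           for letter, views in sorted(zip(view_hours_letters, view_views))]
          encoded_views_string = ''.join(encoded_views)
      
          # Swapping the labels as below yields the expected output:
          # return pd.DataFrame({'enc': x.page, 'day': x.lang, 'lang': x.day,
          #                      'page': encoded_views_string}, index=[x.index[0]])
      
          # One row per group: index=[x.index[0]] keeps only the first row's values
          return pd.DataFrame({'page': x.page, 'lang': x.lang, 'day': x.day,
                               'enc': encoded_views_string}, index=[x.index[0]])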
      
      
      grouped_df = (df.select(['lang',
                               'page',
                               functions.date_format('timestamp', 'yyyy-MM-dd')
                                        .alias('day'),
                               functions.hour('timestamp').alias('hour'),
                               'views'
                               ])
                      .groupby(['lang','page','day'])
                      )
      
      grouped_df = (grouped_df.apply(concat_hours)
                              .dropDuplicates()
                              )
      
      grouped_df.show()
      

       
      This is what I am getting:

      $ ./udf_example.py
      2018-05-20 05:13:23 WARN  Utils:66 - Your hostname, inara resolves to a loopback address: 127.0.1.1; using 10.109.49.111 instead (on interface wlp2s0)
      2018-05-20 05:13:23 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
      2018-05-20 05:13:23 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
      Setting default log level to "WARN".
      To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
      2018-05-20 05:13:24 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
      +----+-------------+-------------------+-----+
      |lang|         page|          timestamp|views|
      +----+-------------+-------------------+-----+
      |  en| Albert_Camus|2007-12-10 00:00:00|  150|
      |  en| Albert_Camus|2007-12-10 01:00:00|  148|
      |  en| Albert_Camus|2007-12-10 02:00:00|  197|
      |  en| Albert_Camus|2007-12-11 20:00:00|  145|
      |  en| Albert_Camus|2007-12-11 21:00:00|  131|
      |  en| Albert_Camus|2007-12-11 22:00:00|  154|
      |  en| Albert_Camus|2007-12-11 23:00:01|  142|
      |  en|Albert_Caquot|2007-12-10 02:00:00|    1|
      |  en|Albert_Caquot|2007-12-10 02:00:00|    1|
      |  en|Albert_Caquot|2007-12-10 04:00:01|    1|
      |  en|Albert_Caquot|2007-12-11 06:00:00|    1|
      |  en|Albert_Caquot|2007-12-11 08:00:00|    1|
      |  en|Albert_Caquot|2007-12-11 15:00:00|    3|
      |  en|Albert_Caquot|2007-12-11 21:00:00|    1|
      +----+-------------+-------------------+-----+
      
      +----------+----------------+---+-------------+                                 
      |      lang|            page|day|          enc|
      +----------+----------------+---+-------------+
      |2007-12-10|    A150B148C197| en| Albert_Camus|
      |2007-12-11|        G1I1P3V1| en|Albert_Caquot|
      |2007-12-10|          C1C1E1| en|Albert_Caquot|
      |2007-12-11|U145V131W154X142| en| Albert_Camus|
      +----------+----------------+---+-------------+
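One hypothesis consistent with this table: the DataFrame returned by concat_hours ends up with its columns in alphabetical order (older pandas/Python combinations sort dict keys when building a DataFrame from a dict), and Spark then assigns them to the schema fields by position rather than by name. The sketch below simulates that suspected behavior in plain Python; it is a model of the symptom, not Spark's actual code, and assign_by_position / assign_by_name are hypothetical names.

```python
# Schema field order declared in new_schema
SCHEMA = ["lang", "page", "day", "enc"]

# One row as returned by concat_hours (labels -> values)
returned = {"page": "Albert_Camus", "lang": "en", "day": "2007-12-10", "enc": "A150B148C197"}


def assign_by_position(row, schema):
    # Suspected behavior: labels sorted alphabetically, values zipped onto fields by position
    values = [row[label] for label in sorted(row)]
    return dict(zip(schema, values))


def assign_by_name(row, schema):
    # Expected behavior: each schema field is looked up by its label
    return {field: row[field] for field in schema}


print(assign_by_position(returned, SCHEMA))
# prints: {'lang': '2007-12-10', 'page': 'A150B148C197', 'day': 'en', 'enc': 'Albert_Camus'}
print(assign_by_name(returned, SCHEMA))
# prints: {'lang': 'en', 'page': 'Albert_Camus', 'day': '2007-12-10', 'enc': 'A150B148C197'}
```

The positional result matches the first row of the table above, and the by-name result matches the expected output.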
      

      Of course, what I expect is:

      +----+-------------+----------+----------------+                                
      |lang|         page|       day|             enc|
      +----+-------------+----------+----------------+
      |  en|Albert_Caquot|2007-12-11|        G1I1P3V1|
      |  en|Albert_Caquot|2007-12-10|          C1C1E1|
      |  en| Albert_Camus|2007-12-10|    A150B148C197|
      |  en| Albert_Camus|2007-12-11|U145V131W154X142|
      +----+-------------+----------+----------------+
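A possible workaround on 2.3.0 (the Fix Version above is 2.4.0), assuming the columns are indeed matched by position, is to pin the column order explicitly with the columns argument of pandas.DataFrame so that it mirrors new_schema. A minimal sketch that can be checked with pandas alone; build_result and RESULT_COLUMNS are hypothetical names:

```python
import pandas as pd

# Must list the fields in the same order as new_schema
RESULT_COLUMNS = ["lang", "page", "day", "enc"]


def build_result(x: pd.DataFrame, encoded: str) -> pd.DataFrame:
    """Build the one-row result for a group, with columns in schema order."""
    first = x.iloc[0]  # lang, page and day are constant within a group
    return pd.DataFrame(
        [[first["lang"], first["page"], first["day"], encoded]],
        columns=RESULT_COLUMNS,
    )


group = pd.DataFrame({
    "lang": ["en", "en"],
    "page": ["Albert_Camus", "Albert_Camus"],
    "day": ["2007-12-10", "2007-12-10"],
    "hour": [0, 1],
    "views": [150, 148],
})
print(build_result(group, "A150B148").columns.tolist())
# prints: ['lang', 'page', 'day', 'enc']
```

Inside concat_hours, the final statement would then become return build_result(x, encoded_views_string). Because the column order no longer depends on dict-key ordering, the result should line up with new_schema whether Spark matches columns by position or by name.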
      


    People

    • Assignee: Bryan Cutler (bryanc)
    • Reporter: Cristian Consonni (CristianCantoro)
    • Votes: 0
    • Watchers: 5
