Spark / SPARK-11330

Filter operation on StringType after groupBy PERSISTED brings no results


    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Duplicate
    • Affects Version/s: 1.5.1
    • Fix Version/s: 1.5.2, 1.6.0
    • Component/s: Spark Core
    • Labels:
      None
    • Environment:

      Description

      This only happens when persist() is called.

      val data = sqlContext.read.parquet("/var/data/Saif/ppnr_pqt")
      data.groupBy("vintages").count.select("vintages").filter("vintages = '2007-01-01'").first
      >>> res9: org.apache.spark.sql.Row = [2007-01-01]

      data.groupBy("vintages").count.persist.select("vintages").filter("vintages = '2007-01-01'").first
      >>> exception from an empty iterator: the filtered result contains no rows

      This does not happen with a field of another type, e.g. an integer (IntegerType) column:
      data.groupBy("yyyymm").count.persist.select("yyyymm").filter("yyyymm = 200805").first
      >>> res13: org.apache.spark.sql.Row = [200805]
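The reporter's Parquet data is not available here, so the following is a sketch that rebuilds the same query shape on synthetic rows, assuming spark-shell 1.5.x where `sqlContext` is predefined. The column names follow the report; the values are made up. One attachment is named bug_reproduce_50k.zip, so the sketch also uses 50,000 rows, but it is not guaranteed to trigger the bug on any particular version.

```scala
// Sketch for spark-shell 1.5.x, where `sqlContext` is predefined.
// Synthetic data: 50,000 rows spread over 120 monthly "vintages" strings.
import sqlContext.implicits._

val rows = (0 until 50000).map { i =>
  val m = i % 120
  (f"${2000 + m / 12}%04d-${m % 12 + 1}%02d-01", 200000 + i % 1000)
}
val sample = rows.toDF("vintages", "yyyymm")

// Query shape from the report, without persist.
val plain = sample.groupBy("vintages").count.select("vintages").filter("vintages = '2007-01-01'").collect()

// Same query with persist inserted right after the aggregation.
val grouped = sample.groupBy("vintages").count.persist()
val persisted = grouped.select("vintages").filter("vintages = '2007-01-01'").collect()
grouped.unpersist()

// On an unaffected version both results contain the single row [2007-01-01].
println(s"without persist: ${plain.length} row(s); with persist: ${persisted.length} row(s)")
```

Because the string key is the only varying factor, comparing the two counts isolates the persist step, mirroring the report's before/after queries.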

      NOTE: I don't know whether I used the Kryo serializer when I stored this Parquet data.
      NOTE2: Calling persist after the filter works fine, but this is not a sufficient workaround, because any filter applied afterwards breaks the results.
      NOTE3: I have reproduced the issue with several different datasets.
      NOTE4: The only real workaround is to store the grouped DataFrame in a database and reload it as a new DataFrame.
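The NOTE4 workaround cuts the lineage by writing the grouped result out and reading it back as a new DataFrame. The report used a database; the sketch below swaps in Parquet through the DataFrame writer instead, and the `materialize` helper and any output path are hypothetical. It assumes spark-shell 1.5.x with `sqlContext` predefined.

```scala
// Sketch for spark-shell 1.5.x (`sqlContext` predefined). Parquet stands in for
// the database used in the report; `materialize` is a hypothetical helper.
import org.apache.spark.sql.{DataFrame, SaveMode}

// Writes `df` to `path`, replacing any previous output there, and returns a new
// DataFrame read back from that location. Later filters then run against a
// fresh Parquet scan rather than against the persisted aggregation.
def materialize(df: DataFrame, path: String): DataFrame = {
  df.write.mode(SaveMode.Overwrite).parquet(path)
  sqlContext.read.parquet(path)
}
```

Applied to the report's query, the call would look like `materialize(data.groupBy("vintages").count, "/tmp/vintages_counts_pqt")` (path hypothetical), followed by the same `select` and `filter`. The price is a full write and re-read of the aggregated data, which is why it is a workaround rather than a replacement for caching.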

      Querying the raw data works fine:

      data.select("vintages").filter("vintages = '2007-01-01'").first
      >>> res4: org.apache.spark.sql.Row = [2007-01-01]

      Originally, the issue appeared in a larger aggregation, where the data was inconsistent and each call returned different results.

      By reducing the operation step by step, I narrowed it down to this issue. For reference, the original operation was:

      import org.apache.spark.sql.functions.{count, sum}

      val data = sqlContext.read.parquet("/var/Saif/data_pqt")

      val res = data.groupBy("product", "band", "age", "vint", "mb", "yyyymm").agg(
        count($"account_id").as("N"),
        sum($"balance").as("balance_eom"),
        sum($"balancec").as("balance_eoc"),
        sum($"spend").as("spend"),
        sum($"payment").as("payment"),
        sum($"feoc").as("feoc"),
        sum($"cfintbal").as("cfintbal"),
        // count() counts non-null values, so this counts rows with a non-null newacct, not only newacct == 1
        count($"newacct" === 1).as("newacct")).persist()

      val z = res.select("vint", "yyyymm").filter("vint = '2007-01-01'").select("yyyymm").distinct.collect

      z.length

      >>> res0: Int = 102

      res.unpersist()

      val z = res.select("vint", "yyyymm").filter("vint = '2007-01-01'").select("yyyymm").distinct.collect

      z.length

      >>> res1: Int = 103
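The 102 versus 103 comparison above can be wrapped in a small check that runs the same query with and without persist and reports the rows that appear in only one result. This is a hypothetical diagnostic helper (`persistDiff` is not part of Spark), sketched for spark-shell 1.5.x. It compares results as sets, so it suits queries whose output has no duplicate rows, such as the `distinct` query in the report, and it leaves `base` unpersisted when it returns.

```scala
// Hypothetical diagnostic helper, sketched for spark-shell 1.5.x.
import org.apache.spark.sql.{DataFrame, Row}

// Runs `query` against `base` twice: first with `base` persisted, then after
// unpersisting it. Returns (rows only in the cached result, rows only in the
// uncached result); both sets are empty when the two runs agree.
def persistDiff(base: DataFrame)(query: DataFrame => DataFrame): (Set[Row], Set[Row]) = {
  base.persist()
  base.count() // force the cache to be built before querying it
  val cached = query(base).collect().toSet
  base.unpersist(blocking = true)
  val uncached = query(base).collect().toSet
  (cached -- uncached, uncached -- cached)
}
```

For the report's query this would be `persistDiff(res)(_.select("vint", "yyyymm").filter("vint = '2007-01-01'").select("yyyymm").distinct)`; a non-empty second set would correspond to the value missing from the 102-row cached result.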

        Attachments

        1. bug_reproduce.zip (1 kB, Saif Addin Ellafi)
        2. bug_reproduce_50k.zip (144 kB, Saif Addin Ellafi)


              People

              • Assignee: Davies Liu (davies)
              • Reporter: Saif Addin Ellafi (saif.a.ellafi)
              • Votes: 0
              • Watchers: 4
