Spark / SPARK-24051

Incorrect results for certain queries using Java and Python APIs on Spark 2.3.0

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Component/s: SQL

    Description

      I'm seeing Spark 2.3.0 return incorrect results for a certain (very specific) query, demonstrated by the Java program below. It was simplified from a much more complex query, but I'm having trouble simplifying it further without removing the erroneous behaviour.

      package sparktest;
      
      import org.apache.spark.SparkConf;
      import org.apache.spark.sql.*;
      import org.apache.spark.sql.expressions.Window;
      import org.apache.spark.sql.types.DataTypes;
      import org.apache.spark.sql.types.Metadata;
      import org.apache.spark.sql.types.StructField;
      import org.apache.spark.sql.types.StructType;
      
      import java.util.Arrays;
      
      public class Main {
      
          public static void main(String[] args) {
              SparkConf conf = new SparkConf()
                      .setAppName("SparkTest")
                      .setMaster("local[*]");
              SparkSession session = SparkSession.builder().config(conf).getOrCreate();
      
              Row[] arr1 = new Row[]{
                      RowFactory.create(1, 42),
                      RowFactory.create(2, 99)};
              StructType sch1 = new StructType(new StructField[]{
                      new StructField("a", DataTypes.IntegerType, true, Metadata.empty()),
                      new StructField("b", DataTypes.IntegerType, true, Metadata.empty())});
              Dataset<Row> ds1 = session.createDataFrame(Arrays.asList(arr1), sch1);
              ds1.show();
      
              Row[] arr2 = new Row[]{
                      RowFactory.create(3)};
              StructType sch2 = new StructType(new StructField[]{
                      new StructField("a", DataTypes.IntegerType, true, Metadata.empty())});
              Dataset<Row> ds2 = session.createDataFrame(Arrays.asList(arr2), sch2)
                      .withColumn("b", functions.lit(0));
              ds2.show();
      
              Column[] cols = new Column[]{
                      new Column("a"),
                      new Column("b").as("b"),
                      functions.count(functions.lit(1))
                              .over(Window.partitionBy())
                              .as("n")};
              Dataset<Row> ds = ds1
                      .select(cols)
                      .union(ds2.select(cols))
                      .where(new Column("n").geq(1))
                      .drop("n");
              ds.show();
              //ds.explain(true);
          }
      }
      

      It just calculates the union of 2 datasets,

      +---+---+
      |  a|  b|
      +---+---+
      |  1| 42|
      |  2| 99|
      +---+---+
      

      with

      +---+---+
      |  a|  b|
      +---+---+
      |  3|  0|
      +---+---+
      

      The expected result is:

      +---+---+
      |  a|  b|
      +---+---+
      |  1| 42|
      |  2| 99|
      |  3|  0|
      +---+---+
      

      but instead it prints:

      +---+---+
      |  a|  b|
      +---+---+
      |  1|  0|
      |  2|  0|
      |  3|  0|
      +---+---+
      

      Notice how the value in column b is always zero, overwriting the original values in rows 1 and 2.
      Seemingly trivial changes, such as replacing new Column("b").as("b") with just new Column("b"), or removing the where clause after the union, make it behave correctly again.
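
As a reference point for the expected result, the intended semantics of the query can be modeled in plain Java without Spark. This is only a sketch under stated assumptions: the class and method names (ExpectedUnionModel, withCount, query) are hypothetical, and rows are represented as int arrays rather than Spark Row objects. It does not reproduce the bug; it only shows what the select / union / where / drop chain should compute for the two inputs above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-Java model of the query; not part of Spark.
public class ExpectedUnionModel {

    // Models select(a, b, count(lit(1)).over(Window.partitionBy()).as("n")):
    // an empty partitionBy() places all rows of the input in one window,
    // so n is the total row count of that input.
    public static List<int[]> withCount(List<int[]> rows) {
        int n = rows.size();
        List<int[]> out = new ArrayList<>();
        for (int[] r : rows) {
            out.add(new int[]{r[0], r[1], n});
        }
        return out;
    }

    // Models union(...).where(n >= 1).drop("n").
    public static List<int[]> query(List<int[]> ds1, List<int[]> ds2) {
        List<int[]> unioned = new ArrayList<>(withCount(ds1));
        unioned.addAll(withCount(ds2));
        List<int[]> out = new ArrayList<>();
        for (int[] r : unioned) {
            if (r[2] >= 1) {
                // Drop n, keeping a and b untouched.
                out.add(new int[]{r[0], r[1]});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> ds1 = Arrays.asList(new int[]{1, 42}, new int[]{2, 99});
        List<int[]> ds2 = Collections.singletonList(new int[]{3, 0});
        for (int[] r : query(ds1, ds2)) {
            System.out.println(r[0] + " " + r[1]);
        }
    }
}
```

Running main prints the rows 1 42, 2 99 and 3 0, matching the expected table: column b of the first dataset should pass through the union and the filter unchanged.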

          People

            Assignee: Unassigned
            Reporter: Emlyn Corrin (emlyn)