  1. Spark
  2. SPARK-16390 Dataset API improvements
  3. SPARK-16607

Aggregator with null initialisation will result in null

Details

    • Sub-task
    • Status: Resolved
    • Major
    • Resolution: Incomplete
    • None
    • None
    • SQL
    • spark 2.0-branch

    Description

      Java code example:

          import java.util.Arrays;

          import org.apache.spark.api.java.function.MapFunction;
          import org.apache.spark.sql.Dataset;
          import org.apache.spark.sql.Encoder;
          import org.apache.spark.sql.Encoders;
          import org.apache.spark.sql.SparkSession;
          import org.apache.spark.sql.expressions.Aggregator;

          import scala.Tuple2;

          public class TestAggregatorJava {
            public static void main(String[] args) {
              SparkSession session = SparkSession.builder()
                                                 .appName("TestAggregatorJava")
                                                 .master("local[*]")
                                                 .getOrCreate();

              // Three rows sharing the key "a".
              Dataset<Tuple2<String, Integer>> ds1 = session.createDataset(Arrays.asList(
                      new Tuple2<>("a", 1),
                      new Tuple2<>("a", 2),
                      new Tuple2<>("a", 3)
              ), Encoders.tuple(Encoders.STRING(), Encoders.INT()));

              // Replace every value that is not greater than 1 with null.
              Dataset<Tuple2<String, Integer>> ds2 = ds1.map(
                  new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

                @Override
                public Tuple2<String, Integer> call(Tuple2<String, Integer> value) throws Exception {
                  if (value._2() > 1) {
                    return value;
                  } else {
                    return new Tuple2<>(value._1(), null);
                  }
                }
              }, Encoders.tuple(Encoders.STRING(), Encoders.INT()));

              // Group by key and sum the values, starting from a null buffer.
              Dataset<Tuple2<String, Integer>> ds3 = ds2.groupByKey(
                  new MapFunction<Tuple2<String, Integer>, String>() {

                @Override
                public String call(Tuple2<String, Integer> value) throws Exception {
                  return value._1();
                }
              }, Encoders.STRING()).agg(new Aggregator<Tuple2<String, Integer>, Integer, Integer>() {
                // The null initial buffer is what triggers the reported behaviour.
                @Override
                public Integer zero() {
                  return null;
                }

                @Override
                public Integer reduce(Integer b, Tuple2<String, Integer> a) {
                  return merge(b, a._2());
                }

                // Null-tolerant sum: a null operand is skipped.
                @Override
                public Integer merge(Integer b1, Integer b2) {
                  if (b1 == null) {
                    return b2;
                  } else if (b2 == null) {
                    return b1;
                  } else {
                    return b1 + b2;
                  }
                }

                @Override
                public Integer finish(Integer reduction) {
                  return reduction;
                }

                @Override
                public Encoder<Integer> bufferEncoder() {
                  return Encoders.INT();
                }

                @Override
                public Encoder<Integer> outputEncoder() {
                  return Encoders.INT();
                }
              }.toColumn());

              ds3.printSchema();
              ds3.show();
            }
          }
      

      I get this schema:

      root
       |-- value: string (nullable = true)
       |-- (scala.Tuple2): integer (nullable = true)
      

      And this result:

      +-----+--------------+
      |value|(scala.Tuple2)|
      +-----+--------------+
      |    a|          null|
      +-----+--------------+
      

      The same happens in Scala: wrap Scala's Int in a case class (because a bare Int defaults to 0) and you'll get the same result.
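
      For comparison, the following is a minimal plain-Java sketch (no Spark involved) of what the aggregator logic above would produce if the null zero were simply folded through reduce and merge. The class name NullZeroFold and the helper fold are hypothetical and exist only for illustration; the merge body is copied from the report. It is not a description of how Spark evaluates the Aggregator internally, only a way to make the expected value concrete.

```java
import java.util.Arrays;
import java.util.List;

public class NullZeroFold {
    // Same null-tolerant merge as in the report's Aggregator.
    static Integer merge(Integer b1, Integer b2) {
        if (b1 == null) {
            return b2;
        } else if (b2 == null) {
            return b1;
        } else {
            return b1 + b2;
        }
    }

    // Hypothetical helper: folds the values starting from a null buffer,
    // mirroring zero() == null and reduce(b, a) == merge(b, a._2()).
    static Integer fold(List<Integer> values) {
        Integer buffer = null; // zero()
        for (Integer v : values) {
            buffer = merge(buffer, v); // reduce()
        }
        return buffer; // finish() is the identity
    }

    public static void main(String[] args) {
        // Values of ds2 for key "a": 1 was mapped to null, 2 and 3 were kept.
        System.out.println(fold(Arrays.asList(null, 2, 3))); // prints 5
    }
}
```

      Under this reading, the row for key "a" would be expected to show 5 rather than null.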

          People

            Assignee: Unassigned
            Reporter: Amit Sela (amitsela)