Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 3.3.0, 3.4.0
Description
```java
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Item implements Serializable {
    private String x;
    private String y;
    private int z;

    public Item addZ(int z) {
        return new Item(x, y, this.z + z);
    }
}
```
```java
List<Item> items = List.of(
        new Item("X1", "Y1", 1),
        new Item("X2", "Y1", 1),
        new Item("X1", "Y1", 1),
        new Item("X2", "Y1", 1),
        new Item("X3", "Y1", 1),
        new Item("X1", "Y1", 1),
        new Item("X1", "Y2", 1),
        new Item("X2", "Y1", 1));

Dataset<Item> ds = spark.createDataFrame(items, Item.class).as(Encoders.bean(Item.class));

ds.groupByKey(
        (MapFunction<Item, Tuple2<String, String>>) item -> Tuple2.apply(item.getX(), item.getY()),
        Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
  .reduceGroups((ReduceFunction<Item>) (item1, item2) -> item1.addZ(item2.getZ()))
  .show(10);
```
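For reference, the per-key sums the job should produce can be checked with plain Java streams. This sketch uses only the (x, y) pairs of the same eight items (every item has z = 1, so summing z is the same as counting); the class and method names are illustrative, not part of the report:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ExpectedSums {
    // Sum z per "x,y" key over the same eight items as the Spark job above.
    // Every item has z = 1, so the sum of z equals the group size.
    static Map<String, Integer> expected() {
        List<String[]> items = List.of(
                new String[]{"X1", "Y1"}, new String[]{"X2", "Y1"},
                new String[]{"X1", "Y1"}, new String[]{"X2", "Y1"},
                new String[]{"X3", "Y1"}, new String[]{"X1", "Y1"},
                new String[]{"X1", "Y2"}, new String[]{"X2", "Y1"});
        return items.stream().collect(Collectors.groupingBy(
                a -> a[0] + "," + a[1],        // group key: "x,y"
                Collectors.summingInt(a -> 1))); // each item contributes z = 1
    }

    public static void main(String[] args) {
        expected().forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

This confirms the sums the reporter expects: 3 for (X1, Y1) and (X2, Y1), 1 for (X3, Y1) and (X1, Y2).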
The result is:

```
+--------+----------------------------------------------+
|     key|ReduceAggregator(poc.job.JavaSparkReduce$Item)|
+--------+----------------------------------------------+
|{X1, Y1}|                                   {X2, Y1, 2}|  -- expected 3
|{X2, Y1}|                                   {X2, Y1, 2}|  -- expected 3
|{X1, Y2}|                                   {X2, Y1, 1}|
|{X3, Y1}|                                   {X2, Y1, 1}|
+--------+----------------------------------------------+
```
Note that the key does not match the value: every group returns the same Item ({X2, Y1, ...}), and the sums for the three-element groups are 2 instead of the expected 3.
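A possible workaround (a sketch only, not verified against the affected Spark versions) is to replace `reduceGroups` with `mapGroups` and fold each group into a freshly constructed result object, so the reduction never holds on to a deserialized instance that the encoder might reuse. The helper below is self-contained and illustrative; the commented lines show where it would plug into the job from the report:

```java
import java.util.Iterator;

public class ReduceWorkaround {
    // Minimal stand-in for the Item bean from the report.
    public static class Item {
        public final String x, y;
        public final int z;
        public Item(String x, String y, int z) { this.x = x; this.y = y; this.z = z; }
    }

    // Fold a (non-empty) group into a brand-new Item, copying field values
    // instead of keeping a reference to any element of the iterator.
    // mapGroups never passes an empty group, so group.next() is safe.
    static Item foldGroup(Iterator<Item> group) {
        Item first = group.next();
        String x = first.x;
        String y = first.y;
        int z = first.z;
        while (group.hasNext()) {
            z += group.next().z;
        }
        return new Item(x, y, z);
    }

    // Hypothetical use in the Spark job, replacing reduceGroups:
    // ds.groupByKey(..., Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
    //   .mapGroups((MapGroupsFunction<Tuple2<String, String>, Item, Item>)
    //       (key, values) -> foldGroup(values), Encoders.bean(Item.class))
    //   .show(10);
}
```

The design point is simply to avoid returning (or mutating) any object handed in by the framework; all output fields are copied into a new instance.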