Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Not A Problem
Affects Version/s: 2.4.0
Fix Version/s: None
Component/s: None
Description
Steps to reproduce. Define the schema like this:

StructType valid = StructType.fromDDL(
    "broker_name string, order integer, server_name string, "
        + "storages array<struct<timestamp: timestamp, storage: double>>");
package com.example;

import java.io.Serializable;
import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Entity implements Serializable {
    private String broker_name;
    private String server_name;
    private Integer order;
    private Storage[] storages;
}
package com.example;

import java.io.Serializable;
import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Storage implements Serializable {
    private java.sql.Timestamp timestamp;
    private Double storage;
}
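Since both the DDL string and the bean encoder define a field order, a quick diagnostic (a sketch building on the definitions above, not part of the original report) is to print the two schemas and compare them; Encoder.schema() and StructType.printTreeString() are public Spark API:

// Field order as declared in the DDL string
valid.printTreeString();

// Field order the bean encoder derives from Entity by reflection;
// java.beans.Introspector lists bean properties alphabetically by name,
// so the nested struct comes out as struct<storage, timestamp>
Encoders.bean(Entity.class).schema().printTreeString();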
Create a JSON file with the following content:
[ { "broker_name": "A1", "server_name": "S1", "order": 1, "storages": [ { "timestamp": "2018-10-29 23:11:44.000", "storage": 12.5 } ] } ]
Process the data as follows:

Dataset<Entity> ds = spark.read()
    .option("multiline", "true")
    .schema(valid)
    .json("/path/to/file")
    .as(Encoders.bean(Entity.class));

ds.groupByKey((MapFunction<Entity, String>) o -> o.getBroker_name(), Encoders.STRING())
    .reduceGroups((ReduceFunction<Entity>) (e1, e2) -> e1)
    .map((MapFunction<Tuple2<String, Entity>, Entity>) tuple -> tuple._2, Encoders.bean(Entity.class))
    .show(10, false);
The result will be:
+-----------+-----+-----------+--------------------------------------------------------+
|broker_name|order|server_name|storages                                                |
+-----------+-----+-----------+--------------------------------------------------------+
|A1         |1    |S1         |[[7.612815958429577E-309, 148474-03-19 22:14:3232.5248]]|
+-----------+-----+-----------+--------------------------------------------------------+
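The garbled row is consistent with the nested struct fields being bound by position rather than by name: Encoders.bean builds its schema through java.beans.Introspector, which lists bean properties alphabetically, so the encoder expects storage before timestamp, while the DDL above declares timestamp first. Assuming that is why the issue was closed as Not A Problem (the report itself does not say), a workaround sketch is to declare the nested struct fields in the order the encoder expects:

StructType aligned = StructType.fromDDL(
    "broker_name string, order integer, server_name string, "
        // alphabetical field order inside the struct, matching the bean encoder
        + "storages array<struct<storage: double, timestamp: timestamp>>");

With the declared order matching the encoder's, the timestamp and storage values should deserialize correctly.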