Spark / SPARK-27050

Bean Encoder serializes data in a wrong order if input schema is not ordered


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 2.4.0
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None

    Description

      Steps to reproduce. Define a schema like this:

       

      
      StructType valid = StructType.fromDDL(
        "broker_name string, order integer, server_name string, " + 
        "storages array<struct<timestamp: timestamp, storage: double>>" 
      );

      Define the beans:

      package com.example;
      
      import java.io.Serializable;
      import lombok.Data;
      import lombok.AllArgsConstructor;
      import lombok.NoArgsConstructor;
      
      @Data
      @NoArgsConstructor
      @AllArgsConstructor
      public class Entity implements Serializable {
          private String broker_name;
          private String server_name;
          private Integer order;
          private Storage[] storages;
      }

      package com.example;
      
      import java.io.Serializable;
      import lombok.Data;
      import lombok.AllArgsConstructor;
      import lombok.NoArgsConstructor;
      
      @Data
      @NoArgsConstructor
      @AllArgsConstructor
      public class Storage implements Serializable {
          private java.sql.Timestamp timestamp;
          private Double storage;
      }
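
      Note on why the orders disagree: Spark's bean encoder discovers properties
      through JavaBeans introspection, which reports them alphabetically by name,
      not in declaration order. A minimal, Spark-free sketch of that mechanism
      (the `BeanOrderDemo` class and its trimmed-down `Entity` stand-in are
      illustrative, not part of the original report):

      ```java
      import java.beans.IntrospectionException;
      import java.beans.Introspector;
      import java.beans.PropertyDescriptor;
      import java.util.ArrayList;
      import java.util.List;

      public class BeanOrderDemo {
          // Same declaration order as the Entity bean above (minus storages)
          public static class Entity {
              private String broker_name;
              private String server_name;
              private Integer order;
              public String getBroker_name() { return broker_name; }
              public void setBroker_name(String v) { broker_name = v; }
              public String getServer_name() { return server_name; }
              public void setServer_name(String v) { server_name = v; }
              public Integer getOrder() { return order; }
              public void setOrder(Integer v) { order = v; }
          }

          // Collects bean property names in the order the Introspector reports them
          static List<String> propertyOrder(Class<?> cls) throws IntrospectionException {
              List<String> names = new ArrayList<>();
              for (PropertyDescriptor pd : Introspector.getBeanInfo(cls, Object.class)
                                                       .getPropertyDescriptors()) {
                  names.add(pd.getName());
              }
              return names;
          }

          public static void main(String[] args) throws Exception {
              // Alphabetical (broker_name, order, server_name), not declaration order
              System.out.println(propertyOrder(Entity.class));
          }
      }
      ```

      One way to avoid hand-maintaining a matching order is to derive the read
      schema from the encoder itself via `Encoders.bean(Entity.class).schema()`.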

      Create a JSON file with the following content:

      [
        {
          "broker_name": "A1",
          "server_name": "S1",
          "order": 1,
          "storages": [
            {
              "timestamp": "2018-10-29 23:11:44.000",
              "storage": 12.5
            }
          ]
        }
      ]

       

      Process the data as follows:

      import org.apache.spark.api.java.function.MapFunction;
      import org.apache.spark.api.java.function.ReduceFunction;
      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Encoders;
      import scala.Tuple2;

      Dataset<Entity> ds = spark.read()
              .option("multiline", "true")
              .schema(valid)
              .json("/path/to/file")
              .as(Encoders.bean(Entity.class));

      ds.groupByKey((MapFunction<Entity, String>) o -> o.getBroker_name(), Encoders.STRING())
              .reduceGroups((ReduceFunction<Entity>) (e1, e2) -> e1)
              .map((MapFunction<Tuple2<String, Entity>, Entity>) tuple -> tuple._2, Encoders.bean(Entity.class))
              .show(10, false);

      The result will be:

      +-----------+-----+-----------+--------------------------------------------------------+
      |broker_name|order|server_name|storages                                                |
      +-----------+-----+-----------+--------------------------------------------------------+
      |A1         |1    |S1         |[[7.612815958429577E-309, 148474-03-19 22:14:3232.5248]]|
      +-----------+-----+-----------+--------------------------------------------------------+
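
      The tiny `7.61E-309` value is consistent with positional bit reinterpretation:
      Spark stores a timestamp internally as a long count of microseconds since the
      epoch, and when the serializer writes struct fields in one order while the
      schema declares another, those raw bits get decoded as the wrong type. A
      hedged sketch of that decoding (the `BitReinterpretDemo` class is illustrative;
      the report's session timezone is unknown, so UTC is assumed and the exact
      digits differ slightly):

      ```java
      public class BitReinterpretDemo {
          // Reinterpret a timestamp's raw microsecond count as IEEE-754 double bits
          static double microsAsDouble(long micros) {
              return Double.longBitsToDouble(micros);
          }

          public static void main(String[] args) {
              // "2018-10-29 23:11:44" taken as UTC, converted to epoch microseconds
              long micros = java.time.Instant.parse("2018-10-29T23:11:44Z")
                                             .toEpochMilli() * 1000L;
              // Epoch-microsecond longs are small enough that their bits decode to
              // a subnormal double, the same magnitude as in the corrupted row
              System.out.println(microsAsDouble(micros)); // on the order of 7.6E-309
          }
      }
      ```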
      

      Source: https://stackoverflow.com/q/54987724

People

    Assignee: Unassigned
    Reporter: hejsgpuom62c
    Votes: 0
    Watchers: 2