
[SPARK-32580] Issue accessing column values after 'explode' function

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.1
    • Component/s: SQL
    • Labels: None

    Description

      An exception occurs when trying to flatten doubly nested arrays.

      The source schema is

      root
       |-- data: array (nullable = true)
       |    |-- element: struct (containsNull = true)
       |    |    |-- item_id: string (nullable = true)
       |    |    |-- timestamp: string (nullable = true)
       |    |    |-- values: array (nullable = true)
       |    |    |    |-- element: struct (containsNull = true)
       |    |    |    |    |-- sample: double (nullable = true)
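
      As a side note, the same source schema can also be written as a DDL string (nullability aside), which DataFrameReader.schema(String) accepts as well:

      data array<struct<item_id:string,timestamp:string,values:array<struct<sample:double>>>>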
      

      The target schema is

      root
       |-- item_id: string (nullable = true)
       |-- timestamp: string (nullable = true)
       |-- sample: double (nullable = true)
      

      The code (in Java)

      package com.skf.streamer.spark;
      
      import java.util.concurrent.TimeoutException;
      
      import org.apache.spark.SparkConf;
      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.SparkSession;
      import org.apache.spark.sql.functions;
      import org.apache.spark.sql.types.DataTypes;
      import org.apache.spark.sql.types.StructField;
      import org.apache.spark.sql.types.StructType;
      
      public class ExplodeTest {
      
         public static void main(String[] args) throws TimeoutException {
      
            SparkConf conf = new SparkConf()
               .setAppName("SimpleApp")
               .set("spark.scheduler.mode", "FAIR")
               .set("spark.master", "local[1]")
               .set("spark.sql.streaming.checkpointLocation", "checkpoint");
      
            SparkSession spark = SparkSession.builder()
               .config(conf)
               .getOrCreate();
      
            Dataset<Row> d0 = spark
               .read()
               .format("json")
               .option("multiLine", "true")
               .schema(getSchema())
               .load("src/test/resources/explode/data.json");
      
            d0.printSchema();
      
            // Explode the outer array: one row per element of 'data'
            d0 = d0.withColumn("item", functions.explode(d0.col("data")));
            // Explode the inner array: one row per element of 'item.values'
            d0 = d0.withColumn("value", functions.explode(d0.col("item.values")));
            d0.printSchema();
            d0 = d0.select(
               d0.col("item.item_id"),
               d0.col("item.timestamp"),
               d0.col("value.sample")
            );
      
            d0.printSchema();
      
            d0.show(); // Fails here with the exception below
      
            spark.stop();
         }
      
         private static StructType getSchema() {
            StructField[] level2Fields = {
               DataTypes.createStructField("sample", DataTypes.DoubleType, false),
            };
      
            StructField[] level1Fields = {
               DataTypes.createStructField("item_id", DataTypes.StringType, false),
               DataTypes.createStructField("timestamp", DataTypes.StringType, false),
               DataTypes.createStructField("values", DataTypes.createArrayType(DataTypes.createStructType(level2Fields)), false)
            };
      
            StructField[] fields = {
               DataTypes.createStructField("data", DataTypes.createArrayType(DataTypes.createStructType(level1Fields)), false)
            };
      
            return DataTypes.createStructType(fields);
         }
      }
      

      The data file

      {
        "data": [
          {
            "item_id": "item_1",
            "timestamp": "2020-07-01 12:34:89",
            "values": [
              {
                "sample": 1.1
              },
              {
                "sample": 1.2
              }
            ]
          },
          {
            "item_id": "item_2",
            "timestamp": "2020-07-02 12:34:89",
            "values": [
              {
                "sample": 2.2
              }
            ]
          }
        ]
      }
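
      For reference, with this input the flattened result should be three rows (worked out by hand from the data above):

      +-------+-------------------+------+
      |item_id|          timestamp|sample|
      +-------+-------------------+------+
      | item_1|2020-07-01 12:34:89|   1.1|
      | item_1|2020-07-01 12:34:89|   1.2|
      | item_2|2020-07-02 12:34:89|   2.2|
      +-------+-------------------+------+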
      

      The Dataset.show() call fails with the following exception:

      Caused by: java.lang.RuntimeException: Couldn't find _gen_alias_30#30 in [_gen_alias_28#28,_gen_alias_29#29]
      	at scala.sys.package$.error(package.scala:30)
      	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.$anonfun$applyOrElse$1(BoundAttribute.scala:81)
      	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
      	... 37 more
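
      A possible workaround sketch (an assumption on my part, not verified against this exact plan): chain the explodes through select() so each generator's output becomes a plain column before its nested fields are referenced, instead of mixing withColumn() with nested field access:

      // Hypothetical rewrite of the failing section, starting again from the
      // freshly loaded dataset d0; whether it avoids this bug is an assumption.
      Dataset<Row> flat = d0
         .select(functions.explode(d0.col("data")).as("item"))
         .select(
            functions.col("item.item_id"),
            functions.col("item.timestamp"),
            functions.explode(functions.col("item.values")).as("value"))
         .select(
            functions.col("item_id"),
            functions.col("timestamp"),
            functions.col("value.sample"));
      flat.show();

      The _gen_alias_ names in the error suggest the problem involves how nested field accesses over generator output are aliased, so on 3.0.0 toggling spark.sql.optimizer.nestedSchemaPruning.enabled may be worth a try as well; whether that setting affects this particular plan is likewise an assumption. The issue is marked fixed in 3.0.1.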
      

      Attachments

        1. data.json (0.4 kB, Ayrat Sadreev)
        2. ExplodeTest.java (2 kB, Ayrat Sadreev)

          People

            Assignee: Unassigned
            Reporter: Ayrat Sadreev (airatsa)
            Votes: 0
            Watchers: 3
