Description
An exception occurs when trying to flatten double nested arrays
The schema is
root |-- data: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- item_id: string (nullable = true) | | |-- timestamp: string (nullable = true) | | |-- values: array (nullable = true) | | | |-- element: struct (containsNull = true) | | | | |-- sample: double (nullable = true)
The target schema is
root |-- item_id: string (nullable = true) |-- timestamp: string (nullable = true) |-- sample: double (nullable = true)
The code (in Java)
package com.skf.streamer.spark; import java.util.concurrent.TimeoutException; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.functions; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; public class ExplodeTest { public static void main(String[] args) throws TimeoutException { SparkConf conf = new SparkConf() .setAppName("SimpleApp") .set("spark.scheduler.mode", "FAIR") .set("spark.master", "local[1]") .set("spark.sql.streaming.checkpointLocation", "checkpoint"); SparkSession spark = SparkSession.builder() .config(conf) .getOrCreate(); Dataset<Row> d0 = spark .read() .format("json") .option("multiLine", "true") .schema(getSchema()) .load("src/test/resources/explode/data.json"); d0.printSchema(); d0 = d0.withColumn("item", functions.explode(d0.col("data"))); d0 = d0.withColumn("value", functions.explode(d0.col("item.values"))); d0.printSchema(); d0 = d0.select( d0.col("item.item_id"), d0.col("item.timestamp"), d0.col("value.sample") ); d0.printSchema(); d0.show(); // Failes spark.stop(); } private static StructType getSchema() { StructField[] level2Fields = { DataTypes.createStructField("sample", DataTypes.DoubleType, false), }; StructField[] level1Fields = { DataTypes.createStructField("item_id", DataTypes.StringType, false), DataTypes.createStructField("timestamp", DataTypes.StringType, false), DataTypes.createStructField("values", DataTypes.createArrayType(DataTypes.createStructType(level2Fields)), false) }; StructField[] fields = { DataTypes.createStructField("data", DataTypes.createArrayType(DataTypes.createStructType(level1Fields)), false) }; return DataTypes.createStructType(fields); } }
The data file
{ "data": [ { "item_id": "item_1", "timestamp": "2020-07-01 12:34:89", "values": [ { "sample": 1.1 }, { "sample": 1.2 } ] }, { "item_id": "item_2", "timestamp": "2020-07-02 12:34:89", "values": [ { "sample": 2.2 } ] } ] }
Dataset.show() method fails with an exception
Caused by: java.lang.RuntimeException: Couldn't find _gen_alias_30#30 in [_gen_alias_28#28,_gen_alias_29#29] at scala.sys.package$.error(package.scala:30) at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.$anonfun$applyOrElse$1(BoundAttribute.scala:81) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) ... 37 more