Details
-
Bug
-
Status: Open
-
Critical
-
Resolution: Unresolved
-
3.4.2, 3.5.0, 4.0.0
-
None
Description
Since version 3.4, I've been experiencing the following error when using encoders.
Exception in thread "main" java.util.NoSuchElementException: key not found: T
at scala.collection.immutable.Map$Map1.apply(Map.scala:163)
at org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:121)
at org.apache.spark.sql.catalyst.JavaTypeInference$.$anonfun$encoderFor$1(JavaTypeInference.scala:140)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:138)
at org.apache.spark.sql.catalyst.JavaTypeInference$.$anonfun$encoderFor$1(JavaTypeInference.scala:140)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:138)
at org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:60)
at org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:53)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:62)
at org.apache.spark.sql.Encoders$.bean(Encoders.scala:179)
at org.apache.spark.sql.Encoders.bean(Encoders.scala)
at org.example.Main.main(Main.java:26)
I'm attaching the code I use to reproduce the error locally. spark_test.zip
The issue is in the JavaTypeInference class, when it tries to find the encoder for a ParameterizedType with the value Team<T>. Running JavaTypeUtils.getTypeArguments(pt).asScala.toMap returns the type variable T again — this time the T that belongs to Company — while pt.getRawType is Team. As a result, the typeVariables map ends up holding a (Team, Company) pair of type variables rather than a concrete type, which leads to the "key not found: T" error when the TypeVariable is looked up.
My example code is this:
/**
 * Reproduction program for the encoder failure described in this report.
 *
 * <p>Reads a CSV file into a {@code Dataset<PersonData>}, wraps every row in a
 * {@code Team<PersonData>} inside a {@code CompanyWrapper}, and prints the result.
 */
public class Main {

    /**
     * Runs the reproduction against a local Spark session.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("Spark Test")
                .getOrCreate();

        // Load the CSV (with header row, comma-delimited) as PersonData beans.
        Dataset<PersonData> df = spark.read()
                .option("header", "true")
                .option("delimiter", ",")
                .csv("src/main/resources/data/person-data.csv")
                .as(Encoders.bean(PersonData.class));

        // Each person becomes the single member of a team that belongs to one company.
        MapFunction<PersonData, CompanyWrapper> wrapPerson =
                personData -> new CompanyWrapper("COMPANY NAME", new Team<>("TEAM NAME", personData));

        // NOTE(review): the reported stack trace ends in Encoders.bean(...) with nested
        // encoderFor frames, so presumably it is this bean encoder for the generic
        // CompanyWrapper hierarchy that throws "key not found: T" — confirm against the attachment.
        Dataset<CompanyWrapper> companyWrapperDataset =
                df.map(wrapPerson, Encoders.bean(CompanyWrapper.class));

        companyWrapperDataset.show(false);
    }
}
/**
 * Non-generic subclass of {@code Company} that fixes the type parameter to {@code PersonData}.
 */
public class CompanyWrapper extends Company<PersonData> {

    /** Creates an empty wrapper; name and team are left unset. */
    public CompanyWrapper() {
        super();
    }

    /**
     * Creates a wrapper with the given company name and team.
     *
     * @param name the company name
     * @param team the team of {@code PersonData} belonging to the company
     */
    public CompanyWrapper(String name, Team<PersonData> team) {
        super(name, team);
    }
}
/**
 * A named company holding a single {@code Team} whose member type is {@code T}.
 *
 * @param <T> the type of person stored in the company's team
 */
public class Company<T> {

    private String name;
    private Team<T> team;

    /** Creates a company with no name and no team. */
    public Company() {
    }

    /**
     * Creates a company with the given name and team.
     *
     * @param name the company name
     * @param team the company's team
     */
    public Company(String name, Team<T> team) {
        this.name = name;
        this.team = team;
    }

    /** @return the company name */
    public String getName() {
        return name;
    }

    /** @param name the company name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the company's team */
    public Team<T> getTeam() {
        return team;
    }

    /** @param team the company's team */
    public void setTeam(Team<T> team) {
        this.team = team;
    }

    /** @return a text form such as {@code Company{name='X', team=...}} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Company{");
        text.append("name='").append(name).append('\'');
        text.append(", team=").append(team);
        text.append('}');
        return text.toString();
    }
}
/**
 * A named team with a single member of type {@code T}.
 *
 * @param <T> the type of the team's member
 */
public class Team<T> {

    // Both fields are public in addition to having accessors; kept that way so
    // existing direct field access keeps working.
    public String name;
    public T person;

    /** Creates a team with no name and no member. */
    public Team() {
    }

    /**
     * Creates a team with the given name and member.
     *
     * @param name   the team name
     * @param person the team's member
     */
    public Team(String name, T person) {
        this.name = name;
        this.person = person;
    }

    /** @return the team name */
    public String getName() {
        return name;
    }

    /** @param name the team name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the team's member */
    public T getPerson() {
        return person;
    }

    /** @param person the team's member */
    public void setPerson(T person) {
        this.person = person;
    }

    /** @return a text form such as {@code Team{name='X', person=...}} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Team{");
        text.append("name='").append(name).append('\'');
        text.append(", person=").append(person);
        text.append('}');
        return text.toString();
    }
}
/**
 * Serializable bean describing one person: id, first name, last name, email and phone,
 * all held as strings.
 */
public class PersonData implements Serializable {

    private String id;
    private String firstName;
    private String lastName;
    private String email;
    private String phone;

    /** @return the person's id */
    public String getId() {
        return id;
    }

    /** @param id the person's id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the first name */
    public String getFirstName() {
        return firstName;
    }

    /** @param firstName the first name */
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    /** @return the last name */
    public String getLastName() {
        return lastName;
    }

    /** @param lastName the last name */
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    /** @return the email address */
    public String getEmail() {
        return email;
    }

    /** @param email the email address */
    public void setEmail(String email) {
        this.email = email;
    }

    /** @return the phone number */
    public String getPhone() {
        return phone;
    }

    /** @param phone the phone number */
    public void setPhone(String phone) {
        this.phone = phone;
    }

    /** @return a text form listing every field, each value wrapped in single quotes */
    @Override
    public String toString() {
        // All fields are Strings, so %s renders them exactly as concatenation would (null -> "null").
        return String.format(
                "PersonData{id='%s', firstName='%s', lastName='%s', email='%s', phone='%s'}",
                id, firstName, lastName, email, phone);
    }
}
Attachments
Attachments
Issue Links
- links to