Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Invalid
Affects Version/s: 2.4.3
None
None
Description
It looks like Spark is unable to construct a Row from a POJO returned from a UDF.
Given the POJO:
public class SegmentStub {
    private int id;
    private Date statusDateTime;
    private int healthPointRatio;
}
Registration of the UDF:
public class ParseResultsUdf {
    public String registerUdf(SparkSession sparkSession) {
        Encoder<SegmentStub> encoder = Encoders.bean(SegmentStub.class);
        final StructType schema = encoder.schema();
        sparkSession.udf().register(UDF_NAME,
            (UDF2<String, String, SegmentStub>) (s, s2) -> new SegmentStub(1, Date.valueOf(LocalDate.now()), 2),
            schema
        );
        return UDF_NAME;
    }
}
Test code:
List<String[]> strings = Arrays.asList(new String[]{"one", "two"}, new String[]{"3", "4"});
JavaRDD<Row> rowJavaRDD = sparkContext.parallelize(strings).map(RowFactory::create);
StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("foe1", DataTypes.StringType, false),
    DataTypes.createStructField("foe2", DataTypes.StringType, false)
});
Dataset<Row> dataFrame = sparkSession.sqlContext().createDataFrame(rowJavaRDD, schema);
Seq<Column> columnSeq = new Set.Set2<>(col("foe1"), col("foe2")).toSeq();
dataFrame.select(callUDF(udfName, columnSeq)).show();
Running the test throws the following exception:
Caused by: java.lang.IllegalArgumentException: The value (SegmentStub(id=1, statusDateTime=2019-09-06, healthPointRatio=2)) of the type (udf.SegmentStub) cannot be converted to struct<healthPointRatio:int,id:int,statusDateTime:date>
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
    ... 21 more
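A likely reading of the trace, consistent with the Invalid resolution: for a struct return type, the converter in Spark 2.4 expects the UDF to return a Row rather than an arbitrary Java bean, so the bean has to be flattened into field values in the order of the schema. The struct in the error message lists its fields by name (healthPointRatio, id, statusDateTime). The sketch below uses only the JDK to show one way to extract a bean's property values in that order. The class name BeanToRowValues, the helper toOrderedValues, and the getters on SegmentStub are assumptions made for illustration and are not part of the report. In the UDF, the resulting array could then be passed to RowFactory.create.

```java
import java.beans.BeanInfo;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.sql.Date;
import java.util.Arrays;
import java.util.Comparator;

public class BeanToRowValues {

    // Hypothetical stand-in for the reporter's POJO; getters are assumed
    // so that the class is a readable Java bean.
    public static class SegmentStub {
        private final int id;
        private final Date statusDateTime;
        private final int healthPointRatio;

        public SegmentStub(int id, Date statusDateTime, int healthPointRatio) {
            this.id = id;
            this.statusDateTime = statusDateTime;
            this.healthPointRatio = healthPointRatio;
        }

        public int getId() { return id; }
        public Date getStatusDateTime() { return statusDateTime; }
        public int getHealthPointRatio() { return healthPointRatio; }
    }

    // Reads every bean property through its getter and returns the values
    // sorted by property name, matching the field order shown in the error.
    public static Object[] toOrderedValues(Object bean) throws Exception {
        // Stop at Object.class so the synthetic "class" property is excluded.
        BeanInfo info = Introspector.getBeanInfo(bean.getClass(), Object.class);
        PropertyDescriptor[] props = info.getPropertyDescriptors();
        // Sort explicitly rather than relying on the introspector's ordering.
        Arrays.sort(props, Comparator.comparing(PropertyDescriptor::getName));

        Object[] values = new Object[props.length];
        for (int i = 0; i < props.length; i++) {
            values[i] = props[i].getReadMethod().invoke(bean);
        }
        return values;
    }

    public static void main(String[] args) throws Exception {
        SegmentStub stub = new SegmentStub(1, Date.valueOf("2019-09-06"), 2);
        // Order is healthPointRatio, id, statusDateTime.
        System.out.println(Arrays.toString(toOrderedValues(stub))); // prints [2, 1, 2019-09-06]
    }
}
```

With this approach the UDF would be declared as returning Row instead of SegmentStub, and the schema obtained from Encoders.bean could still be used as the declared return type, assuming its field order matches the sorted property names as it does in the error above.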