Spark / SPARK-29009

Returning POJO from UDF not working


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Invalid
    • Affects Version/s: 2.4.3
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None

    Description

       It looks like Spark is unable to construct a Row from a POJO returned from a UDF.

      Given the POJO:

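      import java.sql.Date; // java.sql.Date, matching Date.valueOf(LocalDate) in the UDF below

      public class SegmentStub {
          private int id;
          private Date statusDateTime;
          private int healthPointRatio;
          // no-arg and all-args constructors, getters and setters omitted
      }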
      

      Registration of the UDF:

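      import java.sql.Date;
      import java.time.LocalDate;
      import org.apache.spark.sql.Encoder;
      import org.apache.spark.sql.Encoders;
      import org.apache.spark.sql.SparkSession;
      import org.apache.spark.sql.api.java.UDF2;
      import org.apache.spark.sql.types.StructType;

      public class ParseResultsUdf {
          // hypothetical value; the constant's definition is not shown in the report
          private static final String UDF_NAME = "parseResults";

          public String registerUdf(SparkSession sparkSession) {
              // bean encoder schema: struct<healthPointRatio:int,id:int,statusDateTime:date>
              Encoder<SegmentStub> encoder = Encoders.bean(SegmentStub.class);
              final StructType schema = encoder.schema();
              // the lambda returns the POJO itself (not a Row) while declaring a struct return type
              sparkSession.udf().register(UDF_NAME,
                      (UDF2<String, String, SegmentStub>) (s, s2) -> new SegmentStub(1, Date.valueOf(LocalDate.now()), 2),
                      schema
              );
              return UDF_NAME;
          }
      }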
      

      Test code:

      import static org.apache.spark.sql.functions.callUDF;
      import static org.apache.spark.sql.functions.col;

              // sparkSession and sparkContext (a JavaSparkContext) are assumed to be initialized
              String udfName = new ParseResultsUdf().registerUdf(sparkSession);

              List<String[]> strings = Arrays.asList(new String[]{"one", "two"}, new String[]{"3", "4"});
              JavaRDD<Row> rowJavaRDD = sparkContext.parallelize(strings).map(RowFactory::create);

              StructType schema = DataTypes
                      .createStructType(new StructField[] { DataTypes.createStructField("foe1", DataTypes.StringType, false),
                              DataTypes.createStructField("foe2", DataTypes.StringType, false) });

              Dataset<Row> dataFrame = sparkSession.sqlContext().createDataFrame(rowJavaRDD, schema);
              // callUDF has a Java varargs overload, so no Scala Seq is needed
              dataFrame.select(callUDF(udfName, col("foe1"), col("foe2"))).show();
      

       This throws the following exception:

      Caused by: java.lang.IllegalArgumentException: The value (SegmentStub(id=1, statusDateTime=2019-09-06, healthPointRatio=2)) of the type (udf.SegmentStub) cannot be converted to struct<healthPointRatio:int,id:int,statusDateTime:date>
      	at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)
      	at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
      	at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
      	at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
      	... 21 more
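      The stack trace points at CatalystTypeConverters$StructConverter. As far as we can tell from the Spark 2.4 source, that converter only turns an org.apache.spark.sql.Row (or a Scala Product such as a case class) into a struct value, so a plain Java bean returned from a UDF falls through to the IllegalArgumentException above; this is consistent with the issue being resolved as Invalid. One workaround is to have the UDF return a Row whose values follow the struct's field order, which for the bean schema in the error message is alphabetical (healthPointRatio, id, statusDateTime). The sketch below is a hypothetical helper that flattens a bean into that order with java.beans.Introspector; the names BeanValues and toValues, and the nested SegmentStub stand-in with hand-written accessors, are assumptions for illustration and not part of Spark's API.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.InvocationTargetException;
import java.sql.Date;
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical helper: flattens a JavaBean into an Object[] ordered by property name,
// which matches the struct order shown in the error message
// (healthPointRatio, id, statusDateTime). The result can be passed to RowFactory.create(...).
final class BeanValues {

    private BeanValues() {}

    static Object[] toValues(Object bean) {
        try {
            // Stop at Object.class so the inherited getClass() "class" property is excluded.
            PropertyDescriptor[] props =
                    Introspector.getBeanInfo(bean.getClass(), Object.class).getPropertyDescriptors();
            // Sort explicitly instead of relying on the introspector's iteration order.
            Arrays.sort(props, Comparator.comparing(PropertyDescriptor::getName));
            return Arrays.stream(props)
                    .filter(p -> p.getReadMethod() != null) // only readable properties
                    .map(p -> read(p, bean))
                    .toArray();
        } catch (IntrospectionException e) {
            throw new IllegalArgumentException("Cannot introspect " + bean.getClass(), e);
        }
    }

    // Invokes the getter; primitive results come back boxed (e.g. int -> Integer).
    private static Object read(PropertyDescriptor p, Object bean) {
        try {
            return p.getReadMethod().invoke(bean);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("Cannot read property " + p.getName(), e);
        }
    }

    // Stand-in for the report's SegmentStub, with the accessors a bean encoder needs.
    public static class SegmentStub {
        private int id;
        private Date statusDateTime;
        private int healthPointRatio;

        public SegmentStub() {}

        public SegmentStub(int id, Date statusDateTime, int healthPointRatio) {
            this.id = id;
            this.statusDateTime = statusDateTime;
            this.healthPointRatio = healthPointRatio;
        }

        public int getId() { return id; }
        public void setId(int id) { this.id = id; }
        public Date getStatusDateTime() { return statusDateTime; }
        public void setStatusDateTime(Date statusDateTime) { this.statusDateTime = statusDateTime; }
        public int getHealthPointRatio() { return healthPointRatio; }
        public void setHealthPointRatio(int healthPointRatio) { this.healthPointRatio = healthPointRatio; }
    }
}
```

      Inside the registration above, the lambda would then be typed as UDF2<String, String, Row> and return something like RowFactory.create(BeanValues.toValues(stub)), still registered with the bean-derived schema as its return type. Sorting by property name is a deliberate choice: it mirrors the order printed in the error message rather than depending on whatever order the introspector happens to produce.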
      

       


          People

            Assignee: Unassigned
            Reporter: Tomasz Belina (tomasz.belina)
            Votes: 0
            Watchers: 2
