Beam / BEAM-3739

@Parameter annotation does not work for UDFs in Beam SQL

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 2.4.0
    • Component/s: dsl-sql
    • Labels: None

    Description

      The BeamSqlUdf Javadoc indicates that a UDF can have optional parameters, but this functionality does not work. I copied the following example from the Javadoc at https://beam.apache.org/documentation/sdks/javadoc/2.3.0/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.html:

      public static class MyLeftFunction implements BeamSqlUdf {
        public String eval(
            @Parameter(name = "s") String s,
            @Parameter(name = "n", optional = true) Integer n) {
          return s.substring(0, n == null ? 1 : n);
        }
      }

      I modified a query in BeamSqlExample.java to use it. With both arguments supplied, the query completes successfully:

      // Case 1. Run a simple SQL query over the input PCollection with BeamSql.query.
      PCollection<Row> outputStream = inputTable.apply(
          BeamSql.query("select c1, leftfn('string1', 1) as c2, c3 from PCOLLECTION where c1 > 1")
              .registerUdf("leftfn", MyLeftFunction.class));

      With the optional parameter left off, I get an exception:

      // Case 1. Run a simple SQL query over the input PCollection with BeamSql.query.
      PCollection<Row> outputStream = inputTable.apply(
          BeamSql.query("select c1, leftfn('string1') as c2, c3 from PCOLLECTION where c1 > 1")
              .registerUdf("leftfn", MyLeftFunction.class));

      Exception in thread "main" java.lang.IllegalStateException: java.lang.UnsupportedOperationException: Operator: DEFAULT is not supported yet!
       at org.apache.beam.sdk.extensions.sql.QueryTransform.expand(QueryTransform.java:75)
       at org.apache.beam.sdk.extensions.sql.QueryTransform.expand(QueryTransform.java:47)
       at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
       at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
       at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
       at org.apache.beam.sdk.extensions.sql.example.BeamSqlExample.main(BeamSqlExample.java:76)
      Caused by: java.lang.UnsupportedOperationException: Operator: DEFAULT is not supported yet!
       at org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor.buildExpression(BeamSqlFnExecutor.java:424)
       at org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor.buildExpression(BeamSqlFnExecutor.java:201)
       at org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor.<init>(BeamSqlFnExecutor.java:125)
       at org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel.buildBeamPipeline(BeamProjectRel.java:70)
       at org.apache.beam.sdk.extensions.sql.QueryTransform.expand(QueryTransform.java:73)
       ... 5 more
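
      The exception message suggests that the omitted argument reaches the expression builder as a DEFAULT placeholder that it does not yet handle. The expected semantics, with an omitted optional argument arriving at eval as null, can be sketched in plain Java without Beam. Everything below other than the body of eval is an assumption made for illustration: the local @Parameter annotation is a stand-in for the one used in the Javadoc example, and padWithDefaults and leftfn are hypothetical helpers, not Beam or Calcite APIs.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

public class DefaultArgSketch {

  // Stand-in for the @Parameter annotation from the Javadoc example
  // (assumption: a local copy so the sketch compiles without Beam).
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.PARAMETER)
  public @interface Parameter {
    String name();
    boolean optional() default false;
  }

  // Same body as MyLeftFunction.eval in the report, made static for brevity.
  public static String eval(
      @Parameter(name = "s") String s,
      @Parameter(name = "n", optional = true) Integer n) {
    return s.substring(0, n == null ? 1 : n);
  }

  // Hypothetical helper: pads omitted trailing arguments with null when the
  // corresponding parameter is marked optional, and rejects the call otherwise.
  static Object[] padWithDefaults(Method method, Object... supplied) {
    if (supplied.length > method.getParameterCount()) {
      throw new IllegalArgumentException("Too many arguments: " + supplied.length);
    }
    Annotation[][] annotations = method.getParameterAnnotations();
    Object[] full = new Object[method.getParameterCount()];
    for (int i = 0; i < full.length; i++) {
      if (i < supplied.length) {
        full[i] = supplied[i];
      } else if (!isOptional(annotations[i])) {
        throw new IllegalArgumentException("Missing required argument at position " + i);
      }
      // Otherwise full[i] stays null, the value eval expects for an omitted argument.
    }
    return full;
  }

  // Returns true if the parameter carries @Parameter(optional = true).
  private static boolean isOptional(Annotation[] parameterAnnotations) {
    for (Annotation a : parameterAnnotations) {
      if (a instanceof Parameter && ((Parameter) a).optional()) {
        return true;
      }
    }
    return false;
  }

  // Hypothetical entry point mirroring the SQL call leftfn(...).
  public static String leftfn(Object... args) {
    try {
      Method eval = DefaultArgSketch.class.getMethod("eval", String.class, Integer.class);
      return (String) eval.invoke(null, padWithDefaults(eval, args));
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(leftfn("string1", 1)); // both arguments supplied, prints "s"
    System.out.println(leftfn("string1"));    // optional argument omitted, prints "s"
  }
}
```

      Under these assumptions, leftfn('string1') and leftfn('string1', 1) would return the same value, which is the behavior the Javadoc example implies and the failing query does not reach.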

            People

              Assignee: Mingmin Xu (mingmxu)
              Reporter: Samuel Waggoner (samwagg)

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 50m