Beam / BEAM-9267

A BeamSQL UDF that returns a Map always fails with a NullPointerException.

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P3
    • Resolution: Fixed
    • Affects Version/s: 2.19.0
    • Fix Version/s: Missing
    • Component/s: dsl-sql
    • Labels: None

    Description

      When I create a UDF that returns a Map<String, String> and call it from within a SQL statement, it consistently fails with a NullPointerException.

      My UDFs

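      import java.util.HashMap;
      import java.util.Map;
      import org.apache.beam.sdk.transforms.SerializableFunction;

      // Returns a fixed Map<String, String>, regardless of the input.
      public class FooMap implements SerializableFunction<String, Map<String, String>> {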
          @Override
          public Map<String, String> apply(String input) {
              final HashMap<String, String> hashMap = new HashMap<>();
              hashMap.put("Some", "Thing");
              return hashMap;
          }
      }
      

      and

      import org.apache.beam.sdk.transforms.SerializableFunction;

      // Returns the input string reversed.
      public class BarString implements SerializableFunction<String, String> {
          @Override
          public String apply(String input) {
              return new StringBuilder(input).reverse().toString();
          }
      }
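      Both UDF bodies can be exercised directly in plain Java, outside Beam and SQL, to check that the logic itself is sound. This is a sketch under stated assumptions: it uses java.util.function.Function as a stand-in for Beam's SerializableFunction so that it compiles with only the JDK, and the class name UdfSanityCheck is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Stand-alone check of the two UDF bodies, outside Beam and SQL.
// Assumption: java.util.function.Function stands in for Beam's
// SerializableFunction so this compiles with only the JDK.
public class UdfSanityCheck {

    // Same body as FooMap.apply: ignores its input and returns {Some=Thing}.
    static final Function<String, Map<String, String>> FOO_MAP = input -> {
        final Map<String, String> hashMap = new HashMap<>();
        hashMap.put("Some", "Thing");
        return hashMap;
    };

    // Same body as BarString.apply: returns the input reversed.
    static final Function<String, String> BAR_STRING =
        input -> new StringBuilder(input).reverse().toString();

    public static void main(String[] args) {
        // Same three inputs the test feeds into the pipeline.
        for (String s : new String[] {"One", "Two", "Three"}) {
            System.out.println(s + " -> " + BAR_STRING.apply(s) + ", " + FOO_MAP.apply(s));
        }
    }
}
```

      Each printed line pairs the reversed string with {Some=Thing}, for example "One -> enO, {Some=Thing}".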
      

      My test

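      import java.io.Serializable;
      import java.util.Arrays;
      import org.apache.beam.sdk.coders.StringUtf8Coder;
      import org.apache.beam.sdk.extensions.sql.SqlTransform;
      import org.apache.beam.sdk.schemas.Schema;
      import org.apache.beam.sdk.testing.NeedsRunner;
      import org.apache.beam.sdk.testing.TestPipeline;
      import org.apache.beam.sdk.testing.ValidatesRunner;
      import org.apache.beam.sdk.transforms.Create;
      import org.apache.beam.sdk.transforms.DoFn;
      import org.apache.beam.sdk.transforms.ParDo;
      import org.apache.beam.sdk.values.PCollection;
      import org.apache.beam.sdk.values.PCollectionTuple;
      import org.apache.beam.sdk.values.Row;
      import org.junit.Rule;
      import org.junit.Test;
      import org.junit.experimental.categories.Category;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;

      @Category(ValidatesRunner.class)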
      public class TestFunctionReturnsMap implements Serializable {
      
          private static final Logger LOG = LoggerFactory.getLogger(TestFunctionReturnsMap.class);
      
          @Rule
          public final transient TestPipeline pipeline = TestPipeline.create();
      
          @Test
          @Category(NeedsRunner.class)
          public void testUserAgentAnalysisSQL() {
      
              // ============================================================
              // Create input PCollection<Row>
              Schema inputSchema = Schema
                  .builder()
                  .addStringField("bar")
                  .build();
      
              PCollection<Row> input = pipeline
                  .apply(Create.of(Arrays.asList("One", "Two", "Three")))
                  .setCoder(StringUtf8Coder.of())
                  .apply(ParDo.of(new DoFn<String, Row>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                          c.output(Row
                              .withSchema(inputSchema)
                              .addValues(c.element())
                              .build());
                      }
                  })).setRowSchema(inputSchema);
      
      
              // ============================================================
      
              PCollection<Row> result =
                  // This way we give a name to the input stream for use in the SQL
                  PCollectionTuple.of("InputStream", input)
                      // Apply the SQL with the UDFs we need.
                      .apply("Execute SQL", SqlTransform
                          .query(
                              "SELECT" +
                              "   bar             AS bar" +
                              "  ,Bar(bar)        AS barbar " +
                              "  ,Foo(bar)        AS foobar " +
                              "FROM InputStream")
                          .registerUdf("Foo",     new FooMap())
                          .registerUdf("Bar",     new BarString())
                      );
      
              result.apply(ParDo.of(new RowPrinter()));
      
              pipeline.run().waitUntilFinish();
          }
      
          public static class RowPrinter extends DoFn<Row, Row> {
              @ProcessElement
              public void processElement(ProcessContext c) {
                  final Row row = c.element();
                  LOG.info("ROW: {} --> {}", row, row.getSchema());
              }
          }
      }
      

      The exception I always get:

      java.lang.NullPointerException: Null type
      
      	at org.apache.beam.sdk.schemas.AutoValue_Schema_Field$Builder.setType(AutoValue_Schema_Field.java:84)
      	at org.apache.beam.sdk.schemas.Schema$Field.of(Schema.java:893)
      	at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:234)
      	at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:230)
      	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
      	at java.util.Iterator.forEachRemaining(Iterator.java:116)
      	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
      	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
      	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
      	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
      	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
      	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
      	at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:189)
      	at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:129)
      	at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:110)
      	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
      	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
      	at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:69)
      	at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:39)
      	at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:125)
      	at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:83)
      	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
      	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
      	at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:261)
      	at nl.basjes.parse.useragent.beam.TestFunctionReturnsMap.testUserAgentAnalysisSQL(TestFunctionReturnsMap.java:81)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
      	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
      	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
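      Reading the trace bottom-up, the failure happens while SqlTransform is being expanded (BeamCalcRel$Transform.expand calling CalciteUtils.toSchema), that is, at pipeline construction, before any element reaches either UDF. CalciteUtils.toField passes a null field type to Schema.Field.of, and the AutoValue-generated builder rejects it with the message "Null type". The following is a simplified, hypothetical model of that path, assuming the null comes from a type-translation lookup that has no entry for the UDF's Map return type; NullTypeSketch, SQL_TO_FIELD_TYPE and FieldBuilder are illustrative names, not Beam APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the failure path in the stack trace.
// Assumption: a type-translation lookup returns null for an unmapped type,
// and an AutoValue-style builder setter rejects that null with "Null type".
// None of these names are Beam APIs.
public class NullTypeSketch {

    // Hypothetical stand-in for a SQL-type -> Beam-field-type mapping table.
    static final Map<String, String> SQL_TO_FIELD_TYPE = new HashMap<>();
    static {
        SQL_TO_FIELD_TYPE.put("VARCHAR", "STRING");
        SQL_TO_FIELD_TYPE.put("BIGINT", "INT64");
        // Deliberately no entry for the Map-returning UDF's type.
    }

    // Mimics the null check AutoValue generates for a non-@Nullable "type" property.
    static final class FieldBuilder {
        private String type;

        FieldBuilder setType(String type) {
            if (type == null) {
                throw new NullPointerException("Null type");
            }
            this.type = type;
            return this;
        }
    }

    static FieldBuilder toField(String sqlTypeName) {
        // Map.get returns null for an unmapped key; setType then throws.
        return new FieldBuilder().setType(SQL_TO_FIELD_TYPE.get(sqlTypeName));
    }

    public static void main(String[] args) {
        toField("VARCHAR"); // mapped type: succeeds
        try {
            toField("UNMAPPED_UDF_RETURN_TYPE");
        } catch (NullPointerException e) {
            System.out.println("java.lang.NullPointerException: " + e.getMessage());
        }
    }
}
```

      Running main prints "java.lang.NullPointerException: Null type", the same message as the first line of the trace above.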
      

       


            People

              Assignee: Niels Basjes (nielsbasjes)
              Reporter: Niels Basjes (nielsbasjes)
              Votes: 0
              Watchers: 4
