  Beam / BEAM-4409

NoSuchMethodException reading from JmsIO

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: 2.4.0
    • Fix Version/s: 2.20.0
    • Component/s: io-java-jms
    • Labels: None
    • Environment: Linux, Java 1.8, Beam 2.4, Direct Runner, ActiveMQ

    Description

      Reading from a queue with JmsIO as an unbounded source on the DirectRunner produces a NoSuchMethodException. The exception occurs when UnboundedReadEvaluatorFactory.UnboundedReadEvaluator clones the JmsCheckpointMark using its default (Avro) coder: on decode, Avro tries to instantiate javax.jms.Message reflectively through a no-argument constructor, but javax.jms.Message is an interface and has none.
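
The root cause can be illustrated without Beam, Avro, or a JMS broker. The sketch below is a minimal illustration, not code from Beam: `FakeMessage` and `PlainRecord` are hypothetical stand-ins (the first for an interface such as javax.jms.Message, the second for an ordinary class), and `hasNoArgConstructor` is a helper invented for this example. It performs the same `getDeclaredConstructor()` lookup that appears at the bottom of the stack trace.

```java
public class NoArgConstructorDemo {

    /** Hypothetical stand-in for javax.jms.Message: an interface declares no constructors. */
    public interface FakeMessage {
        String getText();
    }

    /** Hypothetical concrete class with an implicit no-argument constructor. */
    public static class PlainRecord {
        public String text = "";
    }

    /** Returns true if reflection can find a no-argument constructor on the given type. */
    public static boolean hasNoArgConstructor(Class<?> type) {
        try {
            type.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            // Same exception type as in the report: there is no <init>() to look up.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("PlainRecord: " + hasNoArgConstructor(PlainRecord.class));
        System.out.println("FakeMessage: " + hasNoArgConstructor(FakeMessage.class));
    }
}
```

Any decoder that rebuilds objects through a reflective no-argument constructor hits the same limitation when a field's declared type is an interface.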

      The following trivial reader-side code reproduces the error (the DirectRunner must be on the classpath). The messages on the queue for this test case were simple TextMessages. I found that the exception is triggered more readily when messages are published rapidly (~200/second).

      Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

      // read from the queue
      ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

      PCollection<String> inputStrings = p.apply("Read from queue",
          JmsIO.<String>readMessage()
              .withConnectionFactory(factory)
              .withQueue("somequeue")
              .withCoder(StringUtf8Coder.of())
              .withMessageMapper((JmsIO.MessageMapper<String>) message ->
                  ((TextMessage) message).getText()));

      // decode
      PCollection<String> asStrings = inputStrings.apply("Decode Message",
          ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext context) {
              System.out.println(context.element());
              context.output(context.element());
            }
          }));

      p.run();
      

      Stack trace:

      Exception in thread "main" java.lang.RuntimeException: java.lang.NoSuchMethodException: javax.jms.Message.<init>()
          at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353)
          at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369)
          at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901)
          at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
          at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
          at org.apache.avro.reflect.ReflectDatumReader.readCollection(ReflectDatumReader.java:219)
          at org.apache.avro.reflect.ReflectDatumReader.readArray(ReflectDatumReader.java:137)
          at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
          at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:302)
          at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
          at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
          at org.apache.beam.sdk.coders.AvroCoder.decode(AvroCoder.java:318)
          at org.apache.beam.sdk.coders.Coder.decode(Coder.java:170)
          at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:122)
          at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:105)
          at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:99)
          at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
          at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:194)
          at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:124)
          at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
          at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.NoSuchMethodException: javax.jms.Message.<init>()
          at java.lang.Class.getConstructor0(Class.java:3082)
          at java.lang.Class.getDeclaredConstructor(Class.java:2178)
          at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
      

       

      A more contrived example that reproduces the exception without a pipeline or a broker:

      package org.apache.beam.sdk.io.jms;

      import org.apache.activemq.command.ActiveMQTextMessage;
      import org.apache.beam.sdk.coders.Coder;
      import org.apache.beam.sdk.util.CoderUtils;

      final class CoderErrorExample {
        public static void main(String[] args) throws Exception {
          Coder coder = new JmsIO.UnboundedJmsSource(null).getCheckpointMarkCoder();
          JmsCheckpointMark checkpointMark = new JmsCheckpointMark();
          checkpointMark.addMessage(new ActiveMQTextMessage());
          // from org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedReadEvaluator#getReader
          CoderUtils.clone(coder, checkpointMark);
        }
      }
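
The clone step in the example above is an encode-to-bytes, decode-from-bytes round trip, as the `CoderUtils.clone` and `decodeFromByteArray` frames in the stack trace indicate. The following stdlib-only sketch shows that round trip with Java serialization standing in for the coder. This is an assumption made purely for illustration: `FakeCheckpoint` and `cloneViaBytes` are hypothetical names, not Beam APIs, and nothing here is a claim about how the issue was resolved. Java serialization rebuilds a `Serializable` object without calling that class's own constructor, which is why this kind of round trip does not depend on a no-argument constructor being present.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class RoundTripCloneDemo {

    /** Hypothetical checkpoint-like holder; not Beam's JmsCheckpointMark. */
    public static class FakeCheckpoint implements Serializable {
        private static final long serialVersionUID = 1L;
        public final List<String> messageIds = new ArrayList<>();
    }

    /** Clones a value by serializing it to a byte array and reading it back. */
    public static <T extends Serializable> T cloneViaBytes(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            // Wrap checked exceptions so callers do not need a throws clause.
            throw new IllegalStateException("round-trip clone failed", e);
        }
    }

    public static void main(String[] args) {
        FakeCheckpoint original = new FakeCheckpoint();
        original.messageIds.add("ID:example-1");
        FakeCheckpoint copy = cloneViaBytes(original);
        System.out.println("distinct object: " + (copy != original));
        System.out.println("same contents: " + copy.messageIds.equals(original.messageIds));
    }
}
```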
      

       

       

            People

              Assignee: Unassigned
              Reporter: Edward Pricer (gruntled)
              Votes: 2
              Watchers: 3
