Bahir (Retired) / BAHIR-190

ActiveMQ connector stops on empty queue

Details

    Description

      I tried the ActiveMQ Flink connector. When reading from an ActiveMQ queue, the connector appears to exit once there are no more messages in the queue, which ends the Flink job processing the stream.

      It seems to me that the while loop inside the run method (AMQSource.java, line 222) should not return, but continue, when the received message is not an instance of BytesMessage, e.g. when it is null.
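
      To illustrate the difference between the two behaviours, here is a minimal, self-contained sketch. It does not use the connector or JMS at all: the class PollingLoopSketch, the method consume and the scripted list of receive results are hypothetical stand-ins, with a byte[] playing the role of a bytes message and null playing the role of a receive call that found nothing in the queue.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PollingLoopSketch {

    // Hypothetical stand-in for the source's polling loop (not the connector's code).
    // Each script entry represents the result of one receive call:
    // a byte[] is a message payload, null means nothing was available.
    static List<String> consume(List<Object> script, boolean continueOnUnexpected) {
        List<String> emitted = new ArrayList<>();
        Iterator<Object> receiver = script.iterator();
        while (receiver.hasNext()) {            // stands in for the "still running" check
            Object message = receiver.next();
            if (!(message instanceof byte[])) { // also true when message is null
                if (continueOnUnexpected) {
                    continue;                   // proposed behaviour: skip and poll again
                }
                return emitted;                 // reported behaviour: the loop ends here
            }
            emitted.add(new String((byte[]) message, StandardCharsets.UTF_8));
        }
        return emitted;
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // One message, then an empty queue, then another message.
        List<Object> script = Arrays.<Object>asList(bytes("a"), null, bytes("b"));
        System.out.println(consume(script, false)); // prints [a]
        System.out.println(consume(script, true));  // prints [a, b]
    }
}
```

      With return, the message that arrives after the queue was briefly empty is never read. With continue, the loop keeps polling until the running check ends it.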

      If I'm right, I can create a pull request showing the change.

      To reproduce:

       

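      // Assumed setup (not in the original report): the streaming environment referred to as env below.
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("xxx", "xxx", "tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");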
      
      AMQSourceConfig<String> amqConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
              .setConnectionFactory(connectionFactory)
              .setDestinationName("test")
              .setDestinationType(DestinationType.QUEUE)
              .setDeserializationSchema(new SimpleStringSchema())
              .build();
      AMQSource<String> amqSource = new AMQSource<>(amqConfig);
      
      env.addSource(amqSource).print();
      env.setParallelism(1).execute("ActiveMQ Consumer");

      Then point the Flink job at an empty ActiveMQ queue.

       

      Not sure if this is a bug, but it's not what I expected when I used the connector.

            People

              Assignee: Unassigned
              Reporter: Stephan Brosinski (sbrosinski)