Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-12093

Apache Flink:Active MQ consumer job is getting finished after first message consume.

    XMLWordPrintableJSON

Details

    Description

      Hi Team,

       

      Below is my the code the the execution environment to run the Apache Flink job that's consume message from ActiveMQ topic::

       

      StreamExecutionEnvironment env = createExecutionEnvironment();

      connectionFactory = new ActiveMQConnectionFactory("*****", "*****.",
      "failover:(tcp://amq-master-01:61668)?timeout=3000");

      LOG.info("exceptionListener----{}", new AMQExceptionListLocal(LOG, true));

      RunningChecker runningChecker = new RunningChecker();
      runningChecker.setIsRunning(true);

      AMQSourceConfig<String> config = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
      .setConnectionFactory(connectionFactory).setDestinationName("test_flink")
      .setDeserializationSchema(deserializationSchema).setRunningChecker(runningChecker)
      .setDestinationType(DestinationType.TOPIC).build();

      amqSource = new AMQSourceLocal<>(config);

      LOG.info("Check whether ctx is null ::;;;;{}", amqSource);

      DataStream<String> dataMessage = env.addSource(amqSource);

      dataMessage.writeAsText("C:/Users/shivkumar/Desktop/flinksjar/output.txt", WriteMode.OVERWRITE);
      System.out.println("Step 1");

      env.execute("Check ACTIVE_MQ");

       

      When we are starting the job, Topic is getting created and message is getting dequeued from that topic.

      But After that is getting finished. What Can be done to keep the job running?

      Attachments

        Activity

          People

            Unassigned Unassigned
            shivk4464 shiv kumar
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated: