Details
-
Bug
-
Status: Open
-
Not a Priority
-
Resolution: Unresolved
-
1.7.2
-
None
-
Working in my local IDE(Eclipse).
Description
Hi Team,
Below is my the code the the execution environment to run the Apache Flink job that's consume message from ActiveMQ topic::
StreamExecutionEnvironment env = createExecutionEnvironment();
connectionFactory = new ActiveMQConnectionFactory("*****", "*****.",
"failover:(tcp://amq-master-01:61668)?timeout=3000");
LOG.info("exceptionListener----{}", new AMQExceptionListLocal(LOG, true));
RunningChecker runningChecker = new RunningChecker();
runningChecker.setIsRunning(true);
AMQSourceConfig<String> config = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
.setConnectionFactory(connectionFactory).setDestinationName("test_flink")
.setDeserializationSchema(deserializationSchema).setRunningChecker(runningChecker)
.setDestinationType(DestinationType.TOPIC).build();
amqSource = new AMQSourceLocal<>(config);
LOG.info("Check whether ctx is null ::;;;;{}", amqSource);
DataStream<String> dataMessage = env.addSource(amqSource);
dataMessage.writeAsText("C:/Users/shivkumar/Desktop/flinksjar/output.txt", WriteMode.OVERWRITE);
System.out.println("Step 1");
env.execute("Check ACTIVE_MQ");
When we are starting the job, Topic is getting created and message is getting dequeued from that topic.
But After that is getting finished. What Can be done to keep the job running?