Details
-
Bug
-
Status: Closed
-
Blocker
-
Resolution: Not A Problem
-
1.12.0
-
None
-
None
-
flink1.12.0
jdk1.8
Description
The cep code is running properly on flink1.11.2,but it is not working properly on flink1.12.0.
Can somebody help me?
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// DataStream : source
DataStream<TemperatureEvent> input = env.fromElements(new TemperatureEvent(1,"Device01", 22.0),
new TemperatureEvent(1,"Device01", 27.1), new TemperatureEvent(2,"Device01", 28.1),
new TemperatureEvent(1,"Device01", 22.2), new TemperatureEvent(3,"Device01", 22.1),
new TemperatureEvent(1,"Device02", 22.3), new TemperatureEvent(4,"Device02", 22.1),
new TemperatureEvent(1,"Device02", 22.4), new TemperatureEvent(5,"Device02", 22.7),
new TemperatureEvent(1,"Device02", 27.0), new TemperatureEvent(6,"Device02", 30.0));
Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("start")
.subtype(TemperatureEvent.class)
.where(new SimpleCondition<TemperatureEvent>() {
@Override
public boolean filter(TemperatureEvent subEvent) {
if (subEvent.getTemperature() >= 26.0)
return false;
}
}).where(new SimpleCondition<TemperatureEvent>() {
@Override
public boolean filter(TemperatureEvent subEvent) {
if (subEvent.getMachineName().equals("Device02")) { return true; }
return false;
}
}).within(Time.seconds(10));
DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
.select(
new RichPatternSelectFunction<TemperatureEvent, Alert>() {
/**
*/
{ System.out.println(getRuntimeContext().getUserCodeClassLoader()); }
private static final long serialVersionUID = 1L;
@Override
public void open(Configuration parameters) throws Exception
@Override
public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception
});
patternStream.print();
env.execute("CEP on Temperature Sensor");
it should be output(on flink1.11.2):
Alert [message=Temperature Rise Detected: [TemperatureEvent [getTemperature()=27.0, getMachineName=Device02]] on machine name: [TemperatureEvent [getTemperature()=27.0, getMachineName=Device02]]]
Alert [message=Temperature Rise Detected: [TemperatureEvent [getTemperature()=30.0, getMachineName=Device02]] on machine name: [TemperatureEvent [getTemperature()=30.0, getMachineName=Device02]]]
Attachments
Issue Links
- relates to
-
FLINK-19326 Allow explicitly configuring time behaviour on CEP PatternStream
- Closed