Flink / FLINK-20814

The CEP code is not running properly


    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Not A Problem
    • Affects Version/s: 1.12.0
    • Fix Version/s: None
    • Component/s: Library / CEP
    • Labels:
      None
    • Environment:

      Flink 1.12.0
      JDK 1.8

      Description

      The CEP code below runs correctly on Flink 1.11.2, but it does not work correctly on Flink 1.12.0.
      Can somebody help me?

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // DataStream : source
      DataStream<TemperatureEvent> input = env.fromElements(
          new TemperatureEvent(1, "Device01", 22.0), new TemperatureEvent(1, "Device01", 27.1),
          new TemperatureEvent(2, "Device01", 28.1), new TemperatureEvent(1, "Device01", 22.2),
          new TemperatureEvent(3, "Device01", 22.1), new TemperatureEvent(1, "Device02", 22.3),
          new TemperatureEvent(4, "Device02", 22.1), new TemperatureEvent(1, "Device02", 22.4),
          new TemperatureEvent(5, "Device02", 22.7), new TemperatureEvent(1, "Device02", 27.0),
          new TemperatureEvent(6, "Device02", 30.0));

      Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("start")
          .subtype(TemperatureEvent.class)
          .where(new SimpleCondition<TemperatureEvent>() {
              @Override
              public boolean filter(TemperatureEvent subEvent) {
                  return subEvent.getTemperature() >= 26.0;
              }
          })
          .where(new SimpleCondition<TemperatureEvent>() {
              @Override
              public boolean filter(TemperatureEvent subEvent) {
                  return subEvent.getMachineName().equals("Device02");
              }
          })
          .within(Time.seconds(10));

      DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
          .select(new RichPatternSelectFunction<TemperatureEvent, Alert>() {
              private static final long serialVersionUID = 1L;

              @Override
              public void open(Configuration parameters) throws Exception {
                  System.out.println(getRuntimeContext().getUserCodeClassLoader());
              }

              @Override
              public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception {
                  return new Alert("Temperature Rise Detected: " + event.get("start")
                      + " on machine name: " + event.get("start"));
              }
          });

      patternStream.print();

      env.execute("CEP on Temperature Sensor");

      Expected output (as produced on Flink 1.11.2):
      Alert [message=Temperature Rise Detected: [TemperatureEvent [getTemperature()=27.0, getMachineName=Device02]] on machine name: [TemperatureEvent [getTemperature()=27.0, getMachineName=Device02]]]
      Alert [message=Temperature Rise Detected: [TemperatureEvent [getTemperature()=30.0, getMachineName=Device02]] on machine name: [TemperatureEvent [getTemperature()=30.0, getMachineName=Device02]]]
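
      As a sanity check on the expected output above, the filtering logic of the pattern can be reproduced without Flink. Two chained where(...) conditions on the same pattern are combined with AND, so an event matches only if its temperature is at least 26.0 and its machine name is Device02. The sketch below is plain Java, not CEP code: the TemperatureEvent class here is a hypothetical stand-in (the reporter's class is not shown), the name "id" for the first constructor argument is a guess, and the 10-second within(...) window is ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PatternCheck {

    // Hypothetical stand-in for the reporter's TemperatureEvent (not shown in the report).
    // The meaning of the first constructor argument is unknown; "id" is an assumption.
    static final class TemperatureEvent {
        final int id;
        final String machineName;
        final double temperature;

        TemperatureEvent(int id, String machineName, double temperature) {
            this.id = id;
            this.machineName = machineName;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return "TemperatureEvent [temperature=" + temperature
                + ", machineName=" + machineName + "]";
        }
    }

    // The same eleven elements, in the same order, as env.fromElements(...) in the report.
    static List<TemperatureEvent> sampleInput() {
        return Arrays.asList(
            new TemperatureEvent(1, "Device01", 22.0), new TemperatureEvent(1, "Device01", 27.1),
            new TemperatureEvent(2, "Device01", 28.1), new TemperatureEvent(1, "Device01", 22.2),
            new TemperatureEvent(3, "Device01", 22.1), new TemperatureEvent(1, "Device02", 22.3),
            new TemperatureEvent(4, "Device02", 22.1), new TemperatureEvent(1, "Device02", 22.4),
            new TemperatureEvent(5, "Device02", 22.7), new TemperatureEvent(1, "Device02", 27.0),
            new TemperatureEvent(6, "Device02", 30.0));
    }

    // Both conditions from the pattern, combined with AND.
    static boolean matches(TemperatureEvent e) {
        return e.temperature >= 26.0 && "Device02".equals(e.machineName);
    }

    // Keeps only the events that satisfy both conditions, preserving input order.
    static List<TemperatureEvent> matching(List<TemperatureEvent> events) {
        List<TemperatureEvent> result = new ArrayList<>();
        for (TemperatureEvent e : events) {
            if (matches(e)) {
                result.add(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (TemperatureEvent e : matching(sampleInput())) {
            System.out.println(e);
        }
    }
}
```

      Only the two Device02 events with temperatures 27.0 and 30.0 pass both conditions, which agrees with the two alerts listed for Flink 1.11.2. The Device01 readings of 27.1 and 28.1 are hot enough but fail the machine-name condition.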

              People

              • Assignee: Unassigned
              • Reporter: lprince little-tomato
              • Votes: 0
              • Watchers: 4
