Flink / FLINK-24224

After converting a Table to a DataStream, CEP only works when the event type is Row; POJO, Map, and JSONObject events do not work. With the DataStream API alone, CEP works with any event type.


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.11.2, 1.12.0, 1.13.2
    • Fix Version/s: None
    • Component/s: Library / CEP
    • Labels: None

    Description

       

      1. Problem: After converting a Table to a DataStream for CEP, matching only works when the event type is Row. Events of other types (POJOs, Maps, JSONObject) do not work.

         With the DataStream API alone, CEP works with any event type.

      2. Versions: I have tried three versions: 1.11.2, 1.12.0, and 1.13.2.

      3. Code:

      (1) Table to DataStream to CEP (only the Row type works; with any other type the CEP output prints nothing and there is no error message)

      tableEnv.executeSql(creat_kafka_source);
      tableEnv.executeSql(calculateSql);
      
      Table tb = tableEnv.from("calculateSql");
      String[] fieldNames = tb.getSchema().getFieldNames();
      DataType[] fieldDataTypes = tb.getSchema().getFieldDataTypes();
      
      KeyedStream<JSONObject, String> ds = tableEnv
              .toAppendStream(tb, Row.class)
              .map(new RichMapFunction<Row, JSONObject>() {
                  Map<String, Object> map = new HashMap<>();
      
                  @Override
                  public void open(Configuration parameters) throws Exception {
                      super.open(parameters);
                      if (null == map) {
                          map = new HashMap<>();
                      }
                  }
      
                  @Override
                  public JSONObject map(Row value) throws Exception {
                  // Add the field names and values to the map
                      RowParseUtil.setFieldAndValue2Map(map, fieldDataTypes, fieldNames, value);
                      JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(map));
                      map.clear();
                      return jsonObject;
                  }
              })
              .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(0)) {
                  @Override
                  public long extractTimestamp(JSONObject element) {
                      return element.getLongValue("wStart") * 1000;
                  }
              }).keyBy(x -> x.getString("x_forwarded_for"));
      // This prints data
      ds.print();
      
      Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("begin")
              .where(new SimpleCondition<JSONObject>() {
                  @Override
                  public boolean filter(JSONObject value) throws Exception {
                      log.info("===================>" + value);
                      return true;
                  }
              }).timesOrMore(1).within(Time.seconds(10));
      
      PatternStream<JSONObject> patternStream = CEP.pattern(ds, pattern);
      // This prints nothing
      patternStream.process(new PatternProcessFunction<JSONObject, String>() {
          @Override
          public void processMatch(Map<String, List<JSONObject>> match, Context ctx, Collector<String> out) throws Exception {
              out.collect("==========>>>>>>>" + match.toString());
          }
      }).print();
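
      One thing that may be worth ruling out here (an assumption, not a confirmed cause of this issue): in event time, CEP buffers incoming elements and only processes them once a watermark has passed their timestamps. A stream can therefore print data while the pattern output stays empty if the watermark never advances far enough. The sketch below is a simplified, Flink-free model of that buffering. The class EventTimeBufferSketch and its methods are hypothetical names for illustration and are not part of Flink's API, and the exact boundary handling in Flink may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Simplified model of event-time buffering; NOT Flink's actual CEP operator. */
final class EventTimeBufferSketch {
    // Buffered event timestamps (epoch milliseconds), smallest first.
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;
    private int lateCount = 0;

    /** Buffers an event that is ahead of the watermark, otherwise counts it as late. */
    void onEvent(long timestampMillis) {
        if (timestampMillis > currentWatermark) {
            buffer.add(timestampMillis);
        } else {
            lateCount++;
        }
    }

    /** Advances the watermark and returns the released timestamps in ascending order. */
    List<Long> onWatermark(long watermarkMillis) {
        currentWatermark = Math.max(currentWatermark, watermarkMillis);
        List<Long> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= currentWatermark) {
            released.add(buffer.poll());
        }
        return released;
    }

    int bufferedCount() {
        return buffer.size();
    }

    int lateCount() {
        return lateCount;
    }

    public static void main(String[] args) {
        EventTimeBufferSketch sketch = new EventTimeBufferSketch();
        sketch.onEvent(3_000L);
        sketch.onEvent(1_000L);
        // Watermark is still behind both events: nothing is released.
        System.out.println(sketch.onWatermark(500L));
        // Watermark passes both events: they are released in timestamp order.
        System.out.println(sketch.onWatermark(5_000L));
        // An event behind the watermark is counted as late instead of buffered.
        sketch.onEvent(2_000L);
        System.out.println(sketch.lateCount());
    }
}
```

      In this model, events stay in the buffer for as long as the watermark is behind them, which matches the symptom of ds.print() showing data while the pattern output shows none. Comparing the watermark progress of the two jobs (for example in the Flink web UI) would confirm or rule this out.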
      
      
      

      (2) Only the DataStream API to CEP (any data type works)

      Properties proPs = kafkaUtil.getReceiveKfkProPs(receive_brokers, groupid);
      FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer =
              new FlinkKafkaConsumer<>(receive_topic, new KafkaRecordSchema(), proPs);
      consumer.setStartFromEarliest();
      
      SingleOutputStreamOperator<JSONObject> input = env.addSource(consumer)
              .map(x -> {
                  return JSON.parseObject(x.value());
              })
              .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.milliseconds(10)) {
                  @Override
                  public long extractTimestamp(JSONObject element) {
                      return element.getLongValue("ts");
                  }
              })
              .keyBy(x -> x.getString("x_forwarded_for")+x.getString("request_uri"))
              .timeWindow(Time.seconds(1)).apply(new WindowFunction<JSONObject, JSONObject, String, TimeWindow>() {
                  @Override
                  public void apply(String s, TimeWindow window, Iterable<JSONObject> input, Collector<JSONObject> out) throws Exception {
                      Iterator<JSONObject> iterator = input.iterator();
                      ArrayList<JSONObject> list = new ArrayList<>();
                      int n = 0;
                      while (iterator.hasNext()) {
                          n++;
                          JSONObject next = iterator.next();
                          list.add(next);
                      }
                      JSONObject jsonObject = list.get(0);
                      jsonObject.put("ct",n);
                      jsonObject.remove("ts");
                      out.collect(jsonObject);
                  }
              });
      
      input.print();
      
      // This works
      Pattern<JSONObject, JSONObject> minInterval = Pattern
              .<JSONObject>begin("begin").where(new SimpleCondition<JSONObject>() {
                  @Override
                  public boolean filter(JSONObject jsonObject) throws Exception {
                      return true;
                  }
              }).timesOrMore(1).within(Time.seconds(10));
      
      PatternStream<JSONObject> pattern = CEP.pattern(input, minInterval);
      pattern.process(new PatternProcessFunction<JSONObject, String>() {
          @Override
          public void processMatch(Map<String, List<JSONObject>> map, Context context, Collector<String> out) throws Exception {
              out.collect("This user is suspicious:====================>" + map.toString());
          }
      }).print();
      

       

          People

            Assignee: Unassigned
            Reporter: YUJIANBO
            Votes: 0
            Watchers: 2
