Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.11.2, 1.12.0, 1.13.2
Description
1. Problem: When a Table is converted to a DataStream and passed to CEP, only events of type Row work. Events of other types (POJOs, Maps, JSONObject) do not work.
When the pipeline uses only the DataStream API, any event type works with CEP.
2. Versions: I have tried three versions: 1.11.2, 1.12.0, and 1.13.2.
3. Code:
(1) Table to DataStream to CEP (only the Row data type works; with any other data type the CEP stage prints no data and reports no error)
tableEnv.executeSql(creat_kafka_source);
tableEnv.executeSql(calculateSql);
Table tb = tableEnv.from("calculateSql");
String[] fieldNames = tb.getSchema().getFieldNames();
DataType[] fieldDataTypes = tb.getSchema().getFieldDataTypes();

KeyedStream<JSONObject, String> ds = tableEnv
        .toAppendStream(tb, Row.class)
        .map(new RichMapFunction<Row, JSONObject>() {
            Map<String, Object> map = new HashMap<>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                if (null == map) {
                    map = new HashMap<>();
                }
            }

            @Override
            public JSONObject map(Row value) throws Exception {
                // Add the field names and values to the map
                RowParseUtil.setFieldAndValue2Map(map, fieldDataTypes, fieldNames, value);
                JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(map));
                map.clear();
                return jsonObject;
            }
        })
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(JSONObject element) {
                return element.getLongValue("wStart") * 1000;
            }
        })
        .keyBy(x -> x.getString("x_forwarded_for"));

// This prints data
ds.print();

Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("begin")
        .where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                log.info("===================>" + value);
                return true;
            }
        })
        .timesOrMore(1)
        .within(Time.seconds(10));

PatternStream<JSONObject> patternStream = CEP.pattern(ds, pattern);

// This prints no data
patternStream.process(new PatternProcessFunction<JSONObject, String>() {
    @Override
    public void processMatch(Map<String, List<JSONObject>> match, Context ctx, Collector<String> out) throws Exception {
        out.collect("==========>>>>>>>" + match.toString());
    }
}).print();
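The helper RowParseUtil.setFieldAndValue2Map used in snippet (1) is not shown in this report. For anyone trying to reproduce the issue, the following is a minimal stand-in, assuming the helper simply copies each field name and value into the map. The class name RowToMapSketch, the Object[] parameter (standing in for the fields of a Flink Row), and the sample values are assumptions made for illustration; the real helper also receives the field data types, which this sketch leaves out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for RowParseUtil.setFieldAndValue2Map (not shown in the report).
final class RowToMapSketch {

    // Copies each (field name, field value) pair into the target map.
    // `values` stands in for the fields of a Flink Row, in schema order.
    static void setFieldAndValue2Map(Map<String, Object> target, String[] fieldNames, Object[] values) {
        if (fieldNames.length != values.length) {
            throw new IllegalArgumentException(
                    "schema has " + fieldNames.length + " fields but row has " + values.length);
        }
        for (int i = 0; i < fieldNames.length; i++) {
            target.put(fieldNames[i], values[i]);
        }
    }

    public static void main(String[] args) {
        // Field names are taken from the report; the values are made up.
        Map<String, Object> map = new LinkedHashMap<>();
        setFieldAndValue2Map(
                map,
                new String[] {"x_forwarded_for", "wStart"},
                new Object[] {"10.0.0.1", 1625000000L});
        System.out.println(map);
    }
}
```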
(2) DataStream API only to CEP (any data type works)
Properties proPs = kafkaUtil.getReceiveKfkProPs(receive_brokers, groupid);
FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer =
        new FlinkKafkaConsumer<>(receive_topic, new KafkaRecordSchema(), proPs);
consumer.setStartFromEarliest();

SingleOutputStreamOperator<JSONObject> input = env.addSource(consumer)
        .map(x -> {
            return JSON.parseObject(x.value());
        })
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.milliseconds(10)) {
            @Override
            public long extractTimestamp(JSONObject element) {
                return element.getLongValue("ts");
            }
        })
        .keyBy(x -> x.getString("x_forwarded_for") + x.getString("request_uri"))
        .timeWindow(Time.seconds(1))
        .apply(new WindowFunction<JSONObject, JSONObject, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<JSONObject> input, Collector<JSONObject> out) throws Exception {
                Iterator<JSONObject> iterator = input.iterator();
                ArrayList<JSONObject> list = new ArrayList<>();
                int n = 0;
                while (iterator.hasNext()) {
                    n++;
                    JSONObject next = iterator.next();
                    list.add(next);
                }
                JSONObject jsonObject = list.get(0);
                jsonObject.put("ct", n);
                jsonObject.remove("ts");
                out.collect(jsonObject);
            }
        });

// This prints data
input.print();

Pattern<JSONObject, JSONObject> minInterval = Pattern
        .<JSONObject>begin("begin")
        .where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject jsonObject) throws Exception {
                return true;
            }
        })
        .timesOrMore(1)
        .within(Time.seconds(10));

PatternStream<JSONObject> pattern = CEP.pattern(input, minInterval);

pattern.process(new PatternProcessFunction<JSONObject, String>() {
    @Override
    public void processMatch(Map<String, List<JSONObject>> map, Context context, Collector<String> out) throws Exception {
        out.collect("This user is suspicious:====================>" + map.toString());
    }
}).print();
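For readers comparing the two pipelines, the window step in snippet (2) can be tried outside Flink. The sketch below mirrors the body of apply(): it counts the events in one window, keeps the first event, stores the count under "ct", and drops "ts". It assumes a plain Map<String, Object> as a stand-in for JSONObject; the class name WindowCountSketch and the method name collapse are hypothetical. Unlike the original, it copies the first event instead of mutating it in place.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of the WindowFunction body in snippet (2).
final class WindowCountSketch {

    // Collapses all events of one window into a single event:
    // the first event's fields, plus "ct" (event count), minus "ts".
    static Map<String, Object> collapse(Iterable<Map<String, Object>> window) {
        List<Map<String, Object>> list = new ArrayList<>();
        for (Map<String, Object> event : window) {
            list.add(event);
        }
        if (list.isEmpty()) {
            // Guard for standalone use of this sketch only.
            throw new IllegalArgumentException("empty window");
        }
        // Copy the first event so the caller's input is left untouched
        // (the original snippet mutates the first event directly).
        Map<String, Object> result = new HashMap<>(list.get(0));
        result.put("ct", list.size());
        result.remove("ts");
        return result;
    }
}
```

Note that the collapsed event no longer carries "ts", so any later stage relies on the timestamp that the window operator attaches to its output rather than on a field of the event.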