Details
-
Bug
-
Status: Closed
-
Blocker
-
Resolution: Fixed
-
1.10.0
Description
For the following job:
from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import BatchTableEnvironment, StreamTableEnvironment, EnvironmentSettings, CsvTableSink from pyflink.table.descriptors import Schema, Kafka, Json from pyflink.table import DataTypes from pyflink.table.udf import ScalarFunction, udf import os @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING()) def get_host_ip(source, qr, sip, dip): if source == "NGAF" and qr == '1': return dip return sip @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING()) def get_dns_server_ip(source, qr, sip, dip): if source == "NGAF" and qr == '1': return sip return dip def test_case(): env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) t_env = StreamTableEnvironment.create(env) from pyflink.table import Row table = t_env.from_elements( [("DNS", Row(source="source", devid="devid", sip="sip", dip="dip", qr="qr", queries="queries", answers="answers", qtypes="qtypes", atypes="atypes", rcode="rcode", ts="ts",))], DataTypes.ROW([DataTypes.FIELD("stype", DataTypes.STRING()), DataTypes.FIELD("data", DataTypes.ROW([DataTypes.FIELD('source', DataTypes.STRING()), DataTypes.FIELD("devid", DataTypes.STRING()), DataTypes.FIELD('sip', DataTypes.STRING()), DataTypes.FIELD('dip', DataTypes.STRING()), DataTypes.FIELD("qr", DataTypes.STRING()), DataTypes.FIELD("queries", DataTypes.STRING()), DataTypes.FIELD("answers", DataTypes.STRING()), DataTypes.FIELD("qtypes", DataTypes.STRING()), DataTypes.FIELD("atypes", DataTypes.STRING()), DataTypes.FIELD("rcode", DataTypes.STRING()), DataTypes.FIELD("ts", DataTypes.STRING())])) ] )) result_file = "/tmp/test.csv" if os.path.exists(result_file): os.remove(result_file) t_env.register_table_sink("Results", CsvTableSink(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n'], [DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()], "/tmp/test.csv")) t_env.register_function("get_host_ip", get_host_ip) t_env.register_function("get_dns_server_ip", get_dns_server_ip) t_env.register_table("source", table) standard_table = t_env.sql_query("select data.*, stype as dns_type from source")\ .where("dns_type.in('DNSFULL', 'DNS', 'DNSFULL_FROM_LOG', 'DNS_FROM_LOG')") t_env.register_table("standard_table", standard_table) final_table = t_env.sql_query("SELECT *, get_host_ip(source, qr, sip, dip) as host_ip," "get_dns_server_ip(source, qr, sip, dip) as dns_server_ip FROM standard_table") final_table.insert_into("Results") t_env.execute("test") if __name__ == '__main__': test_case()
The plan is as following which is not correct:
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaTableSource(type, data) -> Map -> where: (IN(type, _UTF-16LE'DNSFULL', _UTF-16LE'DNS', _UTF-16LE'DNSFULL_FROM_LOG', _UTF-16LE'DNS_FROM_LOG')), select: (data, type) -> select: (type, get_host_ip(type.source, type.qr, type.sip, type.dip) AS f0, get_dns_server_ip(type.source, type.qr, type.sip, type.dip) AS f1) -> select: (f0.source AS source, f0.devid AS devid, f0.sip AS sip, f0.dip AS dip, f0.qr AS qr, f0.queries AS queries, f0.answers AS answers, f0.qtypes AS qtypes, f0.atypes AS atypes, f0.rcode AS rcode, f0.ts AS ts, type AS dns_type, f0 AS host_ip, f1 AS dns_server_ip) -> to: Row -> Sink: KafkaTableSink(source, devid, sip, dip, qr, queries, answers, qtypes, atypes, rcode, ts, dns_type, host_ip, dns_server_ip) (1/4) (8d064ab137866a2a9040392a87bcc59d) switched from RUNNING to FAILED.