tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(2));
tableEnv.executeSql("drop table if exists persons_table_kafka2");
String kafka_source_sql = "CREATE TABLE persons_table_kafka2 (\n" +
" `id` BIGINT,\n" +
" `name` STRING,\n" +
" `age` INT,\n" +
" proctime as PROCTIME(),\n" +
" `ts` TIMESTAMP(3),\n" +
" WATERMARK FOR ts AS ts\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'persons_test_auto',\n" +
" 'properties.bootstrap.servers' = 'node2:6667',\n" +
" 'properties.group.id' = 'testGrodsu1765',\n" +
" 'scan.startup.mode' = 'group-offsets',\n" +
" 'format' = 'json'\n" +
")";
tableEnv.executeSql(kafka_source_sql);
tableEnv.executeSql("drop table if exists persons_message_table_kafka2");
String kafka_source_sql2 = "CREATE TABLE persons_message_table_kafka2 (\n" +
" `id` BIGINT,\n" +
" `name` STRING,\n" +
" `message` STRING,\n" +
" `ts` TIMESTAMP(3) ," +
" WATERMARK FOR ts AS ts\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'persons_extra_message_auto',\n" +
" 'properties.bootstrap.servers' = 'node2:6667',\n" +
" 'properties.group.id' = 'testGroud125313',\n" +
" 'scan.startup.mode' = 'group-offsets',\n" +
" 'format' = 'json'\n" +
")";
tableEnv.executeSql(kafka_source_sql2);
tableEnv.executeSql(
"CREATE TEMPORARY VIEW persons_message_table22 AS \n" +
"SELECT id, name, message,ts \n" +
" FROM (\n" +
" SELECT *,\n" +
" ROW_NUMBER() OVER (PARTITION BY name \n" +
" ORDER BY ts DESC) AS rowNum \n" +
" FROM persons_message_table_kafka2 " +
" )\n" +
"WHERE rowNum = 1");
tableEnv.executeSql("" +
"CREATE TEMPORARY VIEW result_data_view " +
" as " +
" select " +
" t1.name, t1.age,t1.ts as ts1,t2.message, cast (t2.ts as string) as ts2 " +
" from persons_table_kafka2 t1 " +
" left join persons_message_table22 FOR SYSTEM_TIME AS OF t1.ts AS t2 on t1.name = t2.name "
);
Table resultTable = tableEnv.from("result_data_view");
DataStream<RowData> rowDataDataStream = tableEnv.toAppendStream(resultTable, RowData.class);
rowDataDataStream.print();
env.execute("test_it");