Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-23219

temproary join ttl configruation does not take effect

    XMLWordPrintableJSON

Details

    Description

      • version: flink 1.12.2
      •  problem: I run the job of table A temproary left join table B, and set the table.exec.state.ttl configuration
        to 3 hour or 2 sencond for test. But the task status keeps growing for more than 7 days.
      •  code
      tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(2));
      
      tableEnv.executeSql("drop table if exists persons_table_kafka2");
       String kafka_source_sql = "CREATE TABLE persons_table_kafka2 (\n" +
       " `id` BIGINT,\n" +
       " `name` STRING,\n" +
       " `age` INT,\n" +
       " proctime as PROCTIME(),\n" +
       " `ts` TIMESTAMP(3),\n" +
       " WATERMARK FOR ts AS ts\n" +
       ") WITH (\n" +
       " 'connector' = 'kafka',\n" +
       " 'topic' = 'persons_test_auto',\n" +
       " 'properties.bootstrap.servers' = 'node2:6667',\n" +
       " 'properties.group.id' = 'testGrodsu1765',\n" +
       " 'scan.startup.mode' = 'group-offsets',\n" +
       " 'format' = 'json'\n" +
       ")";
       tableEnv.executeSql(kafka_source_sql);
      
      
      
       tableEnv.executeSql("drop table if exists persons_message_table_kafka2");
       String kafka_source_sql2 = "CREATE TABLE persons_message_table_kafka2 (\n" +
       " `id` BIGINT,\n" +
       " `name` STRING,\n" +
       " `message` STRING,\n" +
       " `ts` TIMESTAMP(3) ," +
      // " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
       " WATERMARK FOR ts AS ts\n" +
       ") WITH (\n" +
       " 'connector' = 'kafka',\n" +
       " 'topic' = 'persons_extra_message_auto',\n" +
       " 'properties.bootstrap.servers' = 'node2:6667',\n" +
       " 'properties.group.id' = 'testGroud125313',\n" +
       " 'scan.startup.mode' = 'group-offsets',\n" +
       " 'format' = 'json'\n" +
       ")";
       tableEnv.executeSql(kafka_source_sql2);
      
      
       tableEnv.executeSql(
       "CREATE TEMPORARY VIEW persons_message_table22 AS \n" +
       "SELECT id, name, message,ts \n" +
       " FROM (\n" +
       " SELECT *,\n" +
       " ROW_NUMBER() OVER (PARTITION BY name \n" +
       " ORDER BY ts DESC) AS rowNum \n" +
       " FROM persons_message_table_kafka2 " +
       " )\n" +
       "WHERE rowNum = 1");
      
      
       tableEnv.executeSql("" +
       "CREATE TEMPORARY VIEW result_data_view " +
       " as " +
       " select " +
       " t1.name, t1.age,t1.ts as ts1,t2.message, cast (t2.ts as string) as ts2 " +
       " from persons_table_kafka2 t1 " +
       " left join persons_message_table22 FOR SYSTEM_TIME AS OF t1.ts AS t2 on t1.name = t2.name "
       );
      
      Table resultTable = tableEnv.from("result_data_view");
      DataStream<RowData> rowDataDataStream = tableEnv.toAppendStream(resultTable, RowData.class);
      rowDataDataStream.print();
      
      env.execute("test_it");
      
      • the result like 

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              waywtdcc waywtdcc
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated: