Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-19976 FLIP-136: Improve interoperability between DataStream and Table API
  3. FLINK-20637

Converting a Table to a DataStream twice results in two independent data streams (the table source is scanned twice)

    XML | Word | Printable | JSON

    Details

      Description

       

      Code

      // Code placeholder (editor artifact); the reproduction follows.
          // NOTE(review): statement-level fragment (no enclosing class/method). This snippet does
          // not call env.execute(); it only builds the pipeline and prints its execution plan.

          // Blink planner in streaming mode, on top of a plain StreamExecutionEnvironment.
          EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          // Event-time processing, checkpointing enabled, parallelism 10 (every node in the
          // printed plan reports "parallelism" : 10).
          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
          env.enableCheckpointing(50000);
          env.setParallelism(10);    
      
          // Table environment created on top of `env`; the plan printed from `env` at the end
          // contains the operators produced by the Table -> DataStream conversions below.
          StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
         
          // Kafka/JSON source table (topic and broker/group settings are blank in this report).
          // rowTime is computed from eventTime / 1000 (presumably eventTime holds epoch
          // milliseconds -- TODO confirm), and the watermark trails rowTime by 5 seconds.
          tableEnv.executeSql("create table feeds_expose_click_profile ( docId string ,buuid string ,predictId string ,docType int ,clickLabel int ,viewTime int ,exposeEventTime bigint ,clickEventTime string ,authorId string ,category string ,subCategory string ,keywords string ,tags  string, eventTime bigint,  rowTime as TO_TIMESTAMP(from_unixtime(eventTime / 1000)), WATERMARK FOR rowTime AS rowTime - INTERVAL '5' SECOND) WITH ('connector' = 'kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'properties.group.id' = '', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.ignore-parse-errors' = 'false' )");
          // First conversion: whole table -> retract stream -> keep elements whose Boolean flag
          // f0 is true -> unwrap the Row in f1. `dataStream` is never used afterwards, yet this
          // conversion still appears in the printed plan as its own chain (nodes 1-6:
          // TableSourceScan -> Calc -> WatermarkAssigner -> SinkConversionToTuple2 -> Filter -> Map).
          Table table = tableEnv.from("feeds_expose_click_profile");
          TypeInformation<Row> typeInfo = table.getSchema().toRowType();    DataStream dataStream = tableEnv .toRetractStream(table, typeInfo)
              .filter(row -> row.f0)
              .map(row -> row.f1)
              .returns(typeInfo);    
          // Second query over the same source table, also registered as the temporary view
          // "tableFilter" (the view is not referenced again in this snippet).
          Table tableFilter = tableEnv.sqlQuery("select buuid, authorId, viewTime, rowTime from feeds_expose_click_profile");
          tableEnv.createTemporaryView("tableFilter", tableFilter);    TypeInformation<Row> typeInfo1 = tableFilter.getSchema().toRowType();
          // Second conversion with the same filter/unwrap pattern, printed to stdout. It is
          // translated independently of the first one: the plan shows a second TableSourceScan
          // (node 7) instead of reusing node 1, i.e. the source table is scanned twice. This
          // duplicated scan is the behavior reported in this issue.
          DataStream dataStream1 = tableEnv .toRetractStream(tableFilter, typeInfo1)
              .filter(row -> row.f0)
              .map(row -> row.f1)
              .returns(typeInfo1);    dataStream1.print();
          // Print the JSON execution plan (reproduced in the ExecutionPlan section of this issue).
          System.out.println(env.getExecutionPlan());
      

       

       

      ExecutionPlan

       

      // Code placeholder (editor artifact)
      
      {
        "nodes" : [ {
          "id" : 1,
          "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, feeds_expose_click_profile]], fields=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime])",
          "pact" : "Data Source",
          "contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, feeds_expose_click_profile]], fields=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime])",
          "parallelism" : 10
        }, {
          "id" : 2,
          "type" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS rowTime])",
          "pact" : "Operator",
          "contents" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS rowTime])",
          "parallelism" : 10,
          "predecessors" : [ {
            "id" : 1,
            "ship_strategy" : "FORWARD",
            "side" : "second"
          } ]
        }, {
          "id" : 3,
          "type" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 5000:INTERVAL SECOND)])",
          "pact" : "Operator",
          "contents" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 5000:INTERVAL SECOND)])",
          "parallelism" : 10,
          "predecessors" : [ {
            "id" : 2,
            "ship_strategy" : "FORWARD",
            "side" : "second"
          } ]
        }, {
          "id" : 4,
          "type" : "SinkConversionToTuple2",
          "pact" : "Operator",
          "contents" : "SinkConversionToTuple2",
          "parallelism" : 10,
          "predecessors" : [ {
            "id" : 3,
            "ship_strategy" : "FORWARD",
            "side" : "second"
          } ]
        }, {
          "id" : 5,
          "type" : "Filter",
          "pact" : "Operator",
          "contents" : "Filter",
          "parallelism" : 10,
          "predecessors" : [ {
            "id" : 4,
            "ship_strategy" : "FORWARD",
            "side" : "second"
          } ]
        }, {
          "id" : 6,
          "type" : "Map",
          "pact" : "Operator",
          "contents" : "Map",
          "parallelism" : 10,
          "predecessors" : [ {
            "id" : 5,
            "ship_strategy" : "FORWARD",
            "side" : "second"
          } ]
        }, {
          "id" : 7,
          "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, feeds_expose_click_profile]], fields=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime])",
          "pact" : "Data Source",
          "contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, feeds_expose_click_profile]], fields=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime])",
          "parallelism" : 10
        }, {
          "id" : 8,
          "type" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS rowTime])",
          "pact" : "Operator",
          "contents" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS rowTime])",
          "parallelism" : 10,
          "predecessors" : [ {
            "id" : 7,
            "ship_strategy" : "FORWARD",
            "side" : "second"
          } ]
        }, {
          "id" : 9,
          "type" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 5000:INTERVAL SECOND)])",
          "pact" : "Operator",
          "contents" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 5000:INTERVAL SECOND)])",
          "parallelism" : 10,
          "predecessors" : [ {
            "id" : 8,
            "ship_strategy" : "FORWARD",
            "side" : "second"
          } ]
        }, {
          "id" : 10,
          "type" : "Calc(select=[buuid, authorId, viewTime, rowTime])",
          "pact" : "Operator",
          "contents" : "Calc(select=[buuid, authorId, viewTime, rowTime])",
          "parallelism" : 10,
          "predecessors" : [ {
            "id" : 9,
            "ship_strategy" : "FORWARD",
            "side" : "second"
          } ]
        }, {
          "id" : 11,
          "type" : "SinkConversionToTuple2",
          "pact" : "Operator",
          "contents" : "SinkConversionToTuple2",
          "parallelism" : 10,
          "predecessors" : [ {
            "id" : 10,
            "ship_strategy" : "FORWARD",
            "side" : "second"
          } ]
        }, {
          "id" : 12,
          "type" : "Filter",
          "pact" : "Operator",
          "contents" : "Filter",
          "parallelism" : 10,
          "predecessors" : [ {
            "id" : 11,
            "ship_strategy" : "FORWARD",
            "side" : "second"
          } ]
        }, {
          "id" : 13,
          "type" : "Map",
          "pact" : "Operator",
          "contents" : "Map",
          "parallelism" : 10,
          "predecessors" : [ {
            "id" : 12,
            "ship_strategy" : "FORWARD",
            "side" : "second"
          } ]
        }, {
          "id" : 14,
          "type" : "Sink: Print to Std. Out",
          "pact" : "Data Sink",
          "contents" : "Sink: Print to Std. Out",
          "parallelism" : 10,
          "predecessors" : [ {
            "id" : 13,
            "ship_strategy" : "FORWARD",
            "side" : "second"
          } ]
        } ]
      }
      

       

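      I encountered this problem while using Waterdrop. As the execution plan above shows, the source table feeds_expose_click_profile is scanned twice (Source nodes 1 and 7), once per Table-to-DataStream conversion. How can this be fixed so that the source is read only once?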

       

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              Janze Wu
            • Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

              • Created:
                Updated: