Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-21013

Blink planner does not ingest timestamp into StreamRecord

    XMLWordPrintableJSON

    Details

      Description

      Currently, the rowtime attribute is not put into the StreamRecord when leaving the Table API to DataStream API. The legacy planner supports this, but the timestamp is null when using the Blink planner.

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setParallelism(1);
                  EnvironmentSettings settings =
                          EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
                  StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
      
              DataStream<Order> orderA =
                      env.fromCollection(
                              Arrays.asList(
                                      new Order(1L, "beer", 3),
                                      new Order(1L, "diaper", 4),
                                      new Order(3L, "rubber", 2)));
      
              DataStream<Order> orderB =
                      orderA.assignTimestampsAndWatermarks(
                              new AssignerWithPunctuatedWatermarks<Order>() {
                                  @Nullable
                                  @Override
                                  public Watermark checkAndGetNextWatermark(
                                          Order lastElement, long extractedTimestamp) {
                                      return new Watermark(extractedTimestamp);
                                  }
      
                                  @Override
                                  public long extractTimestamp(Order element, long recordTimestamp) {
                                      return element.user;
                                  }
                              });
      
              Table tableA = tEnv.fromDataStream(orderB, $("user").rowtime(), $("product"), $("amount"));
      
              // union the two tables
              Table result = tEnv.sqlQuery("SELECT * FROM " + tableA);
      
              tEnv.toAppendStream(result, Row.class)
                      .process(
                              new ProcessFunction<Row, Row>() {
                                  @Override
                                  public void processElement(Row value, Context ctx, Collector<Row> out)
                                          throws Exception {
                                      System.out.println("TIMESTAMP" + ctx.timestamp());
                                  }
                              })
                      .print();
      
              env.execute();
      

        Attachments

          Activity

            People

            • Assignee:
              Leonard Xu Leonard Xu
              Reporter:
              twalthr Timo Walther
            • Votes:
              0 Vote for this issue
              Watchers:
              9 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: