Flink / FLINK-21013

Blink planner does not ingest timestamp into StreamRecord

Details

    Description

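      Currently, the rowtime attribute is not written into the StreamRecord timestamp when a Table is converted back to a DataStream, that is, when leaving the Table API for the DataStream API. The legacy planner supports this, but with the Blink planner the timestamp is null. The following program reproduces the problem: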

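      // Imports needed by the snippet.
      import static org.apache.flink.table.api.Expressions.$;

      import java.util.Arrays;
      import javax.annotation.Nullable;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
      import org.apache.flink.streaming.api.functions.ProcessFunction;
      import org.apache.flink.streaming.api.watermark.Watermark;
      import org.apache.flink.table.api.EnvironmentSettings;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      import org.apache.flink.types.Row;
      import org.apache.flink.util.Collector;

      // The statements below are assumed to run inside a method that declares
      // `throws Exception`. Order is assumed to be a POJO with the fields
      // user (long), product (String), and amount (int).
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setParallelism(1);
      EnvironmentSettings settings =
              EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
      StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

      DataStream<Order> orderA =
              env.fromCollection(
                      Arrays.asList(
                              new Order(1L, "beer", 3),
                              new Order(1L, "diaper", 4),
                              new Order(3L, "rubber", 2)));

      // Use the user field as the event timestamp and emit a watermark per element.
      DataStream<Order> orderB =
              orderA.assignTimestampsAndWatermarks(
                      new AssignerWithPunctuatedWatermarks<Order>() {
                          @Nullable
                          @Override
                          public Watermark checkAndGetNextWatermark(
                                  Order lastElement, long extractedTimestamp) {
                              return new Watermark(extractedTimestamp);
                          }

                          @Override
                          public long extractTimestamp(Order element, long recordTimestamp) {
                              return element.user;
                          }
                      });

      // Declare the user field as the rowtime attribute of the table.
      Table tableA = tEnv.fromDataStream(orderB, $("user").rowtime(), $("product"), $("amount"));

      // Select all rows from the table.
      Table result = tEnv.sqlQuery("SELECT * FROM " + tableA);

      // Convert back to a DataStream and inspect the StreamRecord timestamp.
      tEnv.toAppendStream(result, Row.class)
              .process(
                      new ProcessFunction<Row, Row>() {
                          @Override
                          public void processElement(Row value, Context ctx, Collector<Row> out)
                                  throws Exception {
                              // ctx.timestamp() is null with the Blink planner.
                              System.out.println("TIMESTAMP" + ctx.timestamp());
                          }
                      })
              .print();

      env.execute();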
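
      The difference between the two behaviours can be illustrated with a small, Flink-free model. This is only a sketch under stated assumptions: TimestampedRecord, TableRow, convertDroppingTimestamp, and convertKeepingTimestamp are hypothetical names invented for illustration and are not Flink classes or planner code. TimestampedRecord stands in for a StreamRecord whose timestamp may be absent, and the two methods mimic the reported Blink planner behaviour and the behaviour the legacy planner is described as having.

```java
public class RowtimeSketch {

    /** Hypothetical stand-in for a StreamRecord: a value plus an optional event timestamp. */
    static final class TimestampedRecord<T> {
        final T value;
        final Long timestamp; // null means that no timestamp is attached to the record

        TimestampedRecord(T value, Long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Hypothetical stand-in for a table row whose rowtime column holds epoch milliseconds. */
    static final class TableRow {
        final long rowtime;
        final String product;
        final int amount;

        TableRow(long rowtime, String product, int amount) {
            this.rowtime = rowtime;
            this.product = product;
            this.amount = amount;
        }
    }

    /** Models the reported behaviour: the row is forwarded, but no timestamp is set. */
    static TimestampedRecord<TableRow> convertDroppingTimestamp(TableRow row) {
        return new TimestampedRecord<>(row, null);
    }

    /** Models the expected behaviour: the rowtime value is copied into the record timestamp. */
    static TimestampedRecord<TableRow> convertKeepingTimestamp(TableRow row) {
        return new TimestampedRecord<>(row, row.rowtime);
    }

    public static void main(String[] args) {
        TableRow row = new TableRow(1L, "beer", 3);
        // Same print format as the ProcessFunction in the reproduction program.
        System.out.println("TIMESTAMP" + convertDroppingTimestamp(row).timestamp); // TIMESTAMPnull
        System.out.println("TIMESTAMP" + convertKeepingTimestamp(row).timestamp);  // TIMESTAMP1
    }
}
```

      In this model, a downstream operator that reads the record timestamp sees null in the first case and the rowtime value in the second, which corresponds to the null that ctx.timestamp() returns in the reproduction program.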
      

          People

            leonard Leonard Xu
            twalthr Timo Walther