Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-20787 Improve the Table API to make it usable
  3. FLINK-15801

Timestamp extractor created from properties does not work for some physical fields

    XMLWordPrintableJSON

    Details

      Description

      If a timestamp extractor is created from properties it can not use a physical field if the name of that field is equal to the logical field of the rowtime field.

      The code below fails:

      		StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
      		streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      		StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(streamExecutionEnvironment);
      		String allEventsTable = "allEventsTable";
      		fsTableEnv.connect(new Kafka()
      			.version("universal")
      			.topic("events")
      			.property("zookeeper.connect", "")
      			.property("bootstrap.servers", "localhost:9092")
      			.property("group.id", "dummyquery").startFromLatest())
      			.withSchema(new Schema()
      				.field("rule_id", Types.INT)
      				.field("sourceAddress", Types.STRING)
      				.field("deviceProduct", Types.STRING)
      				.field("destHost", Types.STRING)
      				.field("extra", Types.STRING)
      				.field("rowtime", Types.SQL_TIMESTAMP)
      				.rowtime(new Rowtime().timestampsFromField("rowtime").watermarksPeriodicBounded(2000))
      
      			)
      			.withFormat(new Json().failOnMissingField(false).deriveSchema())
      			.inAppendMode()
      			.registerTableSource(allEventsTable);
      
      		Table result = fsTableEnv.sqlQuery("select * from allEventsTable where sourceAddress='12345431'");
      
      		DataStream alert = fsTableEnv.toAppendStream(result, Row.class);
      		alert.print();
      

      with exception:

      Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'rowtime' could not be resolved by the field mapping.
          at org.apache.flink.table.sources.TableSourceValidation.resolveField(TableSourceValidation.java:245)
          at org.apache.flink.table.sources.TableSourceValidation.lambda$validateTimestampExtractorArguments$6(TableSourceValidation.java:202)
          at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
          at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
          at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
          at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
          at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
          at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
          at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
      

      The problem is that the field is removed from the FieldMapping in org.apache.flink.table.descriptors.SchemaValidator#deriveFieldMapping.

      One possible solution could be to add:

      				if (isRowtime) {
      					Optional<String> timestampSource = properties.getOptionalString(SCHEMA + "." + i + "." + ROWTIME_TIMESTAMPS_FROM);
      					timestampSource.ifPresent(s -> mapping.put(s, s));
      				}
      

      We should also consider the case what happens if we compute generated columns on fields that were pruned in a similar way.

      Reported by a user: https://stackoverflow.com/questions/59857057/how-to-define-an-apache-flink-table-with-row-time-attribute

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                Unassigned
                Reporter:
                dwysakowicz Dawid Wysakowicz
              • Votes:
                0 Vote for this issue
                Watchers:
                9 Start watching this issue

                Dates

                • Created:
                  Updated:

                  Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 10m
                  10m