Details
- Bug
- Status: Resolved
- P2
- Resolution: Fixed
- None
Description
When running a test pipeline such as:
public void testTimestampedValue() throws Exception {
  final String timerId = "foo";
  DoFn<KV<String, Long>, KV<Long, Instant>> statefn =
      new DoFn<KV<String, Long>, KV<Long, Instant>>() {
        @TimerId(timerId)
        private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @ProcessElement
        public void processElement(
            @TimerId(timerId) Timer timer,
            @Timestamp Instant timestamp,
            OutputReceiver<KV<Long, Instant>> r) {
          r.output(KV.of(3L, timestamp));
        }

        @OnTimer(timerId)
        public void onTimer(@Timestamp Instant timestamp, OutputReceiver<KV<Long, Instant>> r) {
          // Do nothing: whether the timer is involved makes no difference.
        }
      };

  PCollection<KV<Long, Instant>> output =
      pipeline
          .apply(Create.timestamped(TimestampedValue.of(KV.of("hello", 37L), new Instant(123L))))
          .apply(ParDo.of(statefn));

  PAssert.that(output).containsInAnyOrder(KV.of(3L, new Instant(123L)));
  pipeline.run();
}
on Dataflow with the Fn API (fnapi), the PAssert fails: the timestamp of the output element does not match the expected new Instant(123L).