Details
-
Sub-task
-
Status: Open
-
P2
-
Resolution: Unresolved
-
None
-
None
Description
The following test case does not work properly:
def test_dynamic_timer_clear_then_set_timer(self):
    """Check a dynamic timer that is set, cleared, and then set again.

    The 'emit3' tag is set to 30, cleared, and set to 40 within a single
    ``process`` call; the expected output lists it once, with timestamp 40,
    alongside the untouched 'emit1' (10) and 'emit2' (20) timers.

    NOTE(review): the layout below was reconstructed from a
    whitespace-collapsed paste; the closing parenthesis of the final
    ``assert_that`` call was missing there and has been restored.
    """

    class EmitTwoEvents(DoFn):
        """Yield ('1', 'set') from process and ('1', 'clear') from a timer."""

        EMIT_CLEAR_SET_TIMER = TimerSpec('emitclear', TimeDomain.WATERMARK)

        def process(self, element, emit=DoFn.TimerParam(EMIT_CLEAR_SET_TIMER)):
            yield ('1', 'set')
            # Arm the watermark timer whose callback yields the 'clear' event.
            emit.set(1)

        @on_timer(EMIT_CLEAR_SET_TIMER)
        def emit_clear(self):
            yield ('1', 'clear')

    class DynamicTimerDoFn(DoFn):
        """Set tagged timers according to the second field of each element."""

        EMIT_TIMER_FAMILY = TimerSpec('emit', TimeDomain.WATERMARK)

        def process(self, element, emit=DoFn.TimerParam(EMIT_TIMER_FAMILY)):
            if element[1] == 'set':
                emit.set(10, dynamic_timer_tag='emit1')
                emit.set(20, dynamic_timer_tag='emit2')
            if element[1] == 'clear':
                # Set, clear, then set the same tag again: this sequence is
                # what the test is named after.
                emit.set(30, dynamic_timer_tag='emit3')
                emit.clear(dynamic_timer_tag='emit3')
                emit.set(40, dynamic_timer_tag='emit3')
            # process itself emits nothing; output comes from emit_callback.
            return []

        @on_timer(EMIT_TIMER_FAMILY)
        def emit_callback(
                self, ts=DoFn.TimestampParam, tag=DoFn.DynamicTimerTagParam):
            yield (tag, ts)

    with TestPipeline() as p:
        res = (
            p
            | beam.Create([('1', 'impulse')])
            | beam.ParDo(EmitTwoEvents())
            | beam.ParDo(DynamicTimerDoFn()))
        assert_that(
            res, equal_to([('emit1', 10), ('emit2', 20), ('emit3', 40)]))