Details
-
Bug
-
Status: Resolved
-
P3
-
Resolution: Fixed
-
2.28.0
-
None
Description
PeriodicImpulse throws a TypeError if Timestamp is used for the start_timestamp and stop_timestamp parameters.
With the following example,
import logging

from apache_beam import ParDo
from apache_beam import Pipeline
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.periodicsequence import PeriodicImpulse


def run(argv=None):
    options = PipelineOptions(argv)
    with Pipeline(options=options) as p:
        (p
         | PeriodicImpulse()  # By default,
                              # start_timestamp=Timestamp.now(),
                              # stop_timestamp=MAX_TIMESTAMP,
                              # fire_interval=360.0,
         | ParDo(lambda x: logging.info('element: %s', x))
         )


if __name__ == '__main__':
    run()
Running with DirectRunner fails with the following stacktrace.
Traceback (most recent call last): File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs File "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1426, in process element) File "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/transforms/periodicsequence.py", line 42, in initial_restriction total_outputs = math.ceil((end - start) / interval) TypeError: unsupported operand type(s) for /: 'Duration' and 'float'During handling of the above exception, another exception occurred:Traceback (most recent call last): File "/Users/baeminbo/Documents/workspace/dataflow-python/periodic_impulse_pipeline.py", line 22, in <module> run() File "/Users/baeminbo/Documents/workspace/dataflow-python/periodic_impulse_pipeline.py", line 18, in run | ParDo(lambda x: logging.info('element: %s', x)) File "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/pipeline.py", line 580, in __exit__ self.result = self.run() File "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/pipeline.py", line 559, in run return self.runner.run_pipeline(self, self._options) File "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/direct/direct_runner.py", line 133, in run_pipeline return runner.run_pipeline(pipeline, options) File "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_pipeline pipeline.to_runner_api(default_environment=self._default_environment)) File 
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 193, in run_via_runner_api return self.run_stages(stage_context, stages) File "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 359, in run_stages bundle_context_manager, File "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 555, in _run_stage bundle_manager) File "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 595, in _run_bundle data_input, data_output, input_timers, expected_timer_output) File "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 896, in process_bundle result_future = self._worker_handler.control_conn.push(process_bundle_req) File "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 380, in push response = self.worker.do_instruction(request) File "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction getattr(request, request_type), request.instruction_id) File "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle bundle_processor.process_bundle(instruction_id)) File "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle element.data) File 
"/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process File 
"apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs File "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1426, in process element) File "/Users/baeminbo/Documents/workspace/dataflow-python/env3/lib/python3.6/site-packages/apache_beam/transforms/periodicsequence.py", line 42, in initial_restriction total_outputs = math.ceil((end - start) / interval) TypeError: unsupported operand type(s) for /: 'Duration' and 'float' [while running 'PeriodicImpulse/GenSequence/PairWithRestriction']Process finished with exit code 1