Details
-
Bug
-
Status: Resolved
-
P3
-
Resolution: Fixed
-
2.23.0
Description
Based on the examples in "Slowly updating side input using windowing", I attempted to test PeriodicImpulse without a start or stop timestamp, so that it would start firing immediately and run forever.
The below code shows how:
def pair_account_ids(
    api_key: str, account_ids: Dict[str, str]
) -> Optional[Tuple[str, str, int]]:
    """Pair an API key with its account id from the side-input mapping.

    Returns ``(api_key, account_id, epoch_seconds)`` when ``api_key`` is a
    key of ``account_ids``; otherwise returns ``None``.
    """
    if api_key in account_ids:
        return (api_key, account_ids[api_key], int(time.time()))
    return None


def echo(elm) -> Dict[str, str]:
    """Print ``elm`` and hand it back unchanged."""
    print(elm)
    return elm


def api_keys(elm) -> Dict[str, str]:
    """Return the fixed API-key -> account-id mapping; ``elm`` is ignored."""
    return {
        "<api_key_1>": "<account_id_1>",
        "<api_key_2>": "<account_id_2>",
    }


pipeline_options = PipelineOptions(streaming=True)

with TestPipeline(
    options=pipeline_options, runner=beam.runners.DirectRunner()
) as p:
    # Side input: PeriodicImpulse is built with neither a start nor a stop
    # timestamp (both left commented out), only an interval and windowing.
    impulses = p | "PeriodicImpulse" >> PeriodicImpulse(
        # start_timestamp=start,
        # stop_timestamp=stop,
        fire_interval=5,
        apply_windowing=True,
    )
    side_input = impulses | "api_keys" >> beam.Map(api_keys)

    # Main input: three keys, stamped with the current wall-clock time and
    # placed into 5-second fixed windows.
    main_input = (
        p
        | "MpImpulse"
        >> beam.Create(["<api_key_1>", "<api_key_2>", "<unknown_api_key>"])
        | "MapMpToTimestamped"
        >> beam.Map(lambda key: TimestampedValue(key, time.time()))
        | "WindowMpInto" >> beam.WindowInto(beam.window.FixedWindows(5))
    )

    # Join each key against the side input, drop unmatched keys, print the rest.
    result = (
        main_input
        | "Pair with AccountIDs"
        >> beam.Map(
            pair_account_ids,
            account_ids=beam.pvalue.AsSingleton(side_input),
        )
        | "filter" >> beam.Filter(lambda paired: paired is not None)
        | "echo 2"
        >> beam.Map(lambda paired: print(f"{int(time.time())}: {paired}"))
    )

print(f"done: {int(time.time())}")
However, the above pipeline throws the following exception:
Traceback (most recent call last):
  File "/test/not_test.py", line 141, in test_side_input
    | "echo 2" >> beam.Map(lambda x: print(f"{int(time.time())}: {x}"))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 555, in __exit__
    self.run().wait_until_finish()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/testing/test_pipeline.py", line 112, in run
    False if self.not_use_test_runner_api else test_runner_api))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 521, in run
    allow_proto_holders=True).run(False)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 534, in run
    return self.runner.run_pipeline(self, self._options)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 119, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 173, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 340, in run_stages
    bundle_context_manager,
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 519, in _run_stage
    bundle_manager)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 557, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 941, in process_bundle
    timer_inputs)):
  File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 598, in result_iterator
    yield fs.pop().result()
  File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 435, in result
    return self.__get_result()
  File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/thread_pool_executor.py", line 44, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 937, in execute
    dry_run)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 837, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 352, in push
    response = self.worker.do_instruction(request)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 978, in process_bundle
    element.data)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 755, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/worker/operations.py", line 764, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/common.py", line 971, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
  File "apache_beam/runners/common.py", line 711, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 807, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.7/site-packages/apache_beam/transforms/periodicsequence.py", line 124, in process
    timestamp.Timestamp(current_output_timestamp))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/timestamp.py", line 64, in __init__
    'Cannot interpret %s %s as seconds.' % (seconds, type(seconds)))
TypeError: Cannot interpret Timestamp(1599216802.136201) <class 'apache_beam.utils.timestamp.Timestamp'> as seconds.
Attachments
Issue Links
- links to