Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-10854

PeriodicImpulse default arguments are not valid values

Details

    • Bug
    • Status: Resolved
    • P3
    • Resolution: Fixed
    • 2.23.0
    • 2.30.0
    • sdk-py-core

    Description

      Based on the examples from Slowly updating side input using windowing I attempted to test the PeriodicImpulse using no variables, such that it triggered immediately and ran forever.

      The below code shows how:

              def pair_account_ids(
                  api_key: str, account_ids: Dict[str, str]
              ) -> Optional[Tuple[str, str, int]]:
                  if api_key not in account_ids:
                      return None
      
                  return (api_key, account_ids[api_key], int(time.time()))
      
              def echo(elm) -> Dict[str, str]:
                  print(elm)
                  return elm
      
              def api_keys(elm) -> Dict[str, str]:
                  return {"<api_key_1>": "<account_id_1>", "<api_key_2>": "<account_id_2>"}
      
              pipeline_options = PipelineOptions(streaming=True)
              with TestPipeline(
                  options=pipeline_options, runner=beam.runners.DirectRunner()
              ) as p:
                  side_input = (
                      p
                      | "PeriodicImpulse"
                      >> PeriodicImpulse(
                          # start_timestamp=start,
                          # stop_timestamp=stop,
                          fire_interval=5,
                          apply_windowing=True,
                      )
                      | "api_keys" >> beam.Map(api_keys)
                  )
      
                  main_input = (
                      p
                      | "MpImpulse"
                      >> beam.Create(["<api_key_1>", "<api_key_2>", "<unknown_api_key>"])
                      | "MapMpToTimestamped"
                      >> beam.Map(lambda src: TimestampedValue(src, time.time()))
                      | "WindowMpInto" >> beam.WindowInto(beam.window.FixedWindows(5))
                  )
      
                  result = (
                      main_input
                      | "Pair with AccountIDs"
                      >> beam.Map(
                          pair_account_ids, account_ids=beam.pvalue.AsSingleton(side_input)
                      )
                      | "filter" >> beam.Filter(lambda x: x is not None)
                      | "echo 2" >> beam.Map(lambda x: print(f"{int(time.time())}: {x}"))
                  )
              print(f"done:  {int(time.time())}")
      

      The above pipeline throws the following exception however:

      Traceback (most recent call last):
        File "/test/not_test.py", line 141, in test_side_input
          | "echo 2" >> beam.Map(lambda x: print(f"{int(time.time())}: {x}"))
        File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 555, in __exit__
          self.run().wait_until_finish()
        File "/usr/local/lib/python3.7/site-packages/apache_beam/testing/test_pipeline.py", line 112, in run
          False if self.not_use_test_runner_api else test_runner_api))
        File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 521, in run
          allow_proto_holders=True).run(False)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 534, in run
          return self.runner.run_pipeline(self, self._options)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 119, in run_pipeline
          return runner.run_pipeline(pipeline, options)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 173, in run_pipeline
          pipeline.to_runner_api(default_environment=self._default_environment))
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_via_runner_api
          return self.run_stages(stage_context, stages)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 340, in run_stages
          bundle_context_manager,
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 519, in _run_stage
          bundle_manager)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 557, in _run_bundle
          data_input, data_output, input_timers, expected_timer_output)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 941, in process_bundle
          timer_inputs)):
        File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 598, in result_iterator
          yield fs.pop().result()
        File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 435, in result
          return self.__get_result()
        File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
          raise self._exception
        File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/thread_pool_executor.py", line 44, in run
          self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 937, in execute
          dry_run)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 837, in process_bundle
          result_future = self._worker_handler.control_conn.push(process_bundle_req)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 352, in push
          response = self.worker.do_instruction(request)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
          getattr(request, request_type), request.instruction_id)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
          bundle_processor.process_bundle(instruction_id))
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 978, in process_bundle
          element.data)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
          self.output(decoded_value)
        File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
        File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
        File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
        File "apache_beam/runners/worker/operations.py", line 755, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
        File "apache_beam/runners/worker/operations.py", line 764, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
        File "apache_beam/runners/common.py", line 971, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
        File "apache_beam/runners/common.py", line 711, in apache_beam.runners.common.PerWindowInvoker.invoke_process
        File "apache_beam/runners/common.py", line 807, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
        File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
        File "/usr/local/lib/python3.7/site-packages/apache_beam/transforms/periodicsequence.py", line 124, in process
          timestamp.Timestamp(current_output_timestamp))
        File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/timestamp.py", line 64, in __init__
          'Cannot interpret %s %s as seconds.' % (seconds, type(seconds)))
      TypeError: Cannot interpret Timestamp(1599216802.136201) <class 'apache_beam.utils.timestamp.Timestamp'> as seconds.
      

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              prm Patrick Madsen
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 2h 10m
                  2h 10m