  1. Beam
  2. BEAM-12119

Python IO MongoDB: integer and string `_id` keys are not supported

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: 2.27.0, 2.28.0
    • Fix Version/s: 2.31.0
    • Component/s: io-py-mongodb

    Description

      Python IO MongoDB: integer and string `_id` keys are not supported.

      Usually an ObjectId is used for the `_id` key, but some collections use int or str keys instead. Reading from such a MongoDB collection raises the errors shown in the tracebacks in this description.
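
The failure can be reproduced without a MongoDB server. The sketch below mirrors the `struct.unpack('>III', id.binary)` call that appears in the tracebacks. `FakeObjectId`, `id_to_int`, and `try_convert` are hypothetical stand-ins written for illustration, not the `bson` or Beam implementations, and the way the three 32-bit words are combined into one integer is an assumption.

```python
import struct


class FakeObjectId:
    """Hypothetical stand-in for an ObjectId: wraps 12 raw bytes in `.binary`."""

    def __init__(self, binary: bytes):
        if len(binary) != 12:
            raise ValueError("an ObjectId-like value needs exactly 12 bytes")
        self.binary = binary


def id_to_int(object_id) -> int:
    """Unpack `.binary` as three big-endian uint32 words, as in the traceback."""
    ints = struct.unpack(">III", object_id.binary)
    # Combining the words into a single integer is an assumption for illustration.
    return (ints[0] << 64) + (ints[1] << 32) + ints[2]


def try_convert(value):
    """Return the converted int, or the error message if `.binary` is missing."""
    try:
        return id_to_int(value)
    except AttributeError as exc:
        return str(exc)


print(try_convert(FakeObjectId(b"\x00" * 11 + b"\x01")))  # ObjectId-like key converts
print(try_convert(42))     # int key: 'int' object has no attribute 'binary'
print(try_convert("abc"))  # str key: 'str' object has no attribute 'binary'
```

Plain `int` and `str` values have no `.binary` attribute, so any code path that assumes an ObjectId fails before a single document is read.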

      Integer `_id` key:

      Traceback (most recent call last):
        File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
        File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1426, in process
          element)
        File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545, in initial_restriction
          range_tracker = element_source.get_range_tracker(None, None)
        File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 243, in get_range_tracker
          start_position, stop_position)
        File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 359, in _replace_none_positions
          stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
        File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 418, in increment_id
          id_number = _ObjectIdHelper.id_to_int(object_id)
        File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 383, in id_to_int
          ints = struct.unpack('>III', id.binary)
      AttributeError: 'int' object has no attribute 'binary'
      During handling of the above exception, another exception occurred:
      Traceback (most recent call last):
        File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 325, in <module>
          sys.exit(run())
        File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 240, in run
          options=options, ingestion_ts=ingestion_ts, table_name=table_name
        File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 582, in __exit__
          self.result = self.run()
        File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561, in run
          return self.runner.run_pipeline(self, self._options)
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 126, in run_pipeline
          return runner.run_pipeline(pipeline, options)
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_pipeline
          pipeline.to_runner_api(default_environment=self._default_environment))
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 193, in run_via_runner_api
          return self.run_stages(stage_context, stages)
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 360, in run_stages
          bundle_context_manager,
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 556, in _run_stage
          bundle_manager)
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 596, in _run_bundle
          data_input, data_output, input_timers, expected_timer_output)
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 897, in process_bundle
          result_future = self._worker_handler.control_conn.push(process_bundle_req)
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 380, in push
          response = self.worker.do_instruction(request)
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
          getattr(request, request_type), request.instruction_id)
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
          bundle_processor.process_bundle(instruction_id))
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
          element.data)
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
          self.output(decoded_value)
        File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
        File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
        File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
        File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
        File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
        File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
        File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
        File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
        File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
        File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
        File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
        File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
        File "/venv/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
          raise exc.with_traceback(traceback)
        File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
        File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1426, in process
          element)
        File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545, in initial_restriction
          range_tracker = element_source.get_range_tracker(None, None)
        File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 243, in get_range_tracker
          start_position, stop_position)
        File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 359, in _replace_none_positions
          stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
        File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 418, in increment_id
          id_number = _ObjectIdHelper.id_to_int(object_id)
        File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 383, in id_to_int
          ints = struct.unpack('>III', id.binary)
      AttributeError: 'int' object has no attribute 'binary' [while running 'sources/Read Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
      ERROR:root:mongo_to_bq_raw.py: 'int' object has no attribute 'binary' [while running 'sources/Read Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
      

      String `_id` key:

      Traceback (most recent call last):
        File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
        File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1426, in process
          element)
        File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545, in initial_restriction
          range_tracker = element_source.get_range_tracker(None, None)
        File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 243, in get_range_tracker
          start_position, stop_position)
        File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 359, in _replace_none_positions
          stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
        File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 418, in increment_id
          id_number = _ObjectIdHelper.id_to_int(object_id)
        File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 383, in id_to_int
          ints = struct.unpack('>III', id.binary)
      AttributeError: 'str' object has no attribute 'binary'
      During handling of the above exception, another exception occurred:
      Traceback (most recent call last):
        File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 325, in <module>
          sys.exit(run())
        File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 240, in run
          options=options, ingestion_ts=ingestion_ts, table_name=table_name
        File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 582, in __exit__
          self.result = self.run()
        File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561, in run
          return self.runner.run_pipeline(self, self._options)
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 126, in run_pipeline
          return runner.run_pipeline(pipeline, options)
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_pipeline
          pipeline.to_runner_api(default_environment=self._default_environment))
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 193, in run_via_runner_api
          return self.run_stages(stage_context, stages)
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 360, in run_stages
          bundle_context_manager,
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 556, in _run_stage
          bundle_manager)
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 596, in _run_bundle
          data_input, data_output, input_timers, expected_timer_output)
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 897, in process_bundle
          result_future = self._worker_handler.control_conn.push(process_bundle_req)
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 380, in push
          response = self.worker.do_instruction(request)
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
          getattr(request, request_type), request.instruction_id)
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
          bundle_processor.process_bundle(instruction_id))
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
          element.data)
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
          self.output(decoded_value)
        File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
        File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
        File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
        File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
        File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
        File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
        File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
        File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
        File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
        File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
        File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
        File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
        File "/venv/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
          raise exc.with_traceback(traceback)
        File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
        File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
        File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1426, in process
          element)
        File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545, in initial_restriction
          range_tracker = element_source.get_range_tracker(None, None)
        File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 243, in get_range_tracker
          start_position, stop_position)
        File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 359, in _replace_none_positions
          stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
        File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 418, in increment_id
          id_number = _ObjectIdHelper.id_to_int(object_id)
        File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 383, in id_to_int
          ints = struct.unpack('>III', id.binary)
      AttributeError: 'str' object has no attribute 'binary' [while running 'tagged_objects/Read Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
      ERROR:root:mongo_to_bq_raw.py: 'str' object has no attribute 'binary' [while running 'tagged_objects/Read Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
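
Both tracebacks fail at the same step: computing an exclusive stop position by calling `_ObjectIdHelper.increment_id(last_doc_id, 1)` on a key that is not an ObjectId. One possible approach is to choose the "next" value by key type. The following is a minimal sketch under stated assumptions: `next_position` is a hypothetical helper, not part of `apache_beam.io.mongodbio`, it is not necessarily how this issue was resolved, and the string case assumes keys are compared by plain code-point (binary) order with no custom collation.

```python
def next_position(last_id):
    """Return a value that sorts immediately after `last_id`.

    Intended as an exclusive upper bound for a range that must include
    `last_id`. Only int and str keys are handled in this sketch.
    """
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(last_id, bool):
        raise TypeError("bool _id keys are not handled by this sketch")
    if isinstance(last_id, int):
        return last_id + 1
    if isinstance(last_id, str):
        # Appending NUL yields the smallest string greater than last_id
        # under code-point ordering (assumption: no custom collation).
        return last_id + "\x00"
    raise TypeError(f"unsupported _id type: {type(last_id).__name__}")


print(next_position(41))           # 42
print(repr(next_position("abc")))  # 'abc\x00'
```

ObjectId keys would keep using the existing ObjectId-specific increment; the sketch covers only the two key types reported here.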
      
      

            People

              Assignee: Unassigned
              Reporter: Maksym Skorupskyi

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 16h 50m
