  1. Beam
  2. BEAM-11979

Can't use ReadFromMongoDB with a datetime in filter

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: 2.28.0
    • Fix Version/s: 2.29.0
    • Component/s: io-py-mongodb
    • Labels: None

    Description

      I'm having an issue while using a filter containing a datetime.

      This filter works directly in pymongo but not in ReadFromMongoDB.
      _BoundedMongoSource.display_data() seems to be the source of the issue.
       
      A possible fix is to replace line 267 of mongodbio.py:

      res['filter'] = json.dumps(self.filter)

      with a call that passes a BSON-aware default encoder:

      # from bson import json_util
      res['filter'] = json.dumps(self.filter, default=json_util.default)
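
The failure and the effect of a default hook can be illustrated without a MongoDB instance. The following is a minimal standard-library sketch, not the actual Beam or bson code: extended_default is a hypothetical stand-in for bson.json_util.default, and the {"$date": ...} shape it emits is only illustrative, not the exact format bson produces.

```python
import datetime
import json


def extended_default(obj):
    """Fallback encoder for values the stdlib json module rejects.

    Hypothetical stand-in for bson.json_util.default; the output shape
    is illustrative only.
    """
    if isinstance(obj, datetime.datetime):
        return {"$date": obj.isoformat()}
    raise TypeError(
        f"Object of type {type(obj).__name__} is not JSON serializable")


# A filter like the one in this report, with a fixed datetime for clarity.
query = {"created_at": {"$gte": datetime.datetime(2021, 3, 15, 12, 0, 0)}}

# Plain json.dumps raises TypeError on the datetime value.
try:
    json.dumps(query)
    plain_dumps_failed = False
except TypeError:
    plain_dumps_failed = True

# With a default hook, the same filter serializes.
encoded = json.dumps(query, default=extended_default)

print(plain_dumps_failed)
print(encoded)
```

The default callable is only consulted for objects the encoder cannot handle natively, so filters made of plain strings and numbers serialize exactly as before.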

      Here is an example of my code:

      import datetime

      import apache_beam as beam
      from apache_beam.io import ReadFromMongoDB

      # mongo_db_uri, db and input_coll are assumed to be defined elsewhere
      # (connection URI, database name, collection name).
      # The datetime value in the filter is what triggers the error.
      inputs_query = {"created_at": {"$gte": datetime.datetime.now()}}

      with beam.Pipeline() as p:
          p_inputs = (
              p
              | 'Read Mongo Inputs' >> ReadFromMongoDB(
                  uri=mongo_db_uri, db=db, coll=input_coll, filter=inputs_query)
              | 'Count all Inputs' >> beam.combiners.Count.Globally()
              | 'Print Inputs' >> beam.Map(print)
          )
      

      I get the following error:

      mongomicrotest.py:19: FutureWarning: ReadFromMongoDB is experimental.
        | 'Print Inputs' >> beam.Map(print)
      Traceback (most recent call last):
        File "mongomicrotest.py", line 19, in <module>
          | 'Print Inputs' >> beam.Map(print)
        File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/transforms/ptransform.py", line 1058, in __ror__
          return self.transform.__ror__(pvalueish, self.label)
        File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/transforms/ptransform.py", line 573, in __ror__
          result = p.apply(self, pvalueish, label)
        File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/pipeline.py", line 646, in apply
          return self.apply(transform, pvalueish)
        File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/pipeline.py", line 689, in apply
          pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
        File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 188, in apply
          return m(transform, input, options)
        File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 218, in apply_PTransform
          return transform.expand(input)
        File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/io/mongodbio.py", line 163, in expand
          return pcoll | iobase.Read(self._mongo_source)
        File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/pvalue.py", line 141, in __or__
          return self.pipeline.apply(ptransform, self)
        File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/pipeline.py", line 689, in apply
          pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
        File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 188, in apply
          return m(transform, input, options)
        File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 218, in apply_PTransform
          return transform.expand(input)
        File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 894, in expand
          display_data = self.source.display_data() or {}
        File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/io/mongodbio.py", line 267, in display_data
          res['filter'] = json.dumps(self.filter)
        File "/Users/gael/.pyenv/versions/3.6.11/lib/python3.6/json/__init__.py", line 231, in dumps
          return _default_encoder.encode(obj)
        File "/Users/gael/.pyenv/versions/3.6.11/lib/python3.6/json/encoder.py", line 199, in encode
          chunks = self.iterencode(o, _one_shot=True)
        File "/Users/gael/.pyenv/versions/3.6.11/lib/python3.6/json/encoder.py", line 257, in iterencode
          return _iterencode(o, 0)
        File "/Users/gael/.pyenv/versions/3.6.11/lib/python3.6/json/encoder.py", line 180, in default
          o.__class__.__name__)
      TypeError: Object of type 'datetime' is not JSON serializable
      

      Is there a way to correctly pass a datetime in the filter?
      This is a blocker for our company's project.

            People

              Assignee: Yichi Zhang
              Reporter: Mériaux Gaël
              Votes: 0
              Watchers: 2

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 3h