Details
-
Bug
-
Status: Resolved
-
P2
-
Resolution: Fixed
-
2.28.0
-
None
Description
I'm having an issue while using a filter containing a datetime.
This filter works directly in pymongo but not in ReadFromMongoDB.
_BoundedMongoSource.display_data() seems to be the source of the issue.
Fixing the line 267:
res['filter'] = json.dumps(self.filter)
by using:
# from bson import json_util res['filter'] = json.dumps(self.filter, default=json_util.default)
Here is an example of my code :
import apache_beam as beam from apache_beam.io import ReadFromMongoDB import datetime inputs_query = {"created_at": { "$gte": datetime.datetime.now() } } with beam.Pipeline() as p: p_inputs = (p | 'Read Mongo Inputs' >> ReadFromMongoDB(uri=mongo_db_uri, db=db, coll=input_coll, filter=inputs_query ) | 'Count all Inputs' >> beam.combiners.Count.Globally() | 'Print Inputs' >> beam.Map(print) )
I get the following error :
mongomicrotest.py:19: FutureWarning: ReadFromMongoDB is experimental. | 'Print Inputs' >> beam.Map(print) Traceback (most recent call last): File "mongomicrotest.py", line 19, in <module> | 'Print Inputs' >> beam.Map(print) File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/transforms/ptransform.py", line 1058, in __ror__ return self.transform.__ror__(pvalueish, self.label) File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/transforms/ptransform.py", line 573, in __ror__ result = p.apply(self, pvalueish, label) File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/pipeline.py", line 646, in apply return self.apply(transform, pvalueish) File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/pipeline.py", line 689, in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 188, in apply return m(transform, input, options) File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 218, in apply_PTransform return transform.expand(input) File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/io/mongodbio.py", line 163, in expand return pcoll | iobase.Read(self._mongo_source) File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/pvalue.py", line 141, in __or__ return self.pipeline.apply(ptransform, self) File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/pipeline.py", line 689, in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 188, in apply return m(transform, input, options) File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 218, in apply_PTransform return transform.expand(input) File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 894, in expand display_data = self.source.display_data() or {} File "/Users/gael/venv/venv36/lib/python3.6/site-packages/apache_beam/io/mongodbio.py", line 267, in display_data res['filter'] = json.dumps(self.filter) File "/Users/gael/.pyenv/versions/3.6.11/lib/python3.6/json/__init__.py", line 231, in dumps return _default_encoder.encode(obj) File "/Users/gael/.pyenv/versions/3.6.11/lib/python3.6/json/encoder.py", line 199, in encode chunks = self.iterencode(o, _one_shot=True) File "/Users/gael/.pyenv/versions/3.6.11/lib/python3.6/json/encoder.py", line 257, in iterencode return _iterencode(o, 0) File "/Users/gael/.pyenv/versions/3.6.11/lib/python3.6/json/encoder.py", line 180, in default o.__class__.__name__) TypeError: Object of type 'datetime' is not JSON serializable
Maybe there is a way to correctly pass a datetime in the filter ?
This is a blocker in our company project.
Attachments
Issue Links
- links to