  1. Beam
  2. BEAM-9245

Unable to pull Datastore entity that contains dict properties

    Details

    • Type: Bug
    • Status: Triage Needed
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.18.0
    • Fix Version/s: None
    • Component/s: sdk-py-core
    • Labels: None

      Description

      Hello, we are hitting a small bug when reading Datastore entities with the ReadFromDatastore transform (Python SDK, 2.17 and 2.18).

      We are unable to retrieve entities that contain a dictionary property. We think these properties are implicitly converted into embedded Datastore entities, and when the connector converts such an embedded entity it tries to read the entity's key, which fails because an embedded entity has no key.

       Stacktrace

        File ".../venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/datastoreio.py", line 269, in process
          yield types.Entity.from_client_entity(client_entity)
        File ".../venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 225, in from_client_entity
          value = Entity.from_client_entity(value)
        File ".../venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 219, in from_client_entity
          Key.from_client_key(client_entity.key),
        File ".../venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 156, in from_client_key
          return Key(client_key.flat_path, project=client_key.project,
      AttributeError: 'NoneType' object has no attribute 'flat_path' [while running 'Read from datastore/Read']
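
      The failure mode in the stack trace can be illustrated without a Datastore connection. The sketch below is a simplified model, not the Beam or google-cloud-datastore code: `ClientKey`, `ClientEntity`, `convert_key` and `convert_entity` are hypothetical stand-ins that only mimic the access pattern visible in the trace (the key is converted unconditionally, and nested entities are converted recursively).

```python
# Stand-ins for illustration only; these are NOT the real
# google-cloud-datastore or Beam classes.
class ClientKey:
    def __init__(self, *flat_path, project=None):
        self.flat_path = flat_path
        self.project = project


class ClientEntity(dict):
    """Dict-like entity whose key may be None, as assumed for an embedded entity."""

    def __init__(self, key=None, **properties):
        super().__init__(**properties)
        self.key = key


def convert_key(client_key):
    # Mimics the attribute access on the last frame of the stack trace.
    return (client_key.flat_path, client_key.project)


def convert_entity(client_entity):
    # Mimics the conversion in the trace: the key is converted without a
    # None check, then every nested entity is converted the same way.
    converted = {"key": convert_key(client_entity.key), "properties": {}}
    for name, value in client_entity.items():
        if isinstance(value, ClientEntity):
            value = convert_entity(value)
        converted["properties"][name] = value
    return converted


top = ClientEntity(
    key=ClientKey("my_entity_kind", "my_task", project="my_project_id"),
    regular_field="test",
    # Models the dict property: an embedded entity with no key.
    nested_field=ClientEntity(key=None, field1="my_field1"),
)

error_message = None
try:
    convert_entity(top)
except AttributeError as exc:
    error_message = str(exc)
    print(error_message)
```

      In this model the top-level entity converts fine; the error is raised only when the recursion reaches the keyless nested entity.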
      

       

       Here is some code to reproduce:

      1. Insert a Datastore entity using create_datastore_entity() below
      2. Run the pipeline using DirectRunner

       

      import apache_beam as beam
      from google.cloud import datastore
      from apache_beam.io.gcp.datastore.v1new.types import Query
      from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
      from apache_beam.options.pipeline_options import StandardOptions, PipelineOptions
      
      DATASTORE_KIND = "my_entity_kind"
      PROJECT_ID = "my_project_id"
      
      
      def create_datastore_entity():
          client = datastore.Client(PROJECT_ID)
      
          key = client.key(DATASTORE_KIND, "my_task")
      
          # Refuse to overwrite an entity that already exists under this key.
          if client.get(key=key) is not None:
              raise Exception("Existing entity")

          # nested_field is a plain dict; this is the property that triggers
          # the failure when the entity is read back through Beam.
          entity = datastore.Entity(key=key)
          entity.update({"regular_field": "test", "nested_field": {"field1": "my_field1"}})
          client.put(entity)
      
      
      def my_func(element):
          print(element)
          return element
      
      
      def run():
          pipeline_options = PipelineOptions()
      
          pipeline_options.view_as(StandardOptions).runner = "DirectRunner"
          p = beam.Pipeline(options=pipeline_options)
          my_ds_query = Query(kind=DATASTORE_KIND, project=PROJECT_ID)
          p | "Read from datastore" >> ReadFromDatastore(
              query=my_ds_query
          ) | "Print entity" >> beam.Map(my_func)
          p.run().wait_until_finish()
      
      
      if __name__ == "__main__":
          create_datastore_entity()
          run()
      

       

      Workaround

      Currently, we patch the library with the code below, which modifies Entity.from_client_entity in `sdks/python/apache_beam/io/gcp/datastore/v1new/types.py`.

        @staticmethod
        def from_client_entity(client_entity):
          res = Entity(
              Key.from_client_key(client_entity.key),
              exclude_from_indexes=set(client_entity.exclude_from_indexes))
          for name, value in client_entity.items():
            if isinstance(value, key.Key):
              value = Key.from_client_key(value)
            if isinstance(value, entity.Entity):
              if value.key:
                value = Entity.from_client_entity(value)
              else:
                # A dict property comes back as an embedded entity without a
                # key; keep it as a plain dict instead of converting it.
                value = dict(value)
            res.properties[name] = value
          return res
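
      Note that the patch above copies a keyless embedded entity only one level deep, so a dict nested inside another dict would still be left as an entity object. A recursive variant might look like the sketch below. It is an illustration under assumptions, not a tested fix for the SDK: `ClientEntity` is a hypothetical stand-in for the client's entity class, and `to_plain` is a helper name invented here.

```python
# Stand-in for illustration only; NOT the real google-cloud-datastore class.
class ClientEntity(dict):
    def __init__(self, key=None, **properties):
        super().__init__(**properties)
        self.key = key


def to_plain(value):
    """Recursively turn keyless embedded entities into plain dicts.

    Entities that do have a key are returned unchanged, so the caller can
    still convert them the usual way.
    """
    if isinstance(value, ClientEntity) and value.key is None:
        return {k: to_plain(v) for k, v in value.items()}
    if isinstance(value, list):
        return [to_plain(v) for v in value]
    return value


nested = ClientEntity(
    key=None,
    field1="my_field1",
    inner=ClientEntity(key=None, field2="my_field2"),
)
plain = to_plain(nested)
print(plain)
```

      In the patch, the else branch would then call such a helper instead of building the dict directly.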
      

       If this workaround looks acceptable to you, I can open a PR.

       

      Thanks, Colin

       

            People

            • Assignee: Unassigned
            • Reporter: Colin Le Nost (clenost)