Description
A simple map over a Datastore kind in the local emulator using the new v1new.datastoreio.ReadFromDatastore skips entities.
The kind has 1516 entities. When I map over it using the old ReadFromDatastore transform, all of them are read, i.e., I can map each entity to its id and write the ids to a text file.
But the new transform reads only 365 entities, with no error raised. The tail of the standard output is:
INFO:root:Latest stats timestamp for kind face_apilog is 2019-06-18 08:15:21+00:00
INFO:root:Estimated size bytes for query: 116188
INFO:root:Splitting the query into 12 splits
INFO:root:Running (((GetEntities/Reshuffle/ReshufflePerKey/GroupByKey/Read)+(ref_AppliedPTransform_GetEntities/Reshuffle/ReshufflePerKey/FlatMap(restore_timestamps)_14))+((ref_AppliedPTransform_GetEntities/Reshuffle/RemoveRandomKeys_15)+(ref_AppliedPTransform_GetEntities/Read_16)))+((ref_AppliedPTransform_MapToId_17)+((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/WriteBundles_24)+((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/Pair_25)+((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/WindowInto(WindowIntoFn)_26)+(WriteToFile/Write/WriteImpl/GroupByKey/Write)))))
INFO:root:Running (WriteToFile/Write/WriteImpl/GroupByKey/Read)+((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/Extract_31)+(ref_PCollection_PCollection_20/Write))
INFO:root:Running (ref_PCollection_PCollection_12/Read)+((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/PreFinalize_32)+(ref_PCollection_PCollection_21/Write))
INFO:root:Running (ref_PCollection_PCollection_12/Read)+(ref_AppliedPTransform_WriteToFile/Write/WriteImpl/FinalizeWrite_33)
INFO:root:Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
INFO:root:Renamed 1 shards in 0.12 seconds.
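To quantify the drop independently of the text sink, a count step can be appended to the read. A minimal sketch, assuming the same kind and project as in the jobs below (the function name and transform labels are illustrative):

from __future__ import print_function

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
from apache_beam.io.gcp.datastore.v1new.types import Query


def count_read_entities(argv=None):
    p = beam.Pipeline(argv=argv)
    (p
     | 'GetEntities' >> ReadFromDatastore(Query(kind='face_apilog', project='dev'))
     # Old transform yields 1516 here, new one only 365.
     | 'CountAll' >> beam.combiners.Count.Globally()
     | 'PrintCount' >> beam.Map(print))
    p.run().wait_until_finish()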
The code for the job using the new transform is:
from __future__ import absolute_import

import logging
import os
import sys

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
from apache_beam.io.gcp.datastore.v1new.types import Query

# TODO: should be set outside of python process
os.environ['DATASTORE_EMULATOR_HOST'] = 'localhost:8085'


def map_to_id(element):
    face_log_id = element.to_client_entity().id
    return face_log_id


def run(argv=None):
    p = beam.Pipeline(argv=argv)
    project = 'dev'
    (p
     | 'GetEntities' >> ReadFromDatastore(Query(kind='face_apilog', project=project))
     | 'MapToId' >> beam.Map(map_to_id)
     | 'WriteToFile' >> beam.io.WriteToText('result')
     )
    p.run().wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run(sys.argv)
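The log above shows the query being split into 12 splits based on estimated stats. As a way to test whether the split path is responsible, the v1new ReadFromDatastore also accepts a num_splits argument, which can be pinned to 1. A minimal sketch of that variant (the function name and output file name are arbitrary):

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
from apache_beam.io.gcp.datastore.v1new.types import Query


def run_single_split(argv=None):
    p = beam.Pipeline(argv=argv)
    (p
     # Forcing a single split bypasses the stats-based query splitting.
     | 'GetEntities' >> ReadFromDatastore(
         Query(kind='face_apilog', project='dev'), num_splits=1)
     | 'MapToId' >> beam.Map(lambda e: e.to_client_entity().id)
     | 'WriteToFile' >> beam.io.WriteToText('result_single_split')
     )
    p.run().wait_until_finish()

If all 1516 entities come through with a single split, the loss presumably lies in the splitting logic rather than in the read itself.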
For comparison, the code for the job using the old transform is:
from __future__ import absolute_import

import logging
import os
import sys

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from google.cloud.proto.datastore.v1 import query_pb2

# TODO: should be set outside of python process
os.environ['DATASTORE_EMULATOR_HOST'] = 'localhost:8085'


def map_to_id(element):
    face_log_id = element.key.path[-1].id
    return face_log_id


def run(argv=None):
    p = beam.Pipeline(argv=argv)
    project = 'dev'
    query = query_pb2.Query()
    query.kind.add().name = 'face_apilog'
    (p
     | 'GetEntities' >> ReadFromDatastore(project=project, query=query)  # TODO: ParDo???
     | 'MapToId' >> beam.Map(map_to_id)
     | 'WriteToFile' >> beam.io.WriteToText('result')
     )
    p.run().wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run(sys.argv)
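To rule out the emulator's contents as the cause, the kind can also be counted directly with the Datastore client library, which picks up DATASTORE_EMULATOR_HOST from the environment. A minimal sketch:

from google.cloud import datastore

# Counts entities in the kind directly, bypassing Beam entirely;
# the old transform suggests this should print 1516.
client = datastore.Client(project='dev')
query = client.query(kind='face_apilog')
print(sum(1 for _ in query.fetch()))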