Details
- Bug
- Status: Resolved
- P3
- Resolution: Fixed
- 2.12.0
Description
Reading from a Cloud Pub/Sub topic inside Python's DirectRunner results in a "too many open files" error within 2 minutes for a busy topic.
I am not exactly sure, but it appears that "pubsub.SubscriberClient()" creates a grpc channel which must be closed explicitly after we are done pulling messages from it. This may be due to a change introduced in grpc v1.12.0 ("a new grpc.Channel.close method is introduced and correct use of gRPC Python now requires that channels be closed after use"). If the underlying grpc channel is not closed and many "pubsub.SubscriberClient" instances are created, the system may run out of available file handles, resulting in "too many open files" errors. A similar issue is reported here: https://github.com/googleapis/google-cloud-python/issues/5523.
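The failure mode itself is generic: any object that wraps an OS resource leaks a file descriptor if it is never closed, and the process eventually hits its fd limit. A stdlib-only sketch of the pattern (fd counting via /proc is Linux-specific; the names here are illustrative, not taken from the Beam or Pub/Sub code):

```python
import os
import tempfile


def open_fd_count():
    """Count open file descriptors for this process (Linux: /proc/self/fd)."""
    return len(os.listdir("/proc/self/fd"))


# Leaky pattern: create resources without closing them, analogous to creating
# a new SubscriberClient (and its gRPC channel) per pull and never closing it.
leaked = [tempfile.TemporaryFile() for _ in range(50)]
during = open_fd_count()

# Fix: close each resource explicitly (or use a context manager) when done.
for f in leaked:
    f.close()
after = open_fd_count()
```

Each unclosed handle holds one descriptor, so `during - after` is roughly the number of leaked resources; the repro script below observes the same growth at the process level with lsof.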
The issue can be reproduced by running the following file:
# directrunner_streaming_tmof.py
from __future__ import print_function

import multiprocessing
import subprocess
import time

import apache_beam as beam
from google.cloud import pubsub_v1


def count_open_files():
    """Count the number of files opened by the current process."""
    pid = multiprocessing.current_process().pid
    lsof_out = subprocess.check_output(["lsof", "-p", str(pid)]).decode()
    num_open_files = len(lsof_out.strip().split("\n")) - 1
    return num_open_files


def start_streaming_pipeline(project_id, subscription_path):
    """Create a simple streaming pipeline."""
    runner = beam.runners.direct.DirectRunner()
    pipeline_options = beam.pipeline.PipelineOptions(project=project_id, streaming=True)
    taxirides_pc = (
        beam.Pipeline(runner=runner, options=pipeline_options)
        | "Read" >> beam.io.ReadFromPubSub(subscription=subscription_path)
    )
    results = taxirides_pc.pipeline.run()
    return results


def monitor():
    """Periodically print the number of open files."""
    start_time = time.time()
    for _ in range(20):
        num_open_files = count_open_files()
        time_elapsed = time.time() - start_time
        print(
            "Time elapsed: {:<3s}s, Number of open files: {}".format(
                str(round(time_elapsed, 0)), num_open_files
            )
        )
        if num_open_files > 1000:
            break
        time.sleep(5)


if __name__ == "__main__":
    project_id = "{project_id}"
    topic_path = "projects/pubsub-public-data/topics/taxirides-realtime"

    client = pubsub_v1.SubscriberClient()
    subscription_path = client.subscription_path(project_id, "taxirides-realtime-sub")
    subscription = client.create_subscription(subscription_path, topic_path)
    print("Subscription created: {}".format(subscription_path))

    try:
        results = start_streaming_pipeline(project_id, subscription_path)
        monitor()
    finally:
        client.delete_subscription(subscription_path)
        print("Subscription deleted: {}".format(subscription_path))
Currently, the output from running this script looks something like:
Subscription created: projects/project_id/subscriptions/taxirides-realtime-sub
Time elapsed: 0.0s, Number of open files: 160
Time elapsed: 5.0s, Number of open files: 179
Time elapsed: 11.0s, Number of open files: 247
Time elapsed: 16.0s, Number of open files: 339
Time elapsed: 21.0s, Number of open files: 436
Time elapsed: 26.0s, Number of open files: 523
Time elapsed: 31.0s, Number of open files: 615
Time elapsed: 36.0s, Number of open files: 713
Time elapsed: 41.0s, Number of open files: 809
Time elapsed: 46.0s, Number of open files: 903
Time elapsed: 52.0s, Number of open files: 999
Time elapsed: 57.0s, Number of open files: 1095
Subscription deleted: projects/project_id/subscriptions/taxirides-realtime-sub
WARNING:root:The DirectPipelineResult is being garbage-collected while the DirectRunner is still running the corresponding pipeline. This may lead to incomplete execution of the pipeline if the main thread exits before pipeline completion. Consider using result.wait_until_finish() to wait for completion of pipeline execution.
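Since grpc v1.12.0, channels expose a close() method and also act as context managers, so callers can guarantee cleanup deterministically. A stdlib-only sketch of that discipline using contextlib.closing (the Channel class here is a stand-in for illustration, not the real grpc.Channel):

```python
from contextlib import closing


class Channel:
    """Stand-in for a resource with a close() method, like grpc.Channel."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


# closing() guarantees close() runs when the block exits, even on error,
# mirroring `with grpc.insecure_channel(target) as channel: ...`.
with closing(Channel()) as ch:
    assert not ch.closed  # still open inside the block
```

The fix for this issue presumably amounts to applying the same pattern: ensuring the subscriber's underlying channel is closed once the pull is finished, instead of relying on garbage collection.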