Beam / BEAM-7446

Reading from a Cloud Pub/Sub topic inside Python's DirectRunner results in a "too many open files" error


      Description

      Reading from a Cloud Pub/Sub topic inside Python's DirectRunner results in a "too many open files" error within 2 minutes for a busy topic.

      I am not exactly sure, but it appears that "pubsub.SubscriberClient()" creates a gRPC channel which must be closed explicitly after we are done pulling messages from it. This may be due to a change introduced in grpc v1.12.0 ("a new grpc.Channel.close method is introduced and correct use of gRPC Python now requires that channels be closed after use"). If the underlying gRPC channel is not closed and many "pubsub.SubscriberClient" instances are created, the system may run out of available file handles, resulting in "too many open files" errors. A similar issue is reported here: https://github.com/googleapis/google-cloud-python/issues/5523.
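
      If that is the case, the leak should go away once the client (and hence its underlying gRPC channel) is closed explicitly after use. For reference, here is a minimal sketch of what that looks like at the client level, assuming a google-cloud-pubsub version where "SubscriberClient" exposes a close() method (illustrative only, not the Beam code path):

      # sketch: explicitly releasing the SubscriberClient's gRPC channel
      from google.cloud import pubsub_v1

      client = pubsub_v1.SubscriberClient()
      try:
          subscription_path = client.subscription_path("my-project", "my-subscription")
          # ... pull and process messages ...
      finally:
          # Release the underlying gRPC channel so its socket/file descriptor
          # is not leaked when the client goes out of scope.
          client.close()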

      The issue can be reproduced by running the following file:

      # directrunner_streaming_tmof.py
      from __future__ import print_function
      
      import multiprocessing
      import os
      import subprocess
      import time
      
      import apache_beam as beam
      from google.cloud import pubsub_v1
      
      
      def count_open_files():
          """Count the number of files opened by current process."""
          pid = multiprocessing.current_process().pid
          lsof_out = subprocess.check_output(["lsof", "-p", str(pid)]).decode()
          num_open_files = len(lsof_out.strip().split("\n")) - 1  # drop the lsof header line
          return num_open_files
      
      
      def start_streaming_pipeline(project_id, subscription_path):
          """Create a simple streaming pipeline."""
          runner = beam.runners.direct.DirectRunner()
          pipeline_options = beam.pipeline.PipelineOptions(project=project_id, streaming=True)
          taxirides_pc = (
              beam.Pipeline(runner=runner, options=pipeline_options)
              | "Read" >> beam.io.ReadFromPubSub(subscription=subscription_path)
          )
          results = taxirides_pc.pipeline.run()
          return results
      
      
      def monitor():
          """Periodically print the number of open files."""
          start_time = time.time()
          for _ in range(20):
              num_open_files = count_open_files()
              time_elapsed = time.time() - start_time
              print(
                  "Time elapsed: {:<3s}s, Number of open files: {}".format(
                      str(round(time_elapsed, 0)), num_open_files
                  )
              )
              if num_open_files > 1000:
                  break
              time.sleep(5)
      
      
      if __name__ == "__main__":
          project_id = "{project_id}"  # placeholder: replace with your GCP project ID
          topic_path = "projects/pubsub-public-data/topics/taxirides-realtime"
      
          client = pubsub_v1.SubscriberClient()
          subscription_path = client.subscription_path(project_id, "taxirides-realtime-sub")
      
          subscription = client.create_subscription(subscription_path, topic_path)
          print("Subscription created: {}".format(subscription_path))
          try:
              results = start_streaming_pipeline(project_id, subscription_path)
              monitor()
          finally:
              client.delete_subscription(subscription_path)
              print("Subscription deleted: {}".format(subscription_path))
      

      Currently, the output from running this script looks something like:

      Subscription created: projects/project_id/subscriptions/taxirides-realtime-sub
      Time elapsed: 0.0s, Number of open files: 160
      Time elapsed: 5.0s, Number of open files: 179
      Time elapsed: 11.0s, Number of open files: 247
      Time elapsed: 16.0s, Number of open files: 339
      Time elapsed: 21.0s, Number of open files: 436
      Time elapsed: 26.0s, Number of open files: 523
      Time elapsed: 31.0s, Number of open files: 615
      Time elapsed: 36.0s, Number of open files: 713
      Time elapsed: 41.0s, Number of open files: 809
      Time elapsed: 46.0s, Number of open files: 903
      Time elapsed: 52.0s, Number of open files: 999
      Time elapsed: 57.0s, Number of open files: 1095
      Subscription deleted: projects/project_id/subscriptions/taxirides-realtime-sub
      WARNING:root:The DirectPipelineResult is being garbage-collected while the DirectRunner is still running the corresponding pipeline. This may lead to incomplete execution of the pipeline if the main thread exits before pipeline completion. Consider using result.wait_until_finish() to wait for completion of pipeline execution.
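
      For reference, the number of open file descriptors can also be checked without lsof on Linux by listing the process's /proc fd directory. A small sketch (not part of the original repro script) that could replace count_open_files():

      import os

      def count_open_fds():
          """Count file descriptors currently open by this process (Linux only)."""
          return len(os.listdir("/proc/{}/fd".format(os.getpid())))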
      


             People

               Assignee: Unassigned
               Reporter: Alexey Strokach (ostrokach)