Beam / BEAM-12864

FlinkRunner (DOCKER) changes HDFS HOST to a name resolvable to host, rather than Docker


    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: 2.31.0
    • Fix Version/s: Missing
    • Component/s: runner-flink
    • Labels: None
    • Environment: macOS 11.5.2, Flink 1.13.2, Beam 2.31.0, Docker for Mac 20.10.8


      I'm attempting to create a simple example of reading from an HDFS filesystem using the Python SDK. This works with the direct runner, and I can even read the filesystem directly from a plain Python script outside of a Beam pipeline (while still using Beam's IO FileSystem class).

      When I create a Beam pipeline and submit it to Flink, the pipeline cannot resolve the hostname of the Docker Host, because it is set to 'localhost'. I've tried setting `hdfs_host` in the pipeline options to `host.docker.internal`, the usual way to reach the host's network from a container, and also to the IP address of my Docker Host (macOS), which normally works and is resolvable when testing with dummy containers. `host.docker.internal` fails because the Docker Host itself cannot resolve it. This creates a situation in which both the Docker Host and the container must be able to resolve the `hdfs_host` hostname.
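      Because the same `hdfs_host` value has to work both where the pipeline is submitted and inside the SDK harness container, it can help to check resolvability on each side separately. A minimal standard-library sketch; the helper name `is_resolvable` is hypothetical, and the idea is to run it once on the Docker Host and once inside a container (for example via `docker run`) and compare the results for `host.docker.internal` or the chosen IP.

```python
import socket


def is_resolvable(hostname: str) -> bool:
    """Return True if `hostname` resolves to at least one address on this machine.

    Hypothetical diagnostic helper, not part of Beam.
    """
    try:
        # getaddrinfo consults /etc/hosts and DNS, like most network clients do.
        return len(socket.getaddrinfo(hostname, None)) > 0
    except socket.gaierror:
        # Raised when the name cannot be resolved here.
        return False
```

      On Docker for Mac, `is_resolvable("host.docker.internal")` would be expected to differ between the host and a container, which is exactly the mismatch described above.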

      Using the IP address satisfies that requirement, but I believe that, in preparation for the Flink run, Beam replaces the HDFS host entry with "localhost", because that is what the IP resolves to on the Docker Host. Inside the Docker container, "localhost" then no longer refers to the Docker Host.
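      The reason "localhost" breaks inside the container is that a name mapping to a loopback address always refers to whichever machine performs the lookup, so inside the SDK harness container it points at the container itself. A minimal standard-library sketch (the helper `points_to_loopback` is hypothetical, not Beam code) that flags such values before they are used as `hdfs_host`:

```python
import ipaddress
import socket


def points_to_loopback(host: str) -> bool:
    """Return True if every address `host` resolves to is a loopback address.

    Such a value is only meaningful on the machine doing the lookup; inside a
    Docker container it refers to the container, not the Docker Host.
    Hypothetical helper for illustration.
    """
    try:
        infos = socket.getaddrinfo(host, None)
    except socket.gaierror:
        return False
    # info[4][0] is the textual address for both IPv4 and IPv6 results;
    # strip any IPv6 scope suffix such as "%eth0" before parsing.
    addrs = {info[4][0].split("%")[0] for info in infos}
    return bool(addrs) and all(ipaddress.ip_address(a).is_loopback for a in addrs)
```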

      Users need to be able to explicitly set the HDFS host parameter with respect to the Docker environment in which the FlinkRunner executes the pipeline, regardless of whether the Docker Host can resolve that hostname. In some cases, this could be another Docker container on the Docker network that is resolvable by the pipeline's Docker container but not by the Docker Host. Setting `hdfs_host` to an IP address should not result in it being replaced with "localhost".

      To summarize, a Beam pipeline run with the FlinkRunner (using the Docker environment) cannot reach the Docker Host over the network, and therefore cannot connect to an HDFS filesystem located on the Docker Host.
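      To separate a name-resolution failure from a routing failure, one option is to attempt a plain TCP connection to the WebHDFS port (9870 in the code example) from inside a container on the same Docker setup. A minimal standard-library sketch; `can_reach` is a hypothetical helper, not part of Beam, and the 3-second default timeout is an arbitrary choice.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    Hypothetical diagnostic helper.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures
        # (socket.gaierror and socket.timeout are both OSError subclasses).
        return False
```

      If the name resolves inside the container but `can_reach` still returns False, the problem is routing or firewalling rather than DNS.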


      Code example:


      import apache_beam as beam
      from apache_beam.io.hadoopfilesystem import HadoopFileSystem
      from apache_beam.options.pipeline_options import HadoopFileSystemOptions, PipelineOptions

      HDFS_HOSTNAME = ''  # Docker Host IP address (macOS)
      HDFS_PORT = 9870  # WebHDFS port
      hdfs_client_options = HadoopFileSystemOptions(hdfs_host=HDFS_HOSTNAME, hdfs_port=HDFS_PORT, hdfs_user="apearce")
      hdfs_filesystem = HadoopFileSystem(hdfs_client_options)
      input_file_hdfs = "hdfs://user/apearce/testdata/001.csv"

      # Reading directly through Beam's FileSystem class works:
      # for x in hdfs_filesystem.open(input_file_hdfs).readlines():
      #     print(x)

      def run(argv=None, save_main_session=True):
          config = {
              "runner": "FlinkRunner",
              "flink_master": "localhost:8081",
              "environment_type": "DOCKER",
              "save_main_session": save_main_session,
              "hdfs_host": HDFS_HOSTNAME,
              "hdfs_port": HDFS_PORT,
              "hdfs_user": "apearce",
          }
          pipeline_options = PipelineOptions.from_dictionary(config)
          # Fails on Flink: the Docker SDK harness cannot reach the HDFS host.
          with beam.Pipeline(options=pipeline_options) as p:
              (p
               | 'ReadFromHDFS' >> beam.io.ReadFromText(input_file_hdfs)
               | 'Print' >> beam.Map(print))

      if __name__ == '__main__':
          run()
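      Independently of Beam, the same HDFS path can be queried through the Hadoop WebHDFS REST API, which helps confirm that the NameNode is reachable at `HDFS_HOSTNAME:HDFS_PORT` from a given machine or container. A minimal standard-library sketch; the helper names are hypothetical, and it assumes Beam maps `hdfs://user/apearce/...` to the HDFS path `/user/apearce/...` and that the cluster uses simple (non-Kerberos) authentication via `user.name`.

```python
import json
import urllib.parse
import urllib.request


def webhdfs_status_url(host: str, port: int, path: str, user: str) -> str:
    """Build a WebHDFS GETFILESTATUS URL for an absolute HDFS path (hypothetical helper)."""
    query = urllib.parse.urlencode({"op": "GETFILESTATUS", "user.name": user})
    return f"http://{host}:{port}/webhdfs/v1{urllib.parse.quote(path)}?{query}"


def file_status(host: str, port: int, path: str, user: str, timeout: float = 5.0) -> dict:
    """Fetch the FileStatus object for `path`; raises on HTTP or network errors."""
    url = webhdfs_status_url(host, port, path, user)
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        # WebHDFS wraps the result as {"FileStatus": {...}}.
        return json.load(resp)["FileStatus"]
```

      GETFILESTATUS is answered by the NameNode alone, whereas reading file contents over WebHDFS (`op=OPEN`) involves a redirect to a DataNode address chosen by the server, so the two operations can behave differently from inside a container.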






    • Assignee: Unassigned
    • Reporter: Adam Pearce (adamhp)