  Beam / BEAM-12910

A ready docker image for running Beam with Kafka

Details

    • Type: Wish
    • Status: Resolved
    • Priority: P3
    • Resolution: Won't Do
    • None
    • Missing
    • Component/s: examples-python
    • slim python docker image

    Description

      I have hit an issue that suggests either I am doing something wrong or I have not found a suitable Docker image.

      I am trying to run the following piece of Python 3.8 code:

       

      import apache_beam as beam
      from apache_beam.options.pipeline_options import PipelineOptions


      def run():
          # Run the pipeline locally with the DirectRunner.
          options = PipelineOptions(["--runner=DirectRunner"])

          with beam.Pipeline(options=options) as p:
              p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x - 1) | beam.Map(print)


      if __name__ == "__main__":
          run()

       

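      Running it locally works fine. However, I noticed that at runtime Beam downloads the Java expansion service JAR, starts it with java, and then tries to launch a Docker container (apache/beam_java_sdk) for the Kafka read transform. So running Beam with Kafka inside a Docker container requires adding Java and Docker to the image (i.e. running Docker inside a Docker container):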

      Dockerfile (simplified): 

      FROM python:3.8.12-slim
      RUN apt update
      RUN apt install -y wget curl unzip git
      COPY ./ /root/data_collector/
      WORKDIR /root/data_collector
      RUN python3 -m pip install --upgrade pip
      RUN python3 -m pip install -r beam/requirements/requirements.txt
      CMD python3 beam/data_aggregation.py
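The paragraph above names two prerequisites that the python:3.8.12-slim base image does not include by default: a Java runtime (the log shows the expansion service being started with 'java -jar') and a reachable Docker daemon (the log shows the apache/beam_java_sdk image being pulled through unix:///var/run/docker.sock). A minimal, hypothetical preflight check that could run inside the container before the pipeline starts is sketched below; the function name and messages are illustrative only, and an existing socket file does not prove the daemon is actually running.

```python
import os
import shutil


def missing_prerequisites(socket_path="/var/run/docker.sock",
                          which=shutil.which, exists=os.path.exists):
    """Return a list of prerequisites that appear to be missing.

    Hypothetical helper: the Kafka cross-language transform starts a Java
    expansion service ('java -jar ...') and then launches the
    apache/beam_java_sdk image through the Docker CLI, which talks to the
    Docker daemon over its Unix socket.
    """
    missing = []
    if which("java") is None:
        missing.append("java executable on PATH")
    if which("docker") is None:
        missing.append("docker CLI on PATH")
    # Only checks that the socket path exists, not that the daemon answers.
    if not exists(socket_path):
        missing.append(f"Docker daemon socket at {socket_path}")
    return missing


if __name__ == "__main__":
    problems = missing_prerequisites()
    if problems:
        print("Missing:", ", ".join(problems))
    else:
        print("java, docker and the Docker socket all look available")
```

The which and exists parameters are injectable so the check can be exercised without touching the real filesystem.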

       

      Running the container:

       

      % docker run --network=kafka --name=beam beam
      INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
      INFO:apache_beam.utils.subprocess_server:Downloading job server jar from
      https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.25.0/beam-sdks-java-io-expansion-service-2.25.0.jar 

      INFO:apache_beam.utils.subprocess_server:Starting service with ['java' '-jar' '/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.25.0.jar' '54923']
      INFO:apache_beam.utils.subprocess_server:b'Starting expansion service at localhost:54923'
      INFO:apache_beam.utils.subprocess_server:b'Sep 14, 2021 8:41:19 PM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms'
      INFO:apache_beam.utils.subprocess_server:b'INFO: Registering external transforms: [beam:external:java:kafka:read:v1, beam:external:java:kafka:write:v1, beam:external:java:generate_sequence:v1]'
      INFO:apache_beam.utils.subprocess_server:b'\tbeam:external:java:kafka:read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$1/463345942@53bd815b'
      INFO:apache_beam.utils.subprocess_server:b'\tbeam:external:java:kafka:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$1/463345942@2401f4c3'
      INFO:apache_beam.utils.subprocess_server:b'\tbeam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$1/463345942@7637f22'
      INFO:apache_beam.utils.subprocess_server:b'Sep 14, 2021 8:41:22 PM org.apache.beam.sdk.expansion.service.ExpansionService expand'
      INFO:apache_beam.utils.subprocess_server:b"INFO: Expanding 'Read Kafka Messages' with URN 'beam:external:java:kafka:read:v1'"
      INFO:apache_beam.utils.subprocess_server:b'Sep 14, 2021 8:41:22 PM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig'
      INFO:apache_beam.utils.subprocess_server:b"WARNING: Configuration class 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no schema registered. Attempting to construct with setter approach."
      INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function annotate_downstream_side_inputs at 0x7f119995d9d0> ====================
      INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function fix_side_input_pcoll_coders at 0x7f119995daf0> ====================
      INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function eliminate_common_key_with_none at 0x7f119995dc10> ====================
      INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function pack_combiners at 0x7f119995dca0> ====================
      INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x7f119995dd30> ====================
      INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function expand_sdf at 0x7f119995ddc0> ====================
      INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function expand_gbk at 0x7f119995de50> ====================
      INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sink_flattens at 0x7f119995df70> ====================
      INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function greedily_fuse at 0x7f119995e040> ====================
      INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function read_to_impulse at 0x7f119995e0d0> ====================
      INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function impulse_to_input at 0x7f119995e160> ====================
      INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x7f119995e3a0> ====================
      INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function setup_timer_mapping at 0x7f119995e310> ====================
      INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function populate_data_channel_coders at 0x7f119995e430> ====================
      INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting control server on port 35259
      INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting data server on port 46011
      INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting state server on port 41139
      INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting logging server on port 38759
      INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.DockerSdkWorkerHandler object at 0x7f11995af670> for environment external_1beam:env:docker:v1 (beam:env:docker:v1, b'\n\x1bapache/beam_java_sdk:2.25.0')
      Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?
      INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Unable to pull image apache/beam_java_sdk:2.25.0
      docker: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?.
      See 'docker run --help'.
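The error above ("Cannot connect to the Docker daemon at unix:///var/run/docker.sock") means the docker CLI inside the beam container has no daemon to talk to. One commonly used workaround, stated here as an assumption rather than a verified fix for this issue, is to bind-mount the host's Docker socket into the container with docker run -v, so the CLI reaches the host daemon. The sketch below only builds the command line (image "beam", name "beam" and network "kafka" are taken from the command above; the helper name is hypothetical). It also assumes the docker CLI is installed in the image, which the Dockerfile shown does not do, and the SDK container would then start as a sibling on the host rather than inside the beam container, so networking between the two may still need extra setup.

```python
import shlex

# Host path of the Docker daemon socket, as reported in the error message.
DOCKER_SOCKET = "/var/run/docker.sock"


def docker_run_args(image="beam", name="beam", network="kafka",
                    mount_docker_socket=True):
    """Build argv for 'docker run', optionally sharing the host Docker socket.

    Hypothetical helper for illustration; it does not execute anything.
    """
    args = ["docker", "run", f"--network={network}", f"--name={name}"]
    if mount_docker_socket:
        # -v HOST_PATH:CONTAINER_PATH bind-mounts the host daemon's socket,
        # so a docker CLI inside the container talks to the host daemon.
        args += ["-v", f"{DOCKER_SOCKET}:{DOCKER_SOCKET}"]
    args.append(image)
    return args


if __name__ == "__main__":
    # Print the command instead of running it; pass the list to
    # subprocess.run(...) to execute it for real.
    print(shlex.join(docker_run_args()))
```

With the defaults this prints docker run --network=kafka --name=beam -v /var/run/docker.sock:/var/run/docker.sock beam (shlex.join needs Python 3.8 or later).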
       

      I then tried the apache/beam_python3.8_sdk:latest image, but that led to even more problems: https://stackoverflow.com/questions/69195731/dockerized-apache-beam-returns-no-id-provided

      My wish is for simple HOWTO instructions to be published somewhere, together with a Docker image for running Beam with Kafka.


            People
              Assignee: Unassigned
              Reporter: Jakub Czaplicki (jakubczaplicki)
              Votes: 0
              Watchers: 2
