Beam / BEAM-9493

Apache Beam/Dataflow threw a CalledProcessError with beam.Pipeline("DataflowRunner", options=opts)

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.16.0
    • Fix Version/s: None
    • Component/s: None
    • Environment:
      apache-beam==2.16.0
      tensorflow==2.1.0
      tensorflow-metadata==0.15.2
      tensorflow-transform==0.15.0
      Python 2.7.13
      pip 20.0.2
    Description

      I can run the following preprocessing in Apache Beam locally successfully, but when I run it on the cloud (DataflowRunner) it fails with an error.

      I know it may be because the pip download command failed, but I have already upgraded pip. I don't know what else to do, and I cannot find a solution.

      def preprocess(in_test_mode):
          # NOTE: datetime, beam (apache_beam), tf (tensorflow), beam_impl,
          # BUCKET, PROJECT, create_query, is_valid and preprocess_tft are
          # defined in earlier cells of the notebook.
          import os
          import os.path
          import tempfile
          from apache_beam.io import tfrecordio
          from tensorflow_transform.coders import example_proto_coder
          from tensorflow_transform.tf_metadata import dataset_metadata
          from tensorflow_transform.tf_metadata import dataset_schema
          from tensorflow_transform.beam import tft_beam_io
          from tensorflow_transform.beam.tft_beam_io import transform_fn_io

          job_name = 'preprocess-bike-features' + '' + datetime.datetime.now().strftime('%y%m%d%H%M%S')
          if in_test_mode:
              import shutil
              print('Launching local job ... hang on')
              OUTPUT_DIR = './bike_preproc_tft'
              shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
              EVERY_N = 5
          else:
              print('Launching Dataflow job {} ... hang on'.format(job_name))
              OUTPUT_DIR = 'gs://{0}/bike_preproc_tft/'.format(BUCKET)
              import subprocess
              subprocess.call('gsutil rm -r {}'.format(OUTPUT_DIR).split())
              EVERY_N = 5

          options = {
              'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
              'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
              'job_name': job_name,
              'project': PROJECT,
              'max_num_workers': 6,
              'teardown_policy': 'TEARDOWN_ALWAYS',
              'no_save_main_session': True,
              'requirements_file': 'requirements.txt'
          }
          opts = beam.pipeline.PipelineOptions(flags=[], **options)
          if in_test_mode:
              RUNNER = 'DirectRunner'
          else:
              RUNNER = 'DataflowRunner'

          # set up raw data metadata
          raw_data_schema = {
              colname: dataset_schema.ColumnSchema(tf.int64, [], dataset_schema.FixedColumnRepresentation())
              for colname in 'start_year,start_month,start_day,start_station_id,hire_count'.split(',')
          }
          raw_data_schema.update({
              'day_of_week': dataset_schema.ColumnSchema(tf.string, [], dataset_schema.FixedColumnRepresentation())
          })
          raw_data_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema(raw_data_schema))

          # run Beam
          with beam.Pipeline(RUNNER, options=opts) as p:
              with beam_impl.Context(temp_dir=os.path.join(OUTPUT_DIR, 'tmp')):
                  # save the raw data metadata
                  raw_data_metadata | 'WriteInputMetadata' >> tft_beam_io.WriteMetadata(
                      os.path.join(OUTPUT_DIR, 'metadata/rawdata_metadata'),
                      pipeline=p)

                  # read training data from BigQuery and filter rows
                  raw_data = (p
                      | 'train_read' >> beam.io.Read(beam.io.BigQuerySource(
                          query=create_query('train', EVERY_N), use_standard_sql=True))
                      | 'train_filter' >> beam.Filter(is_valid))
                  raw_dataset = (raw_data, raw_data_metadata)

                  # analyze and transform training data
                  transformed_dataset, transform_fn = (
                      raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocess_tft))
                  transformed_data, transformed_metadata = transformed_dataset

                  # save transformed training data to disk in efficient tfrecord format
                  transformed_data | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(
                      os.path.join(OUTPUT_DIR, 'train'),
                      file_name_suffix='.gz',
                      coder=example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))

                  # read eval data from BigQuery and filter rows
                  raw_test_data = (p
                      | 'eval_read' >> beam.io.Read(beam.io.BigQuerySource(
                          query=create_query('valid', EVERY_N), use_standard_sql=True))
                      | 'eval_filter' >> beam.Filter(is_valid))
                  raw_test_dataset = (raw_test_data, raw_data_metadata)

                  # transform eval data
                  transformed_test_dataset = (
                      (raw_test_dataset, transform_fn) | beam_impl.TransformDataset())
                  transformed_test_data, _ = transformed_test_dataset

                  # save transformed eval data to disk in efficient tfrecord format
                  transformed_test_data | 'WriteTestData' >> tfrecordio.WriteToTFRecord(
                      os.path.join(OUTPUT_DIR, 'eval'),
                      file_name_suffix='.gz',
                      coder=example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))

                  # save transformation function to disk for use at serving time
                  transform_fn | 'WriteTransformFn' >> transform_fn_io.WriteTransformFn(
                      os.path.join(OUTPUT_DIR, 'metadata'))
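
      The function is invoked at the end of the same notebook cell (the call is also visible in the traceback below):

      preprocess(in_test_mode=False)  # change to True to run locally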

       
      When I ran the above preprocessing function with RUNNER = 'DataflowRunner', the following error popped up.
       
      ---------------------------------------------------------------------------
      CalledProcessError Traceback (most recent call last)
      ~/.local/lib/python3.5/site-packages/apache_beam/utils/processes.py in check_output(*args, **kwargs)
      82 try:
      ---> 83 out = subprocess.check_output(*args, **kwargs)
      84 except OSError:

      /usr/lib/python3.5/subprocess.py in check_output(timeout, *popenargs, **kwargs)
      315 return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
      --> 316 **kwargs).stdout
      317

      /usr/lib/python3.5/subprocess.py in run(input, timeout, check, *popenargs, **kwargs)
      397 raise CalledProcessError(retcode, process.args,
      --> 398 output=stdout, stderr=stderr)
      399 return CompletedProcess(process.args, retcode, stdout, stderr)

      CalledProcessError: Command '['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1

      During handling of the above exception, another exception occurred:

      RuntimeError Traceback (most recent call last)
      <ipython-input-13-eac0bb8c8400> in <module>
      131 os.path.join(OUTPUT_DIR, 'metadata'))
      132
      --> 133 preprocess(in_test_mode=False) # change to True to run locally

      <ipython-input-13-eac0bb8c8400> in preprocess(in_test_mode)
      129 # save transformation function to disk for use at serving time
      130 transform_fn | 'WriteTransformFn' >> transform_fn_io.WriteTransformFn(
      --> 131 os.path.join(OUTPUT_DIR, 'metadata'))
      132
      133 preprocess(in_test_mode=False) # change to True to run locally

      ~/.local/lib/python3.5/site-packages/apache_beam/pipeline.py in _exit_(self, exc_type, exc_val, exc_tb)
      425 def _exit_(self, exc_type, exc_val, exc_tb):
      426 if not exc_type:
      --> 427 self.run().wait_until_finish()
      428
      429 def visit(self, visitor):

      ~/.local/lib/python3.5/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
      405 self.to_runner_api(use_fake_coders=True),
      406 self.runner,
      --> 407 self._options).run(False)
      408
      409 if self._options.view_as(TypeOptions).runtime_type_check:

      ~/.local/lib/python3.5/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
      418 finally:
      419 shutil.rmtree(tmpdir)
      --> 420 return self.runner.run_pipeline(self, self._options)
      421
      422 def _enter_(self):

      ~/.local/lib/python3.5/site-packages/apache_beam/runners/dataflow/dataflow_runner.py in run_pipeline(self, pipeline, options)
      483 # raise an exception.
      484 result = DataflowPipelineResult(
      --> 485 self.dataflow_client.create_job(self.job), self)
      486
      487 # TODO(BEAM-4274): Circular import runners-metrics. Requires refactoring.

      ~/.local/lib/python3.5/site-packages/apache_beam/utils/retry.py in wrapper(*args, **kwargs)
      204 while True:
      205 try:
      --> 206 return fun(*args, **kwargs)
      207 except Exception as exn: # pylint: disable=broad-except
      208 if not retry_filter(exn):

      ~/.local/lib/python3.5/site-packages/apache_beam/runners/dataflow/internal/apiclient.py in create_job(self, job)
      529 def create_job(self, job):
      530 """Creates job description. May stage and/or submit for remote execution."""
      --> 531 self.create_job_description(job)
      532
      533 # Stage and submit the job when necessary

      ~/.local/lib/python3.5/site-packages/apache_beam/runners/dataflow/internal/apiclient.py in create_job_description(self, job)
      559
      560 # Stage other resources for the SDK harness
      --> 561 resources = self._stage_resources(job.options)
      562
      563 job.proto.environment = Environment(

      ~/.local/lib/python3.5/site-packages/apache_beam/runners/dataflow/internal/apiclient.py in _stage_resources(self, options)
      489 options,
      490 temp_dir=tempfile.mkdtemp(),
      --> 491 staging_location=google_cloud_options.staging_location)
      492 return resources
      493

      ~/.local/lib/python3.5/site-packages/apache_beam/runners/portability/stager.py in stage_job_resources(self, options, build_setup_args, temp_dir, populate_requirements_cache, staging_location)
      166 (populate_requirements_cache if populate_requirements_cache else
      167 Stager._populate_requirements_cache)(setup_options.requirements_file,
      --> 168 requirements_cache_path)
      169 for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
      170 self.stage_artifact(

      ~/.local/lib/python3.5/site-packages/apache_beam/utils/retry.py in wrapper(*args, **kwargs)
      204 while True:
      205 try:
      --> 206 return fun(*args, **kwargs)
      207 except Exception as exn: # pylint: disable=broad-except
      208 if not retry_filter(exn):

      ~/.local/lib/python3.5/site-packages/apache_beam/runners/portability/stager.py in _populate_requirements_cache(requirements_file, cache_dir)
      485 ]
      486 logging.info('Executing command: %s', cmd_args)
      --> 487 processes.check_output(cmd_args, stderr=processes.STDOUT)
      488
      489 @staticmethod

      ~/.local/lib/python3.5/site-packages/apache_beam/utils/processes.py in check_output(*args, **kwargs)
      89 "Full traceback: {} \n Pip install failed for package: {} \
      90 \n Output from execution of subprocess: {}" \
      ---> 91 .format(traceback.format_exc(), args[0][6], error.output))
      92 else:
      93 raise RuntimeError("Full trace: {}, \

      RuntimeError: Full traceback: Traceback (most recent call last):
      File "/home/jupyter/.local/lib/python3.5/site-packages/apache_beam/utils/processes.py", line 83, in check_output
      out = subprocess.check_output(*args, **kwargs)
      File "/usr/lib/python3.5/subprocess.py", line 316, in check_output
      **kwargs).stdout
      File "/usr/lib/python3.5/subprocess.py", line 398, in run
      output=stdout, stderr=stderr)
      subprocess.CalledProcessError: Command '['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1

      Pip install failed for package: -r
      Output from execution of subprocess:
        Collecting tensorflow-transform==0.15.0
          Using cached tensorflow-transform-0.15.0.tar.gz (222 kB)
          Saved /tmp/dataflow-requirements-cache/tensorflow-transform-0.15.0.tar.gz
        Collecting absl-py<0.9,>=0.7
          Using cached absl-py-0.8.1.tar.gz (103 kB)
          Saved /tmp/dataflow-requirements-cache/absl-py-0.8.1.tar.gz
        Collecting apache-beam[gcp]<3,>=2.16
          Using cached apache-beam-2.19.0.zip (1.9 MB)
          Saved /tmp/dataflow-requirements-cache/apache-beam-2.19.0.zip
        Collecting numpy<2,>=1.16
          Using cached numpy-1.18.1.zip (5.4 MB)
          Installing build dependencies: started
          Installing build dependencies: still running...
          Installing build dependencies: finished with status 'done'
          Getting requirements to build wheel: started
          Getting requirements to build wheel: finished with status 'done'
          Preparing wheel metadata: started
          Preparing wheel metadata: finished with status 'done'
          Saved /tmp/dataflow-requirements-cache/numpy-1.18.1.zip
        Collecting protobuf<4,>=3.7
          Using cached protobuf-3.11.3.tar.gz (264 kB)
          Saved /tmp/dataflow-requirements-cache/protobuf-3.11.3.tar.gz
        Collecting pydot<2,>=1.2
          Using cached pydot-1.4.1.tar.gz (128 kB)
          Saved /tmp/dataflow-requirements-cache/pydot-1.4.1.tar.gz
        Collecting six<2,>=1.10
          Using cached six-1.14.0.tar.gz (33 kB)
          Saved /tmp/dataflow-requirements-cache/six-1.14.0.tar.gz
        ERROR: Could not find a version that satisfies the requirement tensorflow-metadata<0.16,>=0.15 (from tensorflow-transform==0.15.0->-r requirements.txt (line 1)) (from versions: 0.6.0, 0.9.0, 0.12.1)
        ERROR: No matching distribution found for tensorflow-metadata<0.16,>=0.15 (from tensorflow-transform==0.15.0->-r requirements.txt (line 1))
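
      The staging step that fails can be reproduced outside of job submission. A minimal sketch, assuming the same notebook environment and the same requirements.txt in the working directory (the command is copied verbatim from the traceback above; '--no-binary :all:' restricts pip to source distributions, which is consistent with the resolver only finding tensorflow-metadata versions 0.6.0, 0.9.0 and 0.12.1):

      import subprocess

      # Same command Beam's stager runs when the 'requirements_file' option is set.
      cmd = [
          '/usr/bin/python3', '-m', 'pip', 'download',
          '--dest', '/tmp/dataflow-requirements-cache',
          '-r', 'requirements.txt',
          '--exists-action', 'i',
          '--no-binary', ':all:',
      ]
      try:
          print(subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode())
      except subprocess.CalledProcessError as e:
          # Prints the same pip resolution output shown above.
          print(e.output.decode())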

          People

            Assignee: Unassigned
            Reporter: pyip
            Votes: 0
            Watchers: 3