Beam / BEAM-12848

apache_beam.io.external.kafka.ReadFromKafka throws IndexError

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: beam-model
    • Labels: None

    Description

      kafka.ReadFromKafka throws "IndexError: tuple index out of range" due to the unimplemented "_get_named_tuple_instance" function of class SchemaBasedPayloadBuilder(PayloadBuilder).

      Stacktrace:

      Traceback (most recent call last):
        File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
          return _run_code(code, main_globals, None,
        File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
          exec(code, run_globals)
        File "/code/src/beam_example/beamKafkaRedis.py", line 36, in <module>
          notifications = pipeline | "Reading messages from Kafka" >> kafka.ReadFromKafka(
        File "/usr/local/lib/python3.9/dist-packages/apache_beam/io/kafka.py", line 166, in __init__
          super(ReadFromKafka, self).__init__(
        File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 217, in __init__
          payload.payload() if isinstance(payload, PayloadBuilder) else payload)
        File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 93, in payload
          return self.build().SerializeToString()
        File "/usr/local/lib/python3.9/dist-packages/apache_beam/transforms/external.py", line 106, in build
          schema = named_tuple_to_schema(type(row))
        File "/usr/local/lib/python3.9/dist-packages/apache_beam/typehints/schemas.py", line 276, in named_tuple_to_schema
          return typing_to_runner_api(named_tuple).row_type.schema
        File "/usr/local/lib/python3.9/dist-packages/apache_beam/typehints/schemas.py", line 184, in typing_to_runner_api
          element_type = typing_to_runner_api(get_args(type)[0])
      IndexError: tuple index out of range
      args: ['--runner=PortableRunner', '--streaming', '--sdk_worker_parallelism=2', '--job_name=beam-readKafkaTopic', '--environment_type=PROCESS', '--environment_config={"command": "/opt/apache/beam/boot"}', '--job_name=beam-kafkaConnect', '--job_endpoint=localhost:39295']

      at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.runDriverProgram(FlinkPortableClientEntryPoint.java:192) ~[?:?]
      at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main(FlinkPortableClientEntryPoint.java:100) ~[?:?]
      at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
      at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
      at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
      at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
      at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
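
The last Python frame above fails on `get_args(type)[0]`, which suggests the type hint being converted carried no type arguments. The sketch below is not a confirmed diagnosis of this issue. It only illustrates, using the standard library's `typing.get_args`, how indexing the arguments of an unparameterized sequence hint produces the same message. It assumes the helper called in `schemas.py` behaves like `typing.get_args`; the names `first_type_arg` and `describe` are hypothetical and not part of Beam.

```python
import typing


def first_type_arg(tp):
    """Return the first type argument of a hint, mirroring `get_args(tp)[0]`."""
    # Assumption: the Beam helper in the traceback behaves like typing.get_args.
    return typing.get_args(tp)[0]


def describe(tp):
    """Return the element type of a hint, or the error text that indexing raises."""
    try:
        return first_type_arg(tp)
    except IndexError as exc:
        return f"IndexError: {exc}"


# A parameterized hint carries one type argument, so indexing succeeds.
print(describe(typing.List[str]))

# A bare hint or a builtin class carries no type arguments,
# so get_args returns an empty tuple and indexing it fails.
print(describe(typing.List))
print(describe(list))
```

If this is what happens here, checking which field of the payload's named tuple ends up with a bare sequence type under Python 3.9 would be a reasonable next step.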

          People

            Assignee: Unassigned
            Reporter: Harshvardhan (Harsh_99)
            Votes: 0
            Watchers: 5
