Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-11993

ReadFromKafka doesn’t send data to the next PTransform – Apache Flink "Cluster" – Apache Beam Python SDK

Details

    Description

      I am trying to build a streaming pipeline using Python. The pipeline should subscribe to a Kafka topic and process the data on the fly. I am using the following configuration:

      class PrintFn(beam.DoFn):
          def __init__(self, label):
              self.label = label    
          def process(self, element, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
              logging.info("[%s]: %s %s %s", self.label, element, window, timestamp)
              yield element
      
      [...]
      
      pipeline_args = [ 
       "--job_endpoint=localhost:8099", 
       "--runner=PortableRunner" , 
       "--environment_type=DOCKER", 
       "--environment_config=gcr.io/xxxx/beam_python3.7_sdk:v2.28.0-custom", 
       "--enable_streaming_engine"
      ] 
      pipeline_options = PipelineOptions(pipeline_args, save_main_session=True, streaming=True) 
      DataPipeline = beam.Pipeline(options=pipeline_options) 
      ReadData = ( 
         DataPipeline 
         | "ReadFromKafka" 
         >> ReadFromKafka( 
             consumer_config={ 
                       "bootstrap.servers": "10.0.1.40:9092", 
                       "auto.offset.reset":"latest" 
             }, 
             topics="beam_topic", 
             expansion_service="localhost:8097" 
            ) 
         | "Debug" 
         >> beam.ParDo(PrintFn(label="test")) 
      )
      

      and a Flink configuration with Job and Task managers. The pipeline is loaded, but as soon as it starts running, the task: 

      Source: Impulse -> [3]ReadFromKafka/KafkaIO.Read/Read(KafkaUnboundedSource)/{ParDo(OutputSingleSource), ParDo(UnboundedSourceAsSDFWrapper)}
      

      changes its status from RUNNING to FINISHED. The Kafka consumer remains subscribed and reports the following:

      2021-03-16 16:10:54,628 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_538555605_none-3, groupId=Reader-0_offset_consumer_538555605_none] Seeking to LATEST offset of partition topic_beam-0 
      
      2021-03-16 16:10:54,629 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_538555605_none-3, groupId=Reader-0_offset_consumer_538555605_none] Resetting offset for partition topic_beam-0 to offset 144. 
      
      2021-03-16 16:10:55,628 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_538555605_none-3, groupId=Reader-0_offset_consumer_538555605_none] Seeking to LATEST offset of partition topic_beam-0 
      
      2021-03-16 16:10:55,629 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_538555605_none-3, groupId=Reader-0_offset_consumer_538555605_none] Resetting offset for partition topic_beam-0 to offset 145.

      But it doesn’t send any data to the next task:

      [3]ReadFromKafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Debug
      

      which remains in RUNNING mode.

      Changing the configuration to:

      | "ReadFromKafka"
      >> ReadFromKafka( 
               consumer_config={ 
                    "bootstrap.servers": "10.0.1.40:9092", 
                    "auto.offset.reset":"earliest" 
               }, 
               topics="beam_topic", 
               max_num_records=10, 
               expansion_service="localhost:8097" 
      ) 
      | "Debug" 
       >> beam.ParDo(PrintFn(label="test")) 
      )
      

      seems to work but only for the X (in the code = 10) records that should be already available in the broker, and I get the info logging as expected:

      2021-03-16 15:55:11,665 INFO apachebeam_pipeline.py:164 [] - [test]: (b'beam', b'{"type":"Buffer","data":[0,0,0,0,1,28,102]}') GlobalWindow Timestamp(1615910111.418000) 
       2021-03-16 15:55:11,665 INFO apachebeam_pipeline.py:164 [] - [test]: (b'beam', b'{"type":"Buffer","data":[0,0,0,0,1,28,102]}') GlobalWindow Timestamp(1615910111.418000)
      

      After reading those messages, the complete pipeline (both mentioned tasks) changes its status to FINISHED (as expected).
       

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              lemariva Mauro Riva
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: