Details
- Type: Bug
- Status: Resolved
- Priority: P2
- Resolution: Fixed
- Affects Version/s: 2.26.0, 2.27.0, 2.28.0
- Fix Version/s: None
Description
I am trying to build a streaming pipeline using Python. The pipeline should subscribe to a Kafka topic and process the data on the fly. I am using the following configuration:
import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions


class PrintFn(beam.DoFn):
    def __init__(self, label):
        self.label = label

    def process(self, element, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
        # Log each element together with its window and timestamp.
        logging.info("[%s]: %s %s %s", self.label, element, window, timestamp)
        yield element

[...]

pipeline_args = [
    "--job_endpoint=localhost:8099",
    "--runner=PortableRunner",
    "--environment_type=DOCKER",
    "--environment_config=gcr.io/xxxx/beam_python3.7_sdk:v2.28.0-custom",
    "--enable_streaming_engine",
]
pipeline_options = PipelineOptions(pipeline_args, save_main_session=True, streaming=True)

DataPipeline = beam.Pipeline(options=pipeline_options)

ReadData = (
    DataPipeline
    | "ReadFromKafka" >> ReadFromKafka(
        consumer_config={
            "bootstrap.servers": "10.0.1.40:9092",
            "auto.offset.reset": "latest",
        },
        topics="beam_topic",
        expansion_service="localhost:8097",
    )
    | "Debug" >> beam.ParDo(PrintFn(label="test"))
)
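Note: the Beam Python SDK declares ReadFromKafka's topics parameter as a list of strings rather than a single string; a minimal sketch of the same subscription in list form (whether this affects the behavior described below is untested):

read_transform = ReadFromKafka(
    consumer_config={
        "bootstrap.servers": "10.0.1.40:9092",
        "auto.offset.reset": "latest",
    },
    topics=["beam_topic"],  # topics is typed as List[str] in the Python SDK
    expansion_service="localhost:8097",
)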
and a Flink cluster with Job and Task managers. The pipeline is deployed, but as soon as it starts running, the task:
Source: Impulse -> [3]ReadFromKafka/KafkaIO.Read/Read(KafkaUnboundedSource)/{ParDo(OutputSingleSource), ParDo(UnboundedSourceAsSDFWrapper)}
changes its status from RUNNING to FINISHED. The Kafka consumer remains subscribed and reports the following:
2021-03-16 16:10:54,628 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_538555605_none-3, groupId=Reader-0_offset_consumer_538555605_none] Seeking to LATEST offset of partition topic_beam-0
2021-03-16 16:10:54,629 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_538555605_none-3, groupId=Reader-0_offset_consumer_538555605_none] Resetting offset for partition topic_beam-0 to offset 144.
2021-03-16 16:10:55,628 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_538555605_none-3, groupId=Reader-0_offset_consumer_538555605_none] Seeking to LATEST offset of partition topic_beam-0
2021-03-16 16:10:55,629 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_538555605_none-3, groupId=Reader-0_offset_consumer_538555605_none] Resetting offset for partition topic_beam-0 to offset 145.
But it doesn’t send any data to the next task:
[3]ReadFromKafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Debug
which stays in the RUNNING state.
Changing the configuration to:
| "ReadFromKafka" >> ReadFromKafka( consumer_config={ "bootstrap.servers": "10.0.1.40:9092", "auto.offset.reset":"earliest" }, topics="beam_topic", max_num_records=10, expansion_service="localhost:8097" ) | "Debug" >> beam.ParDo(PrintFn(label="test")) )
seems to work, but only for the first N records (N = 10 in the code above) that are already available in the broker, and I get the expected info logging:
2021-03-16 15:55:11,665 INFO apachebeam_pipeline.py:164 [] - [test]: (b'beam', b'{"type":"Buffer","data":[0,0,0,0,1,28,102]}') GlobalWindow Timestamp(1615910111.418000)
2021-03-16 15:55:11,665 INFO apachebeam_pipeline.py:164 [] - [test]: (b'beam', b'{"type":"Buffer","data":[0,0,0,0,1,28,102]}') GlobalWindow Timestamp(1615910111.418000)
After reading those messages, the complete pipeline (both tasks mentioned above) changes its status to FINISHED (as expected).
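For reference, that messages actually reach the broker can be confirmed independently of Beam with a plain consumer; a minimal sketch assuming the kafka-python package is installed (not part of the original pipeline):

from kafka import KafkaConsumer

# Plain (non-Beam) consumer used only as a sanity check that records
# exist on the topic the pipeline subscribes to.
consumer = KafkaConsumer(
    "beam_topic",
    bootstrap_servers="10.0.1.40:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,  # stop iterating after 10 s without records
)
for message in consumer:
    print(message.partition, message.offset, message.key, message.value)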
Issue Links
- is duplicated by: BEAM-11991 Python Kafka source not emitting messages for streaming pipelines with Flink Runner (Resolved)