Details
-
Bug
-
Status: Resolved
-
P2
-
Resolution: Cannot Reproduce
-
2.4.0
-
None
-
Ubuntu 16.04.4 LTS
Description
Dear sir,
The following versions of related tools are set in my running program:
==================================
Beam 2.4.0 (Direct runner and Spark runner)
Spark 2.2.1 (local mode and standalone mode)
Kafka: 2.11-0.10.1.1
scala: 2.11.8
java: 1.8
==================================
My programs (KafkaToKafka.java and StarterPipeline.java) are as shown on my github: https://github.com/LinRick/beamkafkaIO,
The description of my situation is as:
The kafka broker is working and kafkaIO.read (consumer) is used to capture data from the assigned broker ip (http://ubuntu7:9092).
The user manual of kafkaIO SDK (on web:https://beam.apache.org/documentation/sdks/javadoc/2.4.0/) indicates that the following parameters need to be set, and then the kafkaIO can work well.
.withBootstrapServers("kafka broker ip:9092")
.withTopic("kafkasink")
.withKeyDeserializer(IntegerDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
When i run my program with these settings over direct runner, i can find that my program perform well. In addition, my running program is the streaming mode. However, i run these codes with the same settings (kafkaIO) over spark runner, and my running program is not the streaming mode and is shutdown. Here, as mentioned on the website: https://beam.apache.org/documentation/runners/spark/, the performing program will automatically set streaming mode.
Unfortunately, it failed for my program.
On the other hand, If i set the parameter kafkaIO.read.withMaxNumRecords (1000) or kafkaIO.read.withMaxReadTime (Duration second), my program will successfully execute as the batch mode (batch processing).
The steps of performing StarterPipeline.java in my program are:
step1 mvn compile exec:java Dexec.mainClass=com.itri.beam.StarterPipeline -Pspark2-runner -Dexec.args="-runner=SparkRunner"
step2 mvn clean package
step3 cp -rf target/beamkafkaIO-0.1.jar /root/
step4 cd /spark-2.2.1-bin-hadoop2.6/bin
step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] /root/beamkafkaIO-0.1.jar --runner=SparkRunner
I am not sure if this issue is a bug about kafkaIO or I was wrong with some parameter settings over spark runner ?
I really can't handle it, so I hope to get help from you.
if any further information is needed, i am glad to be informed and will provide to you as soon as possible.
I will highly appreciate it if you can help me to deal with this issue.
i am looking forward to hearing from you.
Sincerely yours,
Rick