Details
Type: Bug
Status: Closed
Priority: Minor
Resolution: Invalid
Affects Version/s: 1.9.0
Fix Version/s: None
Description
val TEMPERATURE_THRESHOLD: Double = 50.00
val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val properties = new Properties()
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("bootstrap.servers", "localhost:9092")

val src = see.addSource(
  new FlinkKafkaConsumer010[ObjectNode](
    "broadcast",
    new JSONKeyValueDeserializationSchema(false),
    properties)).name("kafkaSource")

case class Event(locationID: String, temp: Double)

var data = src.map { v =>
  val loc = v.get("locationID").asInstanceOf[String]
  val temperature = v.get("temp").asDouble()
  (loc, temperature)
}

data = data.keyBy(v => v._1)

data.print()
see.execute()
--*********---
I get the following error while consuming JSON messages from Kafka:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
...
    at flinkBroadcast1$.main(flinkBroadcast1.scala:59)
    at flinkBroadcast1.main(flinkBroadcast1.scala)
Caused by: java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
...
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
...
Caused by: java.lang.NullPointerException
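
One possible cause, offered as a guess rather than a confirmed diagnosis: JSONKeyValueDeserializationSchema wraps each Kafka record in an ObjectNode with "key" and "value" fields, so the message payload sits under "value" and not at the top level. Jackson's get returns null for a missing field, so v.get("temp") would be null and calling asDouble() on it would throw the NullPointerException seen in the trace. The sketch below illustrates this with plain Jackson standing in for Flink's shaded copy. The helper name extractEvent and the sample payload are hypothetical and do not come from the report.

```scala
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}

object ExtractEventSketch {
  // Hypothetical helper: reads (locationID, temp) from the "value" sub-node
  // and returns None instead of throwing when any field is absent.
  def extractEvent(record: JsonNode): Option[(String, Double)] =
    for {
      value <- Option(record.get("value"))      // payload wrapper added by the schema
      loc   <- Option(value.get("locationID"))
      temp  <- Option(value.get("temp"))
    } yield (loc.asText(), temp.asDouble())     // asText() rather than a cast to String

  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    // Assumed sample record, shaped like the schema's {"key": ..., "value": ...} output.
    val record = mapper.readTree("""{"value":{"locationID":"L1","temp":42.5}}""")

    println(record.get("locationID"))                  // null: the field is not top-level
    println(extractEvent(record))                      // Some((L1,42.5))
    println(extractEvent(mapper.createObjectNode()))   // None: no "value" field
  }
}
```

If this assumption holds for the job above, reading the fields as v.get("value").get("locationID").asText() and v.get("value").get("temp").asDouble() inside the map function should avoid the null dereference, provided every message carries both fields.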