Flink / FLINK-14327

Getting "Could not forward element to next operator" error

Details

    Description

      val TEMPERATURE_THRESHOLD: Double = 50.00

      val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

      val properties = new Properties()
      properties.setProperty("zookeeper.connect", "localhost:2181")
      properties.setProperty("bootstrap.servers", "localhost:9092")

      val src = see.addSource(new FlinkKafkaConsumer010[ObjectNode]("broadcast",
        new JSONKeyValueDeserializationSchema(false), properties)).name("kafkaSource")

      case class Event(locationID: String, temp: Double)

      var data = src.map { v =>
        val loc = v.get("locationID").asInstanceOf[String]
        val temperature = v.get("temp").asDouble()
        (loc, temperature)
      }

      data = data.keyBy(v => v._1)

      data.print()

      see.execute()

      --*********---

      While consuming JSON data from Kafka, I get the following error:

      Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      ...
          at flinkBroadcast1$.main(flinkBroadcast1.scala:59)
          at flinkBroadcast1.main(flinkBroadcast1.scala)
      Caused by: java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
      ...
      Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
      ...
      Caused by: java.lang.NullPointerException
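
      A possible cause, offered as a guess rather than a confirmed diagnosis: JSONKeyValueDeserializationSchema wraps each record so that the message payload sits under a "value" field (and the record key under "key"). If so, v.get("temp") on the top-level node returns null, and calling asDouble() on it would raise the NullPointerException that the chained operator then reports. The sketch below shows one null-safe way to read the fields. The names ParseEventSketch and parseEvent are hypothetical, and it uses plain Jackson so that it runs on its own; inside a Flink job the ObjectNode would typically come from Flink's shaded Jackson package, so the import would differ.

```scala
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}

object ParseEventSketch {
  // Hypothetical helper: yields None instead of throwing when a field is absent.
  def parseEvent(record: JsonNode): Option[(String, Double)] =
    for {
      value <- Option(record.get("value"))       // payload wrapper (assumed record shape)
      loc   <- Option(value.get("locationID"))   // get() returns null for a missing field
      temp  <- Option(value.get("temp"))
    } yield (loc.asText(), temp.asDouble())

  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    // Record shape assumed for JSONKeyValueDeserializationSchema(false)
    val wrapped = mapper.readTree("""{"value": {"locationID": "loc-1", "temp": 42.5}}""")
    // Same payload without the wrapper, to exercise the missing-field path
    val flat = mapper.readTree("""{"locationID": "loc-1", "temp": 42.5}""")

    println(parseEvent(wrapped)) // Some((loc-1,42.5))
    println(parseEvent(flat))    // None
  }
}
```

      In the job above, something like src.flatMap(v => parseEvent(v).toList) could replace the map call, which would drop malformed records instead of failing the job. Whether dropping or logging such records is appropriate depends on the use case.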

      Attachments

        1. so2.png
          149 kB
          ASK5

          People

            Unassigned
            ASK5
            Votes: 1
            Watchers: 2