FLINK-27348

Flink KafkaSource doesn't set groupId

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Done
    • Affects Version/s: 1.14.4
    • Fix Version/s: None
    • Component/s: API / Scala
    • Labels: None

    Description

      I have a very simple Flink application. I have installed Kafka locally and I am reading data from it with Flink, using the KafkaSource class. Although I set the group ID with setGroupId, that group ID does not appear in Kafka.

       

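      import java.nio.charset.StandardCharsets

      import scala.util.{Failure, Success, Try}

      import org.apache.flink.api.common.eventtime.WatermarkStrategy
      import org.apache.flink.api.common.typeinfo.TypeInformation
      import org.apache.flink.connector.kafka.source.KafkaSource
      import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
      import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
      import org.apache.flink.streaming.api.scala._
      import org.apache.flink.util.Collector
      import org.apache.kafka.clients.consumer.ConsumerRecord
      import play.api.libs.json.{Json, Reads}

      object FlinkKafkaSource extends App {
        val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
        case class Event(partitionNo: Long, eventTime: String, eventTimestamp: Long, userId: String, firstName: String)
        implicit val readsEvent: Reads[Event] = Json.reads[Event]

        env
          .fromSource(KafkaSource.builder[Event]
            .setBootstrapServers("localhost:9092")
            .setTopics("flink-connection")
            .setGroupId("test-group") // I can't see this groupId in kafka-consumer-groups
            .setStartingOffsets(OffsetsInitializer.latest)
            .setDeserializer(new KafkaRecordDeserializationSchema[Event] {
              override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[Event]): Unit = {
                // Decode the payload as UTF-8 so non-ASCII characters survive
                val rec = new String(record.value, StandardCharsets.UTF_8)
                // Emit the event if the JSON parses; otherwise log and skip the record
                Try(Json.fromJson[Event](Json.parse(rec)).get) match {
                  case Success(event) => out.collect(event)
                  case Failure(exception) => println(s"Couldn't parse string: $rec, error: ${exception.toString}")
                }
              }
              override def getProducedType: TypeInformation[Event] = createTypeInformation[Event]
            })
            .build,
            WatermarkStrategy.noWatermarks[Event],
            "kafka-source"
          )
          .keyBy(l => l.userId)
          .print

        env.execute("flink-kafka-source")
      }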

      I have created a topic in Kafka named "flink-connection".

      I am using a simple kafka-python producer to produce data to the flink-connection topic.

      I am able to consume data from Kafka in Flink.

      But I can't see the group ID in the kafka-consumer-groups output.

      Does anyone have any idea why the group ID is not being set?
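
      One possible explanation, offered as an assumption rather than a confirmed diagnosis for this issue: as far as the Flink Kafka connector documentation describes it, KafkaSource commits offsets when a checkpoint completes, and without checkpointing it relies on the Kafka consumer's own auto-commit settings. If nothing is ever committed, the group may simply never show up in kafka-consumer-groups. The sketch below only builds the consumer properties that would enable auto-commit; the object name KafkaSourceProps, the method name autoCommitProps, and the 1000 ms default interval are hypothetical choices for illustration.

```scala
import java.util.Properties

object KafkaSourceProps {
  // Builds Kafka consumer properties that enable the client's periodic
  // offset auto-commit for the given group ID.
  // Assumption: committed offsets are what make the group visible to
  // kafka-consumer-groups in this setup.
  def autoCommitProps(groupId: String, intervalMs: Long = 1000L): Properties = {
    val props = new Properties()
    props.setProperty("group.id", groupId)
    props.setProperty("enable.auto.commit", "true")
    props.setProperty("auto.commit.interval.ms", intervalMs.toString)
    props
  }
}
```

      These properties could be handed to the builder with setProperties(KafkaSourceProps.autoCommitProps("test-group")). Alternatively, calling env.enableCheckpointing with some interval before building the job should let the source commit offsets on each completed checkpoint instead.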

       

      Attachments

        1. image-2022-04-22-05-43-06-475.png
          165 kB
          Ahmet Gürbüz
        2. image-2022-04-22-05-44-56-494.png
          61 kB
          Ahmet Gürbüz
        3. image-2022-04-22-05-46-45-592.png
          11 kB
          Ahmet Gürbüz
        4. image-2022-04-22-05-52-04-760.png
          231 kB
          Ahmet Gürbüz

          People

            Assignee: Unassigned
            Reporter: Ahmet Gürbüz (gurbux)
            Votes: 0
            Watchers: 2
