Flink / FLINK-18049

The Flink Kafka consumer job is interrupted if the upstream Kafka producer changes the Avro schema

    Description

      We encountered a critical failure in one of our online services. Our data pipeline is (producer) -> (Kafka) -> (Flink consumer job), and all records are encoded in Avro format. The producer changed the Avro schema, for example by adding an extra column to the existing schema, and wrote a few records into Kafka.
      The downstream Flink job then crashed with the following stack trace:

      ==WARNING==  allocating large array--thread_id[0x00007fccd9c16800]--thread_name[Source: Custom Source (1/1)]--array_size[1590681120 bytes]--array_length[1590681103 elememts]
      os_prio=0 tid=0x00007fccd9c16800 nid=0x226c0 runnable 
        at org.shaded.apache.avro.util.Utf8.setByteLength(Utf8.java:78)
        at org.shaded.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:261)
        at org.shaded.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:272)
        at org.shaded.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:214)
        at org.shaded.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:412)
        at org.shaded.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
        at org.shaded.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
        at org.shaded.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
        at org.shaded.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
        at org.shaded.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.shaded.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
        at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:167)
        at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:78)
        at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:192)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:771)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:120)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:74)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:129)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:398)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:736)
        at java.lang.Thread.run(Thread.java:834)
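One possible reading of the trace above, sketched with the JDK only (no Avro dependency): Avro's binary encoding writes a string as a zigzag varint length followed by the UTF-8 bytes, so a reader that decodes with the old field layout can end up treating a different field's bytes as a string length. The record layouts, the class name AvroLengthSketch, and the idea that a long field was misread are assumptions made for illustration. The value 1590681103 is simply the array_length from the warning. The bounds check in readString is this sketch's own addition, not something the trace shows Avro doing.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; this is not Avro or Flink code.
final class AvroLengthSketch {

    // Encode a long the way Avro's binary format does: zigzag, then base-128 varint.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Encode a string as <length as long><UTF-8 bytes>.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    // Decode a zigzag varint starting at pos[0]; advances pos[0] past it.
    static long readLong(byte[] buf, int[] pos) {
        long z = 0;
        int shift = 0;
        while (true) {
            int b = buf[pos[0]++] & 0xFF;
            z |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
        }
        return (z >>> 1) ^ -(z & 1);
    }

    // Decode a string, refusing a declared length larger than the bytes left.
    static String readString(byte[] buf, int[] pos) {
        long len = readLong(buf, pos);
        long remaining = buf.length - pos[0];
        if (len < 0 || len > remaining) {
            throw new IllegalStateException(
                "declared string length " + len + " exceeds remaining " + remaining + " bytes");
        }
        String s = new String(buf, pos[0], (int) len, StandardCharsets.UTF_8);
        pos[0] += (int) len;
        return s;
    }

    // Assumed NEW writer layout: id (long), ts (long, the added column), name (string).
    static byte[] encodeNew(long id, long ts, String name) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(out, id);
        writeLong(out, ts);
        writeString(out, name);
        return out.toByteArray();
    }

    // Assumed OLD reader layout: id (long), name (string). It does not know about ts.
    static String decodeOld(byte[] record) {
        int[] pos = {0};
        readLong(record, pos);          // id is decoded correctly
        return readString(record, pos); // the bytes of ts are misread as the string length
    }

    public static void main(String[] args) {
        byte[] record = encodeNew(1L, 1590681103L, "bob");
        try {
            decodeOld(record);
        } catch (IllegalStateException e) {
            System.out.println("old reader failed: " + e.getMessage());
        }
    }
}
```

Without such a check, a decoder that trusts the declared length would try to allocate a buffer of that size, which would be consistent with the large-array warning. Avro's own mechanism for this situation is schema resolution, which needs the writer's schema at read time in addition to the reader's.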
      

            People

              Assignee: Unassigned
              Reporter: Zheng Hu (openinx)
              Votes: 0
              Watchers: 8