  Flink / FLINK-33290

Custom counters to capture encoding/decoding failures in Flink


Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Runtime / Metrics
    • Labels: None

    Description

      I need the difference between the number of records collected by a source and the number of records it emits, so that deserialization failures can be captured.

      Similarly, I need the difference between the number of records received by a sink and the number of records it sends out, so that serialization failures can be captured.

      For example, if deserialization fails while reading records from Kafka, I want to expose as a metric the difference between the records collected from the Kafka broker and the records emitted by the Kafka source operator after deserialization.

      As far as I can tell, Flink does not provide such a metric.
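      A minimal plain-Java sketch of the requested metric, assuming each collected record produces at most one emitted record. A deserializer that legitimately emits zero records (for example, one that filters records) or several records per input through its Collector would break the "collected minus emitted equals failures" equality. DecodeFailureTracker and its methods are hypothetical names, not Flink APIs, and Integer::parseInt stands in for a real deserializer.

```java
import java.util.List;
import java.util.function.Function;

// HYPOTHETICAL helper (not a Flink API): counts records collected from the external
// system and records emitted after decoding. Under the one-in/at-most-one-out
// assumption, the difference is the number of records dropped by decoding failures.
// Not thread-safe: intended for a single caller thread.
class DecodeFailureTracker<I, O> {
    private final Function<I, O> decoder;
    private long recordsCollected;
    private long recordsEmitted;

    DecodeFailureTracker(Function<I, O> decoder) {
        this.decoder = decoder;
    }

    // Decodes one raw record; on failure the record is dropped and only counted as collected.
    void process(I raw, List<O> out) {
        recordsCollected++;
        try {
            out.add(decoder.apply(raw));
            recordsEmitted++;
        } catch (RuntimeException e) {
            // Collected but never emitted: this is what the proposed metric surfaces.
        }
    }

    long recordsCollected() {
        return recordsCollected;
    }

    long recordsEmitted() {
        return recordsEmitted;
    }

    // The proposed metric: records collected minus records emitted.
    long decodingFailures() {
        return recordsCollected - recordsEmitted;
    }
}
```

With inputs "1", "x", "3" and "?", the tracker reports 4 collected, 2 emitted and 2 decoding failures.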

      For the Kafka source there is a workaround. I can override the open method of KafkaRecordDeserializationSchema and register a metric there that reports decoding failures:

          // Incremented in deserialize(); read by the metric reporter through the gauge.
          private volatile int decodingFailures;

          @Override
          public void open(DeserializationSchema.InitializationContext context) throws Exception {
              context.getMetricGroup().gauge("decodingFailures", new Gauge<Integer>() {
                  @Override
                  public Integer getValue() {
                      return decodingFailures;
                  }
              });
          }

      Then, during deserialization, I increment that count:

          @Override
          public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out) {
              try {
                  // deserialize record.value() and emit the result via out.collect(...)
              } catch (IOException | MMException e) {
                  // partition, topic and offset are taken from the consumed record
                  logger.error(String.format(
                          "Error received while decoding, in partition [%d] for topic [%s] at offset [%d]: %s",
                          record.partition(), record.topic(), record.offset(), e.toString()));
                  decodingFailures++;
              }
          }
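      The workaround above can be modeled in plain Java to exercise the failure path in isolation. This is a sketch under stated assumptions: RawRecord is a hypothetical stand-in for Kafka's ConsumerRecord (topic, partition, offset, value), java.util.logging replaces the original logger, a Supplier<Long> plays the role of the registered Gauge, and parsing a UTF-8 integer stands in for the real deserializer. Because metric reporters typically poll gauges from a different thread than the one calling deserialize, the count is kept in an AtomicLong.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Logger;

// HYPOTHETICAL stand-in for Kafka's ConsumerRecord, holding only what the log line needs.
final class RawRecord {
    final String topic;
    final int partition;
    final long offset;
    final byte[] value;

    RawRecord(String topic, int partition, long offset, byte[] value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.value = value;
    }
}

// Plain-Java model of the workaround: decode, or log the record's coordinates and count the failure.
class CountingDecoder {
    private static final Logger LOG = Logger.getLogger(CountingDecoder.class.getName());

    // AtomicLong so that a reader on another thread sees the latest count.
    private final AtomicLong decodingFailures = new AtomicLong();

    // Plays the role of the Gauge registered in open(); a reporter would poll it.
    Supplier<Long> failureGauge() {
        return decodingFailures::get;
    }

    // Assumed format: the value is a UTF-8 encoded integer.
    void deserialize(RawRecord record, Consumer<Integer> out) {
        try {
            String text = new String(record.value, StandardCharsets.UTF_8).trim();
            out.accept(Integer.parseInt(text));
        } catch (NumberFormatException e) {
            LOG.warning(String.format(
                    "Error received while decoding, in partition [%d] for topic [%s] at offset [%d]: %s",
                    record.partition, record.topic, record.offset, e));
            decodingFailures.incrementAndGet();
        }
    }
}
```

In the real schema, the same coordinates come from record.partition(), record.topic() and record.offset() on the ConsumerRecord.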

      However, there is no equivalent for FileSource, because SimpleStreamFormat and its Reader do not provide access to a context in any way.

      Similarly, I found no way to expose serialization-related metrics in any of the sinks.

      Would it be possible to provide a way to register custom counters for serialization/deserialization failures in all Flink connectors (sources and sinks)?
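      To make the request concrete, here is one possible shape for a connector-agnostic hook. FormatFailureListener, CountingFailureListener and ExampleConnectorLoops are hypothetical names that do not exist in Flink; the two loops only mimic where a file-format reader and a sink writer would call the hook, with Integer.parseInt and a caller-supplied encoder standing in for real formats.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// HYPOTHETICAL (not an existing Flink interface): a hook that a source format reader
// or a sink writer could call when a record cannot be (de)serialized.
interface FormatFailureListener {
    void onDeserializationFailure(Throwable cause);

    void onSerializationFailure(Throwable cause);
}

// Counting implementation; in Flink these counts would be backed by registered metrics.
class CountingFailureListener implements FormatFailureListener {
    final AtomicLong deserializationFailures = new AtomicLong();
    final AtomicLong serializationFailures = new AtomicLong();

    @Override
    public void onDeserializationFailure(Throwable cause) {
        deserializationFailures.incrementAndGet();
    }

    @Override
    public void onSerializationFailure(Throwable cause) {
        serializationFailures.incrementAndGet();
    }
}

final class ExampleConnectorLoops {
    // Source side: decode each raw line, skip bad ones and report them to the listener.
    static List<Integer> readAll(List<String> lines, FormatFailureListener listener) {
        List<Integer> emitted = new ArrayList<>();
        for (String line : lines) {
            try {
                emitted.add(Integer.parseInt(line));
            } catch (NumberFormatException e) {
                listener.onDeserializationFailure(e);
            }
        }
        return emitted;
    }

    // Sink side: encode each record, skip ones the encoder rejects and report them.
    static List<byte[]> writeAll(List<Integer> records, Function<Integer, byte[]> encoder,
                                 FormatFailureListener listener) {
        List<byte[]> written = new ArrayList<>();
        for (Integer record : records) {
            try {
                written.add(encoder.apply(record));
            } catch (RuntimeException e) {
                listener.onSerializationFailure(e);
            }
        }
        return written;
    }
}
```

The design choice in this sketch is that formats only report failures and never touch a metric group themselves, so a FileSource format that has no context could still surface failures if the framework passed such a listener in.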


          People

            Assignee: Unassigned
            Reporter: Chirag Dewan (chiggi_dev)
            Votes: 0
            Watchers: 2
