Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-19332

Special characters issue using Kinesis Data Analytics for Apache Flink

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Cannot Reproduce
    • Affects Version/s: 1.8.2
    • Fix Version/s: None
    • Component/s: Connectors / Kinesis
    • Labels:
      None

      Description

      Hi there,

       

      I am encountering one special character issue while using Kinesis Data Analytics for Apache Flink (KDA).

       

      Our KDA is built for processing data and outputting to a Kinesis stream. We have a lambda function that subscribes to the Kinesis stream and reads records from the Kinesis stream.

      The library in the KDA I am using is "org.apache.flink.streaming.connectors.kinesis".

       

      Our KDA is only outputting one single record to the Kinesis sink using "collector.collect()" for a single key (details will be found below)

      Most times, the record received by the Lambda looks perfectly good.

      However, occasionally, when two records are sent to the kinesis sink using "collector.collect()" at the same time, we noticed that those two records are combined somehow and there are some special characters in the record received by the Lambda function.

       

       

       

       

       

       

      Below are some technical details:

       

      The KDA is not using any "TimeWindow()" but uses "keyBy()" by some keys.

      ).returns(MatchedDataForAlarm.class)
              .keyBy(MatchedDataForAlarm::getStateKey)
              .connect(ruleBroadcastStream)
              .process(new MetricProcess())
              .addSink(kinesis);

       

       

      The "MetricProcess()" extends "KeyedBroadcastProcessFunction" which overrides the "processElement" function. It uses collector.collect() for outputs.

      @Override
      public void processElement(MatchedDataForAlarm input, ReadOnlyContext ctx,Collector<MatchedDataForAlarm> collector) throws Exception {

       

       

      We have our own AEMMatchedDataForAlarmSchemaSerialization which implements KinesisSerializationSchema<MatchedDataForAlarm>. The serialization simply converts a "MatchedDataForAlarm" object to String using Gson and then converts to ByteBuffer.

       

      @Override
      public ByteBuffer serialize(MatchedDataForAlarm matchedDataForAlarm) {
          Gson gson = new Gson();
          String result = gson.toJson(matchedDataForAlarm);
          log.info("Alarm record sent to Kinesis stream: {}", result);
          return ByteBuffer.wrap(result.getBytes());
      }

       

       

       

      Here's the record shown in the Lambda logs when two records are combined somewhere somehow (most cases those two are received as two separate records):

       

      ????
      0??

      { "inAlarmState": false }

      ??

      { "inAlarmState": false}

      e??E??o?N9x

       

       

       

       

      I am not sure if it's a serialization issue or some default behaviors in the Kinesis sink library? It might be just some common mistakes that I made which I am not aware of.

      Could anyone help with this problem? I really appreciate it.

       

       

       

      Thanks,

      Zekun

       

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              zekuny Zekun Yu
            • Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: