Flink / FLINK-33934

Flink SQL source using the raw format may lead to data loss


Details

    Description

      In production we encountered a case that led to data loss. The job setup:
         1. A Flink SQL job reads data from a message queue (our internal MQ) and writes it to Hive, selecting only the value field (no metadata fields).
         2. The source table uses the raw format.
       
      However, if we select the value field and a metadata field together, no data is lost.
       
      After reviewing the code, we found that the cause is object reuse in the raw format (see RawFormatDeserializationSchema). Object reuse leads to this problem as follows (taking Kafka as an example):
          1. RawFormatDeserializationSchema is used in the fetcher thread of the SourceOperator. The fetcher thread reads and deserializes data from a Kafka partition, then puts the records into the ElementQueue (see the SourceOperator FetcherTask code).
          2. The SourceOperator's main thread pulls data from the ElementQueue (which is shared with the fetcher thread) and processes it (see the SourceOperator main-thread code).
          3. RawFormatDeserializationSchema's deserialize method returns the same object on every call (it reuses one RowData object).
          4. Therefore, while the ElementQueue still holds elements that have not been consumed, the fetcher thread can overwrite the field of the reused RowData that RawFormatDeserializationSchema::deserialize returned. This leads to data loss.
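      The interleaving in steps 1 to 4 can be reproduced outside Flink with a minimal Java sketch. MutableRow, ReusingRawDeserializer and ReuseRaceDemo are hypothetical stand-ins, not Flink's RowData or RawFormatDeserializationSchema API. The fetcher thread is joined before the consumer polls, so the queue deterministically holds several unconsumed elements, which is the worst case described in step 4.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class ReuseRaceDemo {

    /** Hypothetical stand-in for a mutable single-column row (not Flink's RowData). */
    static final class MutableRow {
        byte[] value;
    }

    /** Hypothetical raw-style deserializer that returns the SAME row object on every call. */
    static final class ReusingRawDeserializer {
        private final MutableRow reuse = new MutableRow();

        MutableRow deserialize(byte[] message) {
            reuse.value = message; // overwrite the field of the shared object
            return reuse;          // every caller receives the same instance
        }
    }

    /** Fetcher thread fills the shared queue; main thread drains it afterwards. */
    static List<String> fetchThenConsume(List<String> messages) {
        ReusingRawDeserializer deserializer = new ReusingRawDeserializer();
        BlockingQueue<MutableRow> elementQueue = new LinkedBlockingQueue<>();

        Thread fetcher = new Thread(() -> {
            for (String m : messages) {
                elementQueue.add(deserializer.deserialize(m.getBytes(StandardCharsets.UTF_8)));
            }
        }, "fetcher");
        fetcher.start();
        try {
            // Force the bad interleaving: the fetcher finishes before the consumer polls.
            // join() also makes the fetcher's writes visible to this thread.
            fetcher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        // Main thread: every queue entry references the same reused object.
        List<String> seen = new ArrayList<>();
        MutableRow row;
        while ((row = elementQueue.poll()) != null) {
            seen.add(new String(row.value, StandardCharsets.UTF_8));
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(fetchThenConsume(List.of("a", "b", "c"))); // prints [c, c, c]
    }
}
```

      In this sketch records "a" and "b" disappear and "c" is read three times, because all three queue entries point at one object whose field was last set to "c".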
       
      The reason that selecting the value field and a metadata field together does not lose data is:
         If a metadata field is selected, a new RowData object is returned (see DynamicKafkaDeserializationSchema deserialize with a metadata field). If only the value field is selected, the RowData object returned by formatDeserializationSchema is passed on as-is, so the reused object leaks downstream (see DynamicKafkaDeserializationSchema deserialize with only the value field).
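      The two code paths can be contrasted with another hypothetical Java sketch. ValueRow, JoinedRow, ReusingValueFormat and WrapperPathsDemo are illustrative names, not the real DynamicKafkaDeserializationSchema API. As a simplifying assumption, the value format allocates a fresh field value per record and reuses only the row container, so copying the field reference into a newly allocated row is enough to protect buffered records.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class WrapperPathsDemo {

    /** Hypothetical single-column row produced by the value format. */
    static final class ValueRow {
        byte[] value;
    }

    /** Hypothetical output row: the value column plus one metadata column (an offset). */
    static final class JoinedRow {
        final byte[] value;
        final long offset;

        JoinedRow(byte[] value, long offset) {
            this.value = value;
            this.offset = offset;
        }
    }

    /** Hypothetical value format that reuses one ValueRow for every record. */
    static final class ReusingValueFormat {
        private final ValueRow reuse = new ValueRow();

        ValueRow deserialize(byte[] message) {
            reuse.value = message;
            return reuse;
        }
    }

    private static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    private static String text(byte[] b) {
        return new String(b, StandardCharsets.UTF_8);
    }

    /** Value-only projection: the format's reused row is forwarded and buffered as-is. */
    static List<String> valueOnly(List<String> messages) {
        ReusingValueFormat format = new ReusingValueFormat();
        List<ValueRow> buffered = new ArrayList<>();
        for (String m : messages) {
            buffered.add(format.deserialize(bytes(m))); // same instance every time
        }
        List<String> seen = new ArrayList<>();
        for (ValueRow r : buffered) {
            seen.add(text(r.value));
        }
        return seen;
    }

    /** Value + metadata projection: each record is copied into a newly allocated row. */
    static List<String> withMetadata(List<String> messages) {
        ReusingValueFormat format = new ReusingValueFormat();
        List<JoinedRow> buffered = new ArrayList<>();
        long offset = 0;
        for (String m : messages) {
            ValueRow physical = format.deserialize(bytes(m));
            // Copy the field out before the next call overwrites the reused row.
            buffered.add(new JoinedRow(physical.value, offset++));
        }
        List<String> seen = new ArrayList<>();
        for (JoinedRow r : buffered) {
            seen.add(text(r.value) + "@" + r.offset);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b", "c");
        System.out.println(valueOnly(input));    // prints [c, c, c]
        System.out.println(withMetadata(input)); // prints [a@0, b@1, c@2]
    }
}
```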
       
      To solve this problem, I think we should remove object reuse from RawFormatDeserializationSchema.
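      A minimal sketch of the proposed fix, again with hypothetical classes rather than Flink code: the deserializer allocates a new row for every record, so entries still waiting in the queue can no longer be overwritten. The queue is filled completely before it is drained, mirroring the backlog that triggered the loss.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class NoReuseDemo {

    /** Hypothetical immutable single-column row. */
    static final class RawRow {
        final byte[] value;

        RawRow(byte[] value) {
            this.value = value;
        }
    }

    /** Hypothetical raw-style deserializer without object reuse: one new row per record. */
    static final class NonReusingRawDeserializer {
        RawRow deserialize(byte[] message) {
            return new RawRow(message);
        }
    }

    /** Enqueue everything first (unconsumed backlog), then drain. */
    static List<String> fetchThenConsume(List<String> messages) {
        NonReusingRawDeserializer deserializer = new NonReusingRawDeserializer();
        Queue<RawRow> elementQueue = new ArrayDeque<>();
        for (String m : messages) {
            elementQueue.add(deserializer.deserialize(m.getBytes(StandardCharsets.UTF_8)));
        }
        List<String> seen = new ArrayList<>();
        RawRow row;
        while ((row = elementQueue.poll()) != null) {
            seen.add(new String(row.value, StandardCharsets.UTF_8));
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(fetchThenConsume(List.of("a", "b", "c"))); // prints [a, b, c]
    }
}
```

      The trade-off is one row allocation per record, which is presumably the cost that object reuse was meant to avoid; in exchange, no record depends on the fetcher leaving a shared object untouched.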


            People

              catyee Yuan Kui
              cailiuyang Cai Liuyang
              Votes: 0
              Watchers: 6
