Flink / FLINK-22608

Flink Kryo deserialization reads wrong bytes


    Details

      Description

      In my Flink program, I use a ValueState to store my state, which is a POJO serialized with the Kryo serializer. After the program had been running for some time, I added a field to the POJO and then restored the program from a checkpoint. The value of the new field was incorrect. Reading the source code, I found the following:

       

      // org.apache.flink.runtime.state.heap.HeapRestoreOperation#readStateHandleStateData
      
      private void readStateHandleStateData(
         FSDataInputStream fsDataInputStream,
         DataInputViewStreamWrapper inView,
         KeyGroupRangeOffsets keyGroupOffsets,
         Map<Integer, StateMetaInfoSnapshot> kvStatesById,
         int numStates,
         int readVersion,
         boolean isCompressed) throws IOException {
      
         final StreamCompressionDecorator streamCompressionDecorator = isCompressed ?
            SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
      
         for (Tuple2<Integer, Long> groupOffset : keyGroupOffsets) {
            int keyGroupIndex = groupOffset.f0;
            long offset = groupOffset.f1;
      
            // Check that restored key groups all belong to the backend.
            Preconditions.checkState(keyGroupRange.contains(keyGroupIndex), "The key group must belong to the backend.");
      
            fsDataInputStream.seek(offset);
      
            int writtenKeyGroupIndex = inView.readInt();
            Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
               "Unexpected key-group in restore.");
      
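            // Only the start offset of the key group is used: the stream handed to
            // readKeyGroupStateData is not bounded by the next key group's offset.
            try (InputStream kgCompressionInStream =
                   streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {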
      
               readKeyGroupStateData(
                  kgCompressionInStream,
                  kvStatesById,
                  keyGroupIndex,
                  numStates,
                  readVersion);
            }
         }
      }
      

      My state's keyGroupIndex is 81, its keyGroupOffset is 3572, and the next keyGroupOffset is 3611, so the bytes of my key group lie in the range 3572 to 3611. After I added the new field to the POJO, however, Kryo reads more bytes than this range holds and continues into the next key group.
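      The following Java sketch models the reported over-read without any Flink dependency. It rests on several assumptions: each key group is written as its 4-byte index followed by the value (matching the readInt() check in the loop above), the value is encoded positionally with no per-field tags (as Kryo's default FieldSerializer is understood to do; the real Kryo wire format differs in detail), and the old POJO had one int field while the new one has two. The class KeyGroupOverread, its methods, and the BoundedInputStream wrapper are hypothetical illustrations, not Flink or Kryo classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

class KeyGroupOverread {

    // Hypothetical snapshot layout: each key group is its 4-byte index
    // followed by a value written with the OLD schema (a single int field).
    static byte[] twoKeyGroups() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(81); // key group 81 starts at offset 0
        out.writeInt(42); // old value of key group 81: a = 42
        out.writeInt(82); // key group 82 starts at offset 8
        out.writeInt(7);  // old value of key group 82: a = 7
        out.flush();
        return bytes.toByteArray();
    }

    // Reader for the NEW schema, which has a second int field. With no
    // per-field tags it consumes 4 more bytes than the old writer produced.
    static int[] readNewValue(DataInputStream in) throws IOException {
        return new int[] {in.readInt(), in.readInt()};
    }

    // Like the restore loop: start at the key group's offset, verify its
    // index, then hand the remaining, unbounded stream to the value reader.
    static int[] restoreUnbounded(byte[] data, int offset, int keyGroup) throws IOException {
        InputStream raw = new ByteArrayInputStream(data, offset, data.length - offset);
        return readKeyGroup(new DataInputStream(raw), keyGroup);
    }

    // Same, but the stream reports end-of-stream at the next key group's offset.
    static int[] restoreBounded(byte[] data, int offset, int nextOffset, int keyGroup) throws IOException {
        InputStream raw = new ByteArrayInputStream(data, offset, data.length - offset);
        InputStream bounded = new BoundedInputStream(raw, nextOffset - offset);
        return readKeyGroup(new DataInputStream(bounded), keyGroup);
    }

    private static int[] readKeyGroup(DataInputStream in, int keyGroup) throws IOException {
        if (in.readInt() != keyGroup) {
            throw new IllegalStateException("Unexpected key-group in restore.");
        }
        return readNewValue(in);
    }

    // Hypothetical wrapper that returns end-of-stream after `limit` bytes.
    static final class BoundedInputStream extends FilterInputStream {
        private long remaining;

        BoundedInputStream(InputStream in, long limit) {
            super(in);
            this.remaining = limit;
        }

        @Override
        public int read() throws IOException {
            if (remaining <= 0) return -1;
            int b = super.read();
            if (b >= 0) remaining--;
            return b;
        }

        @Override
        public int read(byte[] buf, int off, int len) throws IOException {
            if (len == 0) return 0;
            if (remaining <= 0) return -1;
            int n = super.read(buf, off, (int) Math.min(len, remaining));
            if (n > 0) remaining -= n;
            return n;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = twoKeyGroups();
        int[] v = restoreUnbounded(data, 0, 81);
        System.out.println("a=" + v[0] + ", new field=" + v[1]); // a=42, new field=82
        try {
            restoreBounded(data, 0, 8, 81);
        } catch (EOFException e) {
            System.out.println("bounded read stopped at the key-group boundary");
        }
    }
}
```

      In this model the added field is silently filled with the next key group's index (82), which matches the symptom of a wrong value rather than an error. Limiting the stream to [offset, nextOffset), as restoreBounded does, turns the silent misread into an EOFException at the key-group boundary; that only surfaces the mismatch and does not by itself make the added field readable.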

       


            People

            • Assignee: Unassigned
              Reporter: Autumn Si Chen
            • Votes: 0
              Watchers: 3
