Flink / FLINK-22608

Flink Kryo deserialization reads wrong bytes


Details

    Description

      In my Flink program, I use a ValueState to store my state. The state holds a POJO, and that POJO is serialized with the Kryo serializer. After the program had been running for some time, I added a field to the POJO and then restored the program from a checkpoint. The value of the new field was incorrect. Reading the source code, I found the following:

       

      // From org.apache.flink.runtime.state.heap.HeapRestoreOperation#readStateHandleStateData
      
      private void readStateHandleStateData(
         FSDataInputStream fsDataInputStream,
         DataInputViewStreamWrapper inView,
         KeyGroupRangeOffsets keyGroupOffsets,
         Map<Integer, StateMetaInfoSnapshot> kvStatesById,
         int numStates,
         int readVersion,
         boolean isCompressed) throws IOException {
      
         final StreamCompressionDecorator streamCompressionDecorator = isCompressed ?
            SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
      
         for (Tuple2<Integer, Long> groupOffset : keyGroupOffsets) {
            int keyGroupIndex = groupOffset.f0;
            long offset = groupOffset.f1;
      
            // Check that restored key groups all belong to the backend.
            Preconditions.checkState(keyGroupRange.contains(keyGroupIndex), "The key group must belong to the backend.");
      
            fsDataInputStream.seek(offset);
      
            int writtenKeyGroupIndex = inView.readInt();
            Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
               "Unexpected key-group in restore.");
      
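            // The decorated stream reads from the current position to the end of the file;
            // it is not limited to the bytes before the next key group's offset.
            try (InputStream kgCompressionInStream =
                   streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {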
      
               readKeyGroupStateData(
                  kgCompressionInStream,
                  kvStatesById,
                  keyGroupIndex,
                  numStates,
                  readVersion);
            }
         }
      }
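The loop above seeks to each key group's offset, checks the key-group index, and then hands the rest of the file to the decompression decorator; nothing caps the reader at the next key group's offset. The following is a minimal, self-contained sketch in plain Java (no Flink classes) that reproduces that layout in memory. The class and method names (`KeyGroupLayoutSketch`, `buildFile`, `readableAfterIndex`) and the two-int state per key group are hypothetical, chosen only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class KeyGroupLayoutSketch {

    // Writes key groups 81 and 82 back to back, each as a 4-byte key-group
    // index followed by two int values, and records where each one starts
    // (a hypothetical stand-in for KeyGroupRangeOffsets).
    static byte[] buildFile(Map<Integer, Long> offsets) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            int[][] groups = {{81, 7, 8}, {82, 9, 10}};
            for (int[] group : groups) {
                offsets.put(group[0], (long) out.size());
                for (int value : group) {
                    out.writeInt(value);
                }
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Mimics seek(offset) + readInt() + the index check, then reports how many
    // bytes a deserializer could still read from the same stream.
    static int readableAfterIndex(byte[] file, int keyGroupIndex, long offset) {
        try {
            ByteArrayInputStream raw = new ByteArrayInputStream(file);
            raw.skip(offset);
            DataInputStream in = new DataInputStream(raw);
            if (in.readInt() != keyGroupIndex) {
                throw new IllegalStateException("Unexpected key-group in restore.");
            }
            return in.available();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<Integer, Long> offsets = new LinkedHashMap<>();
        byte[] file = buildFile(offsets);
        for (Map.Entry<Integer, Long> e : offsets.entrySet()) {
            // Each key group's own state is 8 bytes, but the stream is not capped.
            System.out.println("key group " + e.getKey() + ": "
                    + readableAfterIndex(file, e.getKey(), e.getValue())
                    + " bytes readable after the index");
        }
    }
}
```

In this sketch key group 81 owns only 8 bytes of state, yet its reader can see 20 bytes; the last 12 belong to key group 82. A deserializer that expects more bytes than were written will therefore silently consume the next key group's data instead of failing at the boundary.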
      

      My state's keyGroupIndex is 81 and its keyGroupOffset is 3572; the next key group's offset is 3611. So the bytes for key group 81 lie in the range [3572, 3611), 39 bytes including the 4-byte key-group index. But after I added the new field to the POJO, Kryo reads past offset 3611 into the bytes of the next key group.
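To my understanding, Kryo's default `FieldSerializer` writes field values in a fixed order without per-field names or lengths, so a reader built against a class with an extra field keeps reading after the old data ends. The sketch below is an analogy in plain Java, not Kryo itself: `PojoV1`/`PojoV2`, the fixed-width int encoding (real Kryo typically uses variable-length ints), and the small state values are assumptions for illustration. It writes key group 81 with the old two-field layout, followed by key group 82, then restores key group 81 with the new three-field layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class PositionalSchemaSketch {

    // Hypothetical old POJO layout: two int fields written in a fixed order.
    record PojoV1(int a, int b) {}

    // Hypothetical new POJO layout: the same fields plus one added at the end.
    record PojoV2(int a, int b, int c) {}

    // Positional encoding: no field names, tags, or lengths are written.
    static void writeV1(DataOutputStream out, PojoV1 p) throws IOException {
        out.writeInt(p.a());
        out.writeInt(p.b());
    }

    // A reader built against the new class reads three fields,
    // regardless of how many the writer actually produced.
    static PojoV2 readV2(DataInputStream in) throws IOException {
        return new PojoV2(in.readInt(), in.readInt(), in.readInt());
    }

    // Writes key group 81 (index + one V1 value) followed by key group 82,
    // then restores key group 81 with the V2 reader.
    static PojoV2 restoreKeyGroup81WithNewSchema() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(81);                    // key-group index
            writeV1(out, new PojoV1(1, 2));      // state written before the change
            out.writeInt(82);                    // next key group starts here
            writeV1(out, new PojoV1(3, 4));
            out.flush();

            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            if (in.readInt() != 81) {
                throw new IllegalStateException("Unexpected key-group in restore.");
            }
            return readV2(in);                   // reads 4 bytes past key group 81
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Field c is filled from the next key group's index, not from real state.
        System.out.println(restoreKeyGroup81WithNewSchema());
    }
}
```

In this simplified layout the new field `c` receives 82, the next key group's index. Because the restore loop seeks to each key group's own offset before reading it, key group 82 still restores, and the damage shows up only as a wrong value in the added field, which matches the symptom described above.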

       


          People

            Assignee: Unassigned
            Reporter: Autumn Si Chen
            Votes: 0
            Watchers: 3
