Flink / FLINK-30168

PyFlink Deserialization Error with Object Array


Details

    Description

      Collecting object array records from a DataStream in PyFlink throws an exception like the following:

      data = 0, field_type = DenseVectorTypeInfo

          def pickled_bytes_to_python_converter(data, field_type):
              if isinstance(field_type, RowTypeInfo):
                  row_kind = RowKind(int.from_bytes(data[0], 'little'))
                  data = zip(list(data[1:]), field_type.get_field_types())
                  fields = []
                  for d, d_type in data:
                      fields.append(pickled_bytes_to_python_converter(d, d_type))
                  row = Row.of_kind(row_kind, *fields)
                  return row
              else:
      >           data = pickle.loads(data)
      E           TypeError: a bytes-like object is required, not 'int'

      This error occurs because PyFlink handles object arrays differently on the Java side and on the Python side.

       

      On the Java side (org.apache.flink.api.common.python.PythonBridgeUtils.getPickledBytesFromJavaObject):

      ...
      else if (dataType instanceof BasicArrayTypeInfo || dataType instanceof PrimitiveArrayTypeInfo) {
          // recursively deal with array elements
      } ...
      else {
          // ObjectArrayTypeInfo is here
          TypeSerializer serializer = dataType.createSerializer(null);
          ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
          DataOutputViewStreamWrapper baosWrapper = new DataOutputViewStreamWrapper(baos);
          serializer.serialize(obj, baosWrapper);
          return pickler.dumps(baos.toByteArray());
      }
      

       

      On the Python side (pyflink.datastream.utils.pickled_bytes_to_python_converter):

      ...
      elif isinstance(field_type,
      (BasicArrayTypeInfo, PrimitiveArrayTypeInfo, ObjectArrayTypeInfo)):
        element_type = field_type._element_type
        elements = []
        for element_bytes in data:
          elements.append(pickled_bytes_to_python_converter(element_bytes, element_type))
        return elements
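
      The mismatch can be reproduced without Flink. The sketch below is a simplified stand-in, not PyFlink code: opaque_blob is a hypothetical placeholder for the single byte array that the Java serializer would produce (its contents are made up), and convert_elements mimics only the per-element loop shown above.

```python
import pickle

# Hypothetical stand-in for the single byte array that the Java-side
# TypeSerializer writes for a whole object array (contents are made up).
opaque_blob = bytes([0, 1, 2, 3])


def convert_elements(data):
    """Mimic the Python-side loop: treat `data` as one pickled payload per element."""
    return [pickle.loads(element_bytes) for element_bytes in data]


def reproduce():
    """Return the error message raised when the loop receives one opaque blob."""
    try:
        # Iterating over a bytes object yields ints, so pickle.loads gets an int.
        convert_elements(opaque_blob)
    except TypeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(reproduce())
```

      Under these assumptions the first "element" handed to pickle.loads is the integer 0, which lines up with data = 0 in the traceback.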

       

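      The Java side therefore appears to emit one serialized byte array for the whole object array, while the Python side iterates over the data expecting one pickled payload per element. Iterating over a byte string yields integers, which matches data = 0 in the traceback above. A possible fix is to align PyFlink's handling of ObjectArrayTypeInfo on the Java side and the Python side.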
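
      One way to picture aligned behavior is a round trip in which each element gets its own pickled payload. This is a pure-Python simulation under stated assumptions, not the actual Flink fix: encode_object_array is a hypothetical stand-in for what an aligned Java side might emit, and decode_object_array is a simplified version of the Python-side loop.

```python
import pickle


def encode_object_array(elements):
    """Hypothetical aligned producer: one pickled payload per array element."""
    return [pickle.dumps(element) for element in elements]


def decode_object_array(data):
    """Simplified consumer: unpickle each element payload in turn."""
    return [pickle.loads(element_bytes) for element_bytes in data]


if __name__ == "__main__":
    # Made-up sample data standing in for an array of vector-like objects.
    vectors = [[1.0, 2.0], [3.0, 4.0]]
    assert decode_object_array(encode_object_array(vectors)) == vectors
```

      The alternative direction, teaching the Python side to decode the serializer's byte format, would also align the two sides; which one the project adopted is not stated here.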


            People

              dianfu Dian Fu
              yunfengzhou Yunfeng Zhou