Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
1.16.0, 1.15.2
Description
When collecting records that contain object arrays from a DataStream in PyFlink, an exception like the following is thrown:
```
data = 0, field_type = DenseVectorTypeInfo

    def pickled_bytes_to_python_converter(data, field_type):
        if isinstance(field_type, RowTypeInfo):
            row_kind = RowKind(int.from_bytes(data[0], 'little'))
            data = zip(list(data[1:]), field_type.get_field_types())
            fields = []
            for d, d_type in data:
                fields.append(pickled_bytes_to_python_converter(d, d_type))
            row = Row.of_kind(row_kind, *fields)
            return row
        else:
>           data = pickle.loads(data)
E           TypeError: a bytes-like object is required, not 'int'
```
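The failure mode can be reproduced without PyFlink. The sketch below is a hypothetical, stdlib-only model (the name convert_array_elements and the sample bytes are illustrative, not PyFlink code): iterating over a single bytes object yields ints, and pickle.loads rejects an int with the same TypeError seen in the traceback.

```python
import pickle

# Hypothetical model: an object-array field arrives as ONE byte string
# (the sample bytes are made up), not as a list of per-element pickled blobs.
whole_array_blob = bytes([0, 0, 0, 2])

def convert_array_elements(data):
    """Simplified stand-in for the array branch: unpickle each item of data."""
    return [pickle.loads(item) for item in data]

first_item = next(iter(whole_array_blob))  # iterating bytes yields ints
try:
    convert_array_elements(whole_array_blob)
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(first_item)     # 0
print(error_message)  # a bytes-like object is required, not 'int'
```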
This error occurs because PyFlink handles object arrays differently on the Java side and on the Python side.
On the Java side (org.apache.flink.api.common.python.PythonBridgeUtils.getPickledBytesFromJavaObject):
```java
...
else if (dataType instanceof BasicArrayTypeInfo || dataType instanceof PrimitiveArrayTypeInfo) {
    // recursively deal with array elements
}
...
else {
    // ObjectArrayTypeInfo ends up here: the whole array is serialized and pickled once
    TypeSerializer serializer = dataType.createSerializer(null);
    ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
    DataOutputViewStreamWrapper baosWrapper = new DataOutputViewStreamWrapper(baos);
    serializer.serialize(obj, baosWrapper);
    return pickler.dumps(baos.toByteArray());
}
```
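To make the contrast between the two Java-side branches concrete, here is a rough Python model of them. It is a sketch under assumptions: serialize_int_array is a hypothetical stand-in for TypeSerializer.serialize (a 4-byte length prefix followed by big-endian ints), Python's pickle stands in for the Java-side pickler, and the kind strings are made up for illustration.

```python
import pickle
import struct

def serialize_int_array(values):
    """Hypothetical stand-in for TypeSerializer.serialize on an int array:
    a 4-byte big-endian length prefix followed by each value as a 4-byte int."""
    out = struct.pack(">i", len(values))
    for v in values:
        out += struct.pack(">i", v)
    return out

def encode_array(values, kind):
    """Model of the array handling in getPickledBytesFromJavaObject."""
    if kind in ("basic_array", "primitive_array"):
        # Recursive branch: every element is pickled separately.
        return [pickle.dumps(v) for v in values]
    # Fallback branch reached by object arrays: serialize the whole array
    # into one buffer, then pickle that buffer once.
    return pickle.dumps(serialize_int_array(values))

per_element = encode_array([1, 2], "basic_array")
whole_array = encode_array([1, 2], "object_array")
print(type(per_element).__name__, len(per_element))  # list 2
print(type(whole_array).__name__)                    # bytes
```

Only the first shape (a list of per-element blobs) matches what the Python-side array branch iterates over.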
On the Python side (pyflink.datastream.utils.pickled_bytes_to_python_converter):
```python
...
elif isinstance(field_type, (BasicArrayTypeInfo, PrimitiveArrayTypeInfo, ObjectArrayTypeInfo)):
    element_type = field_type._element_type
    elements = []
    for element_bytes in data:
        elements.append(pickled_bytes_to_python_converter(element_bytes, element_type))
    return elements
```
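The Python-side branch assumes that data is an iterable of per-element pickled bytes for all three array types. A simplified, PyFlink-free restatement of that contract follows; convert_array and its convert_element callback are hypothetical names, and the type-specific element conversion is replaced with a plain pickle.loads.

```python
import pickle

def convert_array(data, convert_element):
    """Mirror of the array branch: convert each per-element blob in turn."""
    elements = []
    for element_bytes in data:
        elements.append(convert_element(element_bytes))
    return elements

# Shape produced for BasicArrayTypeInfo / PrimitiveArrayTypeInfo:
# a list with one pickled blob per element.
per_element = [pickle.dumps("a"), pickle.dumps("b")]
print(convert_array(per_element, pickle.loads))  # ['a', 'b']

# A single blob for the whole array does not fit this contract:
# iterating it yields ints, so convert_element would receive an int.
single_blob = pickle.dumps(["a", "b"])
print(type(next(iter(single_blob))).__name__)  # int
```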
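The Python side therefore expects a list of per-element pickled bytes for every array type, but for object arrays the Java side sends a single pickled byte array covering the whole array. Iterating over that byte string yields ints, which is why the element conversion receives data = 0 in the traceback. A possible fix is to align PyFlink's handling of object arrays on the Java side and the Python side.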
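One way to align the two sides, sketched here as a Python model rather than the actual Flink patch, is to let object arrays take the same recursive path as basic and primitive arrays on the Java side, so that every element is pickled individually. encode_array and decode_array are hypothetical names, and the tuples are plain Python stand-ins for Java element objects such as DenseVector values.

```python
import pickle

ARRAY_KINDS = ("basic_array", "primitive_array", "object_array")

def encode_array(values, kind):
    """Hypothetical aligned encoder: object arrays now recurse like the others."""
    if kind in ARRAY_KINDS:
        return [pickle.dumps(v) for v in values]
    raise ValueError(f"not an array kind: {kind}")

def decode_array(data):
    """Matches the existing Python-side branch: one pickled blob per element."""
    return [pickle.loads(element_bytes) for element_bytes in data]

vectors = [(1.0, 2.0), (3.0, 4.0)]  # stand-ins for object-array elements
round_trip = decode_array(encode_array(vectors, "object_array"))
print(round_trip)  # [(1.0, 2.0), (3.0, 4.0)]
```

In the real converter each element would still go through its own type-specific conversion; the point of the sketch is only that both sides agree on the list-of-blobs shape.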