Flink / FLINK-21876

Handle it properly when the returned value of Python UDF doesn't match the defined result type

    Details

      Description

      Currently, when the value returned by a Python UDF doesn't match the UDF's declared result type, the following exception is thrown during execution:

      Caused by: java.io.EOFException
      at java.io.DataInputStream.readFully(DataInputStream.java:197)
      at java.io.DataInputStream.readFully(DataInputStream.java:169)
      at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:88)
      at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:82)
      at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:34)
      at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:129)
      at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:110)
      at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46)
      at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106)
      at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49)
      at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:81)
      at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:250)
      at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:273)
      at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:199)
      at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:123)
      at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitWatermark(SourceOperatorStreamTask.java:170)
      at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.advanceToEndOfEventTime(SourceOperatorStreamTask.java:110)
      at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.afterInvoke(SourceOperatorStreamTask.java:116)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:589)
      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
      at java.lang.Thread.run(Thread.java:748)
      

      The exception isn't straightforward, and it is difficult for users to figure out the root cause of the issue from it.

      As Python is a dynamically typed language, this case is likely to be very common, and it would be great if we could handle it properly.

      See https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readfully for more details.
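
To illustrate the kind of handling being asked for, here is a minimal sketch in plain Python of checking a function's return value against its declared type before the value is handed on for serialization. It is only an illustration under stated assumptions: `checked_result`, `is_str_map`, and `parse_pair` are hypothetical names and are not part of PyFlink, and the declared type `MAP<STRING, STRING>` is a guess based on the `MapDataSerializer` and `StringDataSerializer` frames in the stack trace above.

```python
from functools import wraps
from typing import Any, Callable


def is_str_map(value: Any) -> bool:
    """Return True if value is a dict whose keys and values are all str."""
    return isinstance(value, dict) and all(
        isinstance(k, str) and isinstance(v, str) for k, v in value.items()
    )


def checked_result(validator: Callable[[Any], bool], type_name: str):
    """Hypothetical decorator that validates a function's return value.

    Not a PyFlink API; it only shows failing early with a readable message.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            # None is accepted here, assuming the declared type is nullable.
            if result is not None and not validator(result):
                raise TypeError(
                    f"{func.__name__} returned {type(result).__name__} "
                    f"({result!r}), which does not match the declared "
                    f"result type {type_name}"
                )
            return result
        return wrapper
    return decorator


@checked_result(is_str_map, "MAP<STRING, STRING>")
def parse_pair(text: str):
    # Bug on purpose: returns a plain str when the input contains no '='.
    if "=" not in text:
        return text
    key, value = text.split("=", 1)
    return {key: value}
```

With a check like this, `parse_pair("a=b")` returns a dict as declared, while `parse_pair("oops")` raises a `TypeError` that names the function and the expected type, instead of surfacing later as a `java.io.EOFException` on the Java side. Where such a check should live in PyFlink (user code, the Python worker, or the coders) is left open here.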

            People

            • Assignee: Unassigned
            • Reporter: Dian Fu (dian.fu)