FLINK-17959

Exception "CANCELLED: call already cancelled" is thrown when running a Python UDF

    Details

      Description

      The following exception is thrown when running a Python UDF:

      May 27, 2020 3:20:49 PM org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor run
      SEVERE: Exception while executing runnable org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed@3960b30e
      org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException: CANCELLED: call already cancelled
      	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Status.asRuntimeException(Status.java:524)
      	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onCompleted(ServerCalls.java:366)
      	at org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onError(GrpcStateService.java:145)
      	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:270)
      	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
      	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
      	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
      	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
      	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:337)
      	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:793)
      	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
      	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      

      The job produces the correct results; however, something appears to go wrong during the shutdown procedure.
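
The trace shows `onCancel` reaching `GrpcStateService$Inbound.onError`, which then calls `onCompleted` on a stream whose call is already cancelled. The toy model below only illustrates that ordering in plain Python. Every class and function name in it is hypothetical; it is neither the gRPC nor the Beam API, and the guarded variant is an assumed mitigation for illustration, not the fix applied to this issue.

```python
class CallAlreadyCancelled(RuntimeError):
    """Toy stand-in for a StatusRuntimeException with status CANCELLED."""


class ToyStreamObserver:
    """Hypothetical model of a server-side stream observer (not the gRPC API)."""

    def __init__(self):
        self.cancelled = False
        self.completed = False

    def cancel(self):
        # The peer went away, e.g. the Python worker shut down first.
        self.cancelled = True

    def on_completed(self):
        # Completing a call that is already cancelled is rejected.
        if self.cancelled:
            raise CallAlreadyCancelled("CANCELLED: call already cancelled")
        self.completed = True


def close_unguarded(observer):
    # Same shape as the trace: the cancel path still completes the stream.
    observer.on_completed()


def close_guarded(observer):
    # Assumed mitigation: skip completion if the call is already cancelled.
    if observer.cancelled:
        return False
    observer.on_completed()
    return True


observer = ToyStreamObserver()
observer.cancel()
try:
    close_unguarded(observer)
except CallAlreadyCancelled as exc:
    print(exc)
print(close_guarded(observer))
```

In this model the error only appears when cancellation wins the race against completion, which would be consistent with the exception showing up occasionally and only at shutdown.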

      You can reproduce the exception with the following code (note: the exception occurs only occasionally):

      from pyflink.datastream import StreamExecutionEnvironment
      from pyflink.table import StreamTableEnvironment, DataTypes
      from pyflink.table.descriptors import Schema, OldCsv, FileSystem
      from pyflink.table.udf import udf
      
      env = StreamExecutionEnvironment.get_execution_environment()
      env.set_parallelism(1)
      t_env = StreamTableEnvironment.create(env)
      
      add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
      
      t_env.register_function("add", add)
      
      t_env.connect(FileSystem().path('/tmp/input')) \
          .with_format(OldCsv()
                       .field('a', DataTypes.BIGINT())
                       .field('b', DataTypes.BIGINT())) \
          .with_schema(Schema()
                       .field('a', DataTypes.BIGINT())
                       .field('b', DataTypes.BIGINT())) \
          .create_temporary_table('mySource')
      
      t_env.connect(FileSystem().path('/tmp/output')) \
          .with_format(OldCsv()
                       .field('sum', DataTypes.BIGINT())) \
          .with_schema(Schema()
                       .field('sum', DataTypes.BIGINT())) \
          .create_temporary_table('mySink')
      
      t_env.from_path('mySource') \
          .select("add(a, b)") \
          .insert_into('mySink')
      
      t_env.execute("tutorial_job")
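
The script above reads `/tmp/input` as two BIGINT columns, but the report does not include the file itself. A minimal sketch for generating a suitable input follows, assuming the comma field delimiter and newline row delimiter that `OldCsv` uses when none is configured. The helper name `write_input` and the sample rows are hypothetical.

```python
import csv
import os
import tempfile


def write_input(path, rows):
    """Write (a, b) integer pairs as comma-separated lines, one pair per line."""
    with open(path, "w", newline="") as f:
        writer = csv.writer(f, lineterminator="\n")
        writer.writerows(rows)
    return path


# Hypothetical sample data; the original report does not show the input file.
sample_rows = [(1, 2), (3, 4), (5, 6)]

# A temporary directory keeps this sketch free of side effects.
# Pass "/tmp/input" instead to feed the reproduction script.
demo_path = os.path.join(tempfile.mkdtemp(), "input")
write_input(demo_path, sample_rows)

with open(demo_path) as f:
    print(f.read(), end="")
```

Because the exception is intermittent, the job may need to be run several times against the same input before the log message appears.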
      

              People

              • Assignee: Dian Fu (dian.fu)
              • Reporter: Hequn Cheng (hequn8128)