Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-33549

Exception "Factory does not implement interface YieldingOperatorFactory" thrown in batch mode

    XMLWordPrintableJSON

Details

    Description

      When run a job in batch, it throws the following exception

      java.lang.NullPointerException: Factory does not implement interface org.apache.flink.streaming.api.operators.YieldingOperatorFactory
          at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
          at org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory.getMailboxExecutor(AbstractStreamOperatorFactory.java:67)
          at org.apache.flink.table.runtime.operators.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:79)
          at org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper.createOperator(TableOperatorWrapper.java:115)
          at org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.createAllOperators(MultipleInputStreamOperatorBase.java:259)
          at org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.<init>(MultipleInputStreamOperatorBase.java:88)
          at org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperator.<init>(BatchMultipleInputStreamOperator.java:48)
          at org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperatorFactory.createStreamOperator(BatchMultipleInputStreamOperatorFactory.java:51)
          at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:212)
          at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:756)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:743)
          at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959)
          at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928)
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
          at java.lang.Thread.run(Thread.java:834) 

       

      When I disable multiple-input by setting table.optimizer.multiple-input-enabled = false, it works then. 

      Should be introduced by FLINK-23621.

      In here, when the operator factory is instanceof YieldingOperatorFactory, it will set mailbox executor. But when it's 
      BatchMultipleInputStreamOperatorFactory, althogh it'll still set mailbox executor but it won't set the mailbox executor. for the operators wrapped by the BatchMultipleInputStreamOperator. Then the exception is thrown.
       

      Attachments

        Issue Links

          Activity

            People

              xu_shuai_ Shuai Xu
              luoyuxia luoyuxia
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated: