Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-33128

TestValuesRuntimeFunctions$TestValuesLookupFunction does not call open() on converter

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    Description

      When using the TestValues connector with nested Row values relying on BinaryArrayWriter the following exception happen : 

      java.lang.NullPointerException: Cannot invoke "org.apache.flink.table.data.writer.BinaryArrayWriter.getNumElements()" because "this.reuseWriter" is null
          at org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.allocateWriter(ArrayObjectArrayConverter.java:140)
          at org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toBinaryArrayData(ArrayObjectArrayConverter.java:114)
          at org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:93)
          at org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:40)
          at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
          at org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:90)
          at org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
          at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
          at org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:90)
          at org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
          at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
          at org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:75)
          at org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
          at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
          at org.apache.flink.table.runtime.connector.source.DataStructureConverterWrapper.toInternal(DataStructureConverterWrapper.java:51)
          at org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$TestValuesLookupFunction.lambda$indexDataByKey$0(TestValuesRuntimeFunctions.java:626)
          at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
          at org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$TestValuesLookupFunction.indexDataByKey(TestValuesRuntimeFunctions.java:624)
          at org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$TestValuesLookupFunction.open(TestValuesRuntimeFunctions.java:601)
          at LookupFunction$370.open(Unknown Source)
          at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
          at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.open(LookupJoinRunner.java:67)
          at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner.open(LookupJoinWithCalcRunner.java:51)
          at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
          at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
          at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
          at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
          at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
          at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
          at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)

       

      This is happening because open() is being not called from TestValuesLookupFunction.open() and the underlying converter writer never gets initialized.

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            jgagnon1 Jerome Gagnon
            jgagnon1 Jerome Gagnon
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment