Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-33128

TestValuesRuntimeFunctions$TestValuesLookupFunction does not call open() on converter

    XMLWordPrintableJSON

Details

    Description

      When using the TestValues connector with nested Row values relying on BinaryArrayWriter the following exception happen : 

      java.lang.NullPointerException: Cannot invoke "org.apache.flink.table.data.writer.BinaryArrayWriter.getNumElements()" because "this.reuseWriter" is null
          at org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.allocateWriter(ArrayObjectArrayConverter.java:140)
          at org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toBinaryArrayData(ArrayObjectArrayConverter.java:114)
          at org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:93)
          at org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:40)
          at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
          at org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:90)
          at org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
          at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
          at org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:90)
          at org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
          at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
          at org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:75)
          at org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
          at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
          at org.apache.flink.table.runtime.connector.source.DataStructureConverterWrapper.toInternal(DataStructureConverterWrapper.java:51)
          at org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$TestValuesLookupFunction.lambda$indexDataByKey$0(TestValuesRuntimeFunctions.java:626)
          at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
          at org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$TestValuesLookupFunction.indexDataByKey(TestValuesRuntimeFunctions.java:624)
          at org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$TestValuesLookupFunction.open(TestValuesRuntimeFunctions.java:601)
          at LookupFunction$370.open(Unknown Source)
          at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
          at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.open(LookupJoinRunner.java:67)
          at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner.open(LookupJoinWithCalcRunner.java:51)
          at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
          at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
          at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
          at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
          at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
          at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
          at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)

       

      This is happening because open() is being not called from TestValuesLookupFunction.open() and the underlying converter writer never gets initialized.

      Attachments

        Issue Links

          Activity

            People

              jgagnon1 Jerome Gagnon
              jgagnon1 Jerome Gagnon
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: