Details

    Description

      Currently, object reuse must be enabled in order to use chained sources.

      Tests such as `HiveDialectQueryITCase` will fail with an exception:

      2021-07-12T14:47:55.8233741Z Jul 12 14:47:55 [ERROR] testQueries(org.apache.flink.connectors.hive.HiveDialectQueryITCase)  Time elapsed: 12.283 s  <<< ERROR!
      2021-07-12T14:47:55.8234433Z Jul 12 14:47:55 java.lang.RuntimeException: Failed to fetch next result
      2021-07-12T14:47:55.8235133Z Jul 12 14:47:55 	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
      2021-07-12T14:47:55.8235958Z Jul 12 14:47:55 	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
      2021-07-12T14:47:55.8236774Z Jul 12 14:47:55 	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
      ....
      2021-07-12T14:47:55.8313594Z Jul 12 14:47:55 Caused by: java.lang.UnsupportedOperationException: Currently chained sources are supported only with objectReuse enabled
      2021-07-12T14:47:55.8314356Z Jul 12 14:47:55 	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedSourceOutput(OperatorChain.java:355)
      2021-07-12T14:47:55.8315109Z Jul 12 14:47:55 	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedSources(OperatorChain.java:322)
      2021-07-12T14:47:55.8315820Z Jul 12 14:47:55 	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:220)
      2021-07-12T14:47:55.8316506Z Jul 12 14:47:55 	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:558)
      2021-07-12T14:47:55.8317209Z Jul 12 14:47:55 	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:661)
      2021-07-12T14:47:55.8317948Z Jul 12 14:47:55 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:547)
      2021-07-12T14:47:55.8318626Z Jul 12 14:47:55 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
      2021-07-12T14:47:55.8319205Z Jul 12 14:47:55 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
      2021-07-12T14:47:55.8319725Z Jul 12 14:47:55 	at java.lang.Thread.run(Thread.java:748)
      2021-07-12T14:47:55.8320122Z Jul 12 1
      

      The fix should look as follows:

      This particular exception should be fairly straightforward to fix. It is not implemented because the chained sources feature was built in the minimal scope required by the Blink planner; roughly 50-100 lines of production code are missing for it to work with object reuse disabled.
      In OperatorChain#createChainedSourceOutput we need to do something similar to what is done in OperatorChain#wrapOperatorIntoOutput, so something like:

              if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
                  return closer.register(new ChainingOutput(input, metricGroup, outputTag));
              } else {
                  TypeSerializer<IN> inSerializer =
                          operatorConfig.getTypeSerializerIn1(userCodeClassloader);
                  return closer.register(new CopyingChainingOutput(input, inSerializer, metricGroup, outputTag));
              }
      

      The missing part is to make CopyingChainingOutput work with an Input instead of an Operator.
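
      To illustrate the idea of a copying output that targets an Input rather than an Operator, here is a minimal, self-contained sketch. It does not use Flink's real classes: the Input and Output interfaces, the UnaryOperator copier (standing in for TypeSerializer#copy), and the boolean flag (standing in for isObjectReuseEnabled()) are all simplified assumptions made for this example, and the actual change in OperatorChain may look different.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class CopyingInputOutputSketch {

    /** Hypothetical stand-in for Flink's Input; only element processing is modeled. */
    public interface Input<T> {
        void processElement(T element) throws Exception;
    }

    /** Hypothetical stand-in for an output that forwards records downstream. */
    public interface Output<T> {
        void collect(T record);
    }

    /** Forwards the very same object instance (the object-reuse path). */
    public static class ChainingOutput<T> implements Output<T> {
        protected final Input<T> input;

        public ChainingOutput(Input<T> input) {
            this.input = input;
        }

        @Override
        public void collect(T record) {
            pushToInput(record);
        }

        protected void pushToInput(T record) {
            try {
                input.processElement(record);
            } catch (Exception e) {
                throw new RuntimeException("Could not forward element to next input", e);
            }
        }
    }

    /** Copies each record before forwarding it (the path without object reuse). */
    public static class CopyingChainingOutput<T> extends ChainingOutput<T> {
        // Assumption: plays the role that TypeSerializer#copy would play in Flink.
        private final UnaryOperator<T> copier;

        public CopyingChainingOutput(Input<T> input, UnaryOperator<T> copier) {
            super(input);
            this.copier = copier;
        }

        @Override
        public void collect(T record) {
            pushToInput(copier.apply(record));
        }
    }

    /** Mirrors the branch from the snippet above, using the simplified types. */
    public static <T> Output<T> createChainedSourceOutput(
            Input<T> input, boolean objectReuseEnabled, UnaryOperator<T> copier) {
        if (objectReuseEnabled) {
            return new ChainingOutput<>(input);
        } else {
            return new CopyingChainingOutput<>(input, copier);
        }
    }

    public static void main(String[] args) {
        List<int[]> received = new ArrayList<>();
        Input<int[]> input = received::add;
        int[] record = {1, 2, 3};

        createChainedSourceOutput(input, true, int[]::clone).collect(record);
        createChainedSourceOutput(input, false, int[]::clone).collect(record);

        System.out.println(received.get(0) == record); // true: same instance forwarded
        System.out.println(received.get(1) == record); // false: a copy was forwarded
    }
}
```

      Because the copying variant depends only on the Input interface, the same output class can serve both chained operators and chained sources in this sketch.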

            People

              twalthr Timo Walther