Flink / FLINK-13285 Check connectors runnable in blink runner / FLINK-13341

Connector Sinks should implement consumeDataStream instead of emitDataStream

Details

    Description

      Some StreamTableSink implementations do not implement consumeDataStream(DataStream) so that it returns the sink transformation (DataStreamSink), which is required when using the blink planner.

      With the blink planner, this throws the following error:

      Exception in thread "main" org.apache.flink.table.api.TableException: The StreamTableSink#consumeDataStream(DataStream) must be implemented and return the sink transformation DataStreamSink. However, org.apache.flink.streaming.connectors.kafka.Kafka010TableSink doesn't implement this method.
       at org.apache.flink.table.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
       at org.apache.flink.table.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
       at org.apache.flink.table.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:60)
       at org.apache.flink.table.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
       at org.apache.flink.table.planner.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:71)
       at org.apache.flink.table.planner.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70)
       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
       at org.apache.flink.table.planner.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
       at org.apache.flink.table.planner.PlannerBase.translate(PlannerBase.scala:155)
       at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:446)
       at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:355)
       at org.apache.flink.table.examples.java.StreamSQLLookupJoinExample.main(StreamSQLLookupJoinExample.java:139)
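
      The contract named in the error message can be illustrated with a small self-contained model. This is only a sketch: the DataStream, DataStreamSink and StreamTableSink types below are hypothetical stand-ins rather than the real Flink classes, and the default consumeDataStream that delegates to emitDataStream and returns null is an assumption inferred from the error message. LegacySink, FixedSink and translate are illustrative names, not taken from Flink or from this issue.

```java
import java.util.ArrayList;
import java.util.List;

public class ConsumeDataStreamSketch {

    // Hypothetical stand-in for Flink's DataStream; records attached sinks.
    static class DataStream<T> {
        final List<String> attachedSinks = new ArrayList<>();

        DataStreamSink<T> addSink(String sinkName) {
            attachedSinks.add(sinkName);
            return new DataStreamSink<>(sinkName);
        }
    }

    // Hypothetical stand-in for Flink's DataStreamSink (the sink transformation).
    static class DataStreamSink<T> {
        final String name;

        DataStreamSink(String name) {
            this.name = name;
        }
    }

    // Simplified stand-in for StreamTableSink. Assumption: the default
    // consumeDataStream falls back to emitDataStream and returns null.
    interface StreamTableSink<T> {
        void emitDataStream(DataStream<T> dataStream);

        default DataStreamSink<?> consumeDataStream(DataStream<T> dataStream) {
            emitDataStream(dataStream);
            return null;
        }
    }

    // A sink that only implements emitDataStream, like the failing connector sinks.
    static class LegacySink implements StreamTableSink<String> {
        @Override
        public void emitDataStream(DataStream<String> dataStream) {
            dataStream.addSink("legacy");
        }
    }

    // A sink that implements consumeDataStream and returns the sink transformation;
    // emitDataStream delegates to it so both entry points behave the same.
    static class FixedSink implements StreamTableSink<String> {
        @Override
        public DataStreamSink<?> consumeDataStream(DataStream<String> dataStream) {
            return dataStream.addSink("fixed");
        }

        @Override
        public void emitDataStream(DataStream<String> dataStream) {
            consumeDataStream(dataStream);
        }
    }

    // Stand-in for the planner-side check: a null result is rejected.
    static DataStreamSink<?> translate(StreamTableSink<String> sink, DataStream<String> input) {
        DataStreamSink<?> result = sink.consumeDataStream(input);
        if (result == null) {
            throw new IllegalStateException(sink.getClass().getSimpleName()
                + " must implement consumeDataStream and return the DataStreamSink");
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("fixed sink translated to: "
            + translate(new FixedSink(), new DataStream<>()).name);
        try {
            translate(new LegacySink(), new DataStream<>());
        } catch (IllegalStateException e) {
            System.out.println("legacy sink rejected: " + e.getMessage());
        }
    }
}
```

      In this model the legacy sink still attaches itself to the stream, but the check rejects it because no DataStreamSink is handed back. Having emitDataStream delegate to consumeDataStream keeps a single code path for callers that still use the older method.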

            People

              Assignee: kakachen Chen Qi
              Reporter: kakachen Chen Qi
              Votes: 0
              Watchers: 3

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 20m