Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Invalid
-
1.7.2
-
None
-
None
-
package com.opensourceteams.module.bigdata.flink.example.tableapi.convert.dataset
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.api.common.typeinfo.TypeInformationobject Run {
def main(args: Array[String]): Unit =
{ val env = ExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30) ) val table = tableEnv.fromDataSet(dataSet) tableEnv.registerTable("user1",table) val csvTableSink = new CsvTableSink("sink-data/csv/a.csv",",",1,WriteMode.OVERWRITE) val fieldNames = Array("id", "name", "value") val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG) tableEnv.registerTableSink("csvTableSink",fieldNames,fieldTypes,csvTableSink) tableEnv.scan("user1") env.execute() }}
package com.opensourceteams.module.bigdata.flink.example.tableapi.convert.dataset import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.api.scala._ import org.apache.flink.core.fs.FileSystem.WriteMode import org.apache.flink.table.api.{TableEnvironment, Types} import org.apache.flink.table.sinks.CsvTableSink import org.apache.flink.api.common.typeinfo.TypeInformation object Run { def main(args: Array [String] ): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30) ) val table = tableEnv.fromDataSet(dataSet) tableEnv.registerTable("user1",table) val csvTableSink = new CsvTableSink("sink-data/csv/a.csv",",",1,WriteMode.OVERWRITE) val fieldNames = Array("id", "name", "value") val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG) tableEnv.registerTableSink("csvTableSink",fieldNames,fieldTypes,csvTableSink) tableEnv.scan("user1") env.execute() } }
Description
Exception in thread "main" java.lang.RuntimeException: No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it.
at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:945)
at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:923)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:85)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)
at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:525)
at com.opensourceteams.module.bigdata.flink.example.tableapi.convert.dataset.Run$.main(Run.scala:36)
at com.opensourceteams.module.bigdata.flink.example.tableapi.convert.dataset.Run.main(Run.scala)
Process finished with exit code 1