Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Invalid
-
None
-
None
Description
/**
 * Registers `sink` under a random name, inserts `table` into it, executes the job and
 * deserializes the bytes returned by `getAccumulatorResult` with the sink's serializer.
 *
 * @param tEnv    table environment; cast to `TableEnvironmentImpl` to reach the planner
 * @param table   table inserted into the registered sink
 * @param sink    collect sink; initialised with the serializer and a fresh id
 * @param jobName name handed to `tEnv.execute`; `"test"` is used when it is `None`
 * @tparam T element type produced by the sink
 * @return the elements yielded by `SerializedListAccumulator.deserializeList`
 */
def collect[T](
    tEnv: TableEnvironment,
    table: Table,
    sink: CollectTableSink[T],
    jobName: Option[String]): Seq[T] = {
  // Reporter's note: "jobName was not used" -- although it is passed to tEnv.execute below.

  // Resolve the type information first, then the execution config, then build the serializer.
  val typeInfo = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
    .asInstanceOf[TypeInformation[T]]
  val execConfig = tEnv.asInstanceOf[TableEnvironmentImpl]
    .getPlanner.asInstanceOf[PlannerBase]
    .getExecEnv.getConfig
  val serializer: TypeSerializer[T] = typeInfo.createSerializer(execConfig)

  // The same id is given to the sink and later used to look up the job's result.
  val accumulatorId = new AbstractID().toString
  sink.init(serializer, accumulatorId)

  // Random sink name; quoted with backticks in the insert target.
  val registeredName = UUID.randomUUID().toString
  tEnv.registerTableSink(registeredName, sink)
  tEnv.insertInto(s"`$registeredName`", table)

  val jobResult = tEnv.execute(jobName.getOrElse("test"))
  val serializedRows: JArrayList[Array[Byte]] = jobResult.getAccumulatorResult(accumulatorId)
  SerializedListAccumulator.deserializeList(serializedRows, serializer)
}
As shown in the code above, the parameter 'jobName' is not used.
Attachments
Issue Links
- links to