FLINK-16919

Parameter 'jobName' does not take effect in BatchTableEnvUtil

Details

    Description

        def collect[T](
            tEnv: TableEnvironment,
            table: Table,
            sink: CollectTableSink[T],
            jobName: Option[String]): Seq[T] = {   // jobName was not used
          val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
            .asInstanceOf[TypeInformation[T]]
            .createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl]
              .getPlanner.asInstanceOf[PlannerBase].getExecEnv.getConfig)
          val id = new AbstractID().toString
          sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id)
          val sinkName = UUID.randomUUID().toString
          tEnv.registerTableSink(sinkName, sink)
          tEnv.insertInto(s"`$sinkName`", table)
      
          val res = tEnv.execute(jobName.getOrElse("test"))
          val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
          SerializedListAccumulator.deserializeList(accResult, typeSerializer)
        }
      

      As noted in the code above, the parameter 'jobName' was not used, so a job name supplied by the caller had no effect.
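
The effect can be illustrated with a minimal, self-contained sketch that does not depend on Flink. `FakeEnv` is a hypothetical stand-in for the table environment that only records the name it is executed with, and `collectIgnoringName` assumes the job name was hard-coded to "test", which the report does not show explicitly. `collectWithName` follows the `jobName.getOrElse("test")` call from the snippet above.

```scala
// Hypothetical stand-in for a table environment: it only records the job name.
final class FakeEnv {
  var lastJobName: String = ""
  def execute(jobName: String): Unit = lastJobName = jobName
}

object JobNameSketch {
  // Assumed shape of the problem: jobName is accepted but never read.
  def collectIgnoringName(env: FakeEnv, jobName: Option[String]): String = {
    env.execute("test")
    env.lastJobName
  }

  // Shape from the snippet above: use the caller's name, fall back to "test".
  def collectWithName(env: FakeEnv, jobName: Option[String]): String = {
    env.execute(jobName.getOrElse("test"))
    env.lastJobName
  }

  def main(args: Array[String]): Unit = {
    val env = new FakeEnv
    println(collectIgnoringName(env, Some("my-job")))
    println(collectWithName(env, Some("my-job")))
    println(collectWithName(env, None))
  }
}
```

With `Some("my-job")`, the first helper still runs the job as "test", while the second one passes the caller's name through and only falls back to "test" when no name is given.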

            People

              Assignee: Unassigned
              Reporter: Dillon. Zhanchun Zhang

                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 10m