Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
1.1.0, 1.2.0
-
None
Description
RuntimeContext is never set in RichOutputFormat. I tested it in local execution. RichMapFunction is setup correctly.
Following code will never print "//////Context set in RichOutputFormat"
import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext} import org.apache.flink.api.common.io.RichOutputFormat import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object Startup { def main(args: Array[String]): Unit = { val mapFunction = new RichMapFunction[String, String] { def open(taskNumber: Int, numTasks: Int) { getRuntimeContext } def map(event: String) = { event } override def setRuntimeContext(t: RuntimeContext) = { println("//////Context set in RichMapFunction") super.setRuntimeContext(t) } } val outputFormat = new RichOutputFormat[String] { override def setRuntimeContext(t: RuntimeContext) = { println("//////Context set in RichOutputFormat") super.setRuntimeContext(t) } def open(taskNumber: Int, numTasks: Int) {} def writeRecord(event: String) { println(event) } def configure(parameters: Configuration): Unit = {} def close(): Unit = {} } val see = StreamExecutionEnvironment.getExecutionEnvironment val eventsStream = see.fromElements[String]("A", "B", "C").map(mapFunction) eventsStream.writeUsingOutputFormat(outputFormat) see.execute("test-job") } }
Attachments
Issue Links
- links to