Details
-
Bug
-
Status: Closed
-
Minor
-
Resolution: Duplicate
-
1.10.1
-
None
-
None
Description
Flink sideoutput union seems not works right. If we union the sideoutput from the same operator, the output is the result of last side output times by the number of unions, which is not expected. For example,
val side = new OutputTag[String]("side") val side2 = new OutputTag[String]("side2") val side3 = new OutputTag[String]("side3") val ds = env.socketTextStream("master",9001) val res = ds.process(new ProcessFunction[String,String] { override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = { if(value.contains("hello")) { ctx.output(side,value) } else if(value.contains("world")) { ctx.output(side2,value) } else if(value.contains("flink")) { ctx.output(side3,value) } out.collect(value) } }) val res1 = res.getSideOutput(side) val res2 = res.getSideOutput(side2) val res3 = res.getSideOutput(side3) println( "====>"+res1.getClass) println( "====>"+res2.getClass) res1.print("res1") res2.print("res2") res3.print("res3") res2.union(res1).union(res3).print("all")
If we input
hello world flink
The output will be
res1> hello res2> world res3> flink all> flink all> flink all> flink
But the expected output would be
res1> hello res2> world res3> flink all> hello all> world all> flink
if we add a map after the sideoutput and then union them, the output would be right, but adding map should be not needed.
Attachments
Issue Links
- is caused by
-
FLINK-17578 Union of 2 SideOutputs behaviour incorrect
- Closed