FLINK-18960

flink sideoutput union


Details

    • Type: Bug
    • Status: Closed
    • Priority: Minor
    • Resolution: Duplicate
    • Affects Version/s: 1.10.1
    • Fix Version/s: None
    • Component/s: API / DataStream
    • Labels: None

    Description

      Unioning side outputs does not seem to work correctly. If we union side outputs from the same operator, the result is the last side output repeated once for each unioned stream, which is not expected. For example:

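      import org.apache.flink.streaming.api.functions.ProcessFunction
      import org.apache.flink.streaming.api.scala._
      import org.apache.flink.util.Collector

      val env = StreamExecutionEnvironment.getExecutionEnvironment

      val side = new OutputTag[String]("side")
      val side2 = new OutputTag[String]("side2")
      val side3 = new OutputTag[String]("side3")
      val ds = env.socketTextStream("master", 9001)
      val res = ds.process(new ProcessFunction[String, String] {
        override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
          // Route each line to the side output that matches its keyword.
          if (value.contains("hello")) {
            ctx.output(side, value)
          } else if (value.contains("world")) {
            ctx.output(side2, value)
          } else if (value.contains("flink")) {
            ctx.output(side3, value)
          }
          // Every line is also emitted on the main output.
          out.collect(value)
        }
      })

      val res1 = res.getSideOutput(side)
      val res2 = res.getSideOutput(side2)
      val res3 = res.getSideOutput(side3)

      println("====>" + res1.getClass)
      println("====>" + res2.getClass)

      res1.print("res1")
      res2.print("res2")
      res3.print("res3")

      // Union of the three side outputs: this is the stream that misbehaves.
      res2.union(res1).union(res3).print("all")

      env.execute()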
      
      If we input:

      hello
      world
      flink
      

      The output will be:

      res1> hello
      res2> world
      res3> flink
      all> flink
      all> flink
      all> flink
      
      But the expected output would be:

      res1> hello
      res2> world
      res3> flink
      all> hello
      all> world
      all> flink
      

      If we add a map after each side output and then union them, the output is correct, but adding a map should not be necessary.
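      The map workaround described above could look roughly like the sketch below. It is an illustration under stated assumptions, not a confirmed fix: the object name `SideOutputUnionWorkaround` and the class `Router` are hypothetical, `env.fromElements` stands in for the socket source so the job is self-contained, and the identity function `x => x` is only one possible choice for the extra map.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical wrapper object; the names here are not from the original report.
object SideOutputUnionWorkaround {
  val side = new OutputTag[String]("side")
  val side2 = new OutputTag[String]("side2")
  val side3 = new OutputTag[String]("side3")

  // Same routing logic as in the report, moved into a named class.
  class Router extends ProcessFunction[String, String] {
    override def processElement(
        value: String,
        ctx: ProcessFunction[String, String]#Context,
        out: Collector[String]): Unit = {
      if (value.contains("hello")) ctx.output(side, value)
      else if (value.contains("world")) ctx.output(side2, value)
      else if (value.contains("flink")) ctx.output(side3, value)
      out.collect(value)
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Assumption: fixed test input instead of socketTextStream("master", 9001).
    val res = env.fromElements("hello", "world", "flink").process(new Router)

    // Workaround: put an identity map after each side output before the union.
    val res1 = res.getSideOutput(side).map(x => x)
    val res2 = res.getSideOutput(side2).map(x => x)
    val res3 = res.getSideOutput(side3).map(x => x)

    res2.union(res1).union(res3).print("all")

    env.execute("side-output-union-workaround")
  }
}
```

      According to the report, with the extra map in place the "all" stream carries hello, world and flink once each. The order of the printed lines may vary between runs.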

       


            People

              Assignee: Unassigned
              Reporter: xiaohang.li
              Votes: 0
              Watchers: 4
