Flink / FLINK-10763

Interval join produces wrong result type in Scala API


Details

    Description

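      When a stream's element type is a Scala case class, the TypeInformation of the stream returned by the interval join's process function falls back to GenericType. This hurts performance and causes a union with another DataStream of the same case class to fail.

      The union method of DataStream first checks that the types of the two streams are equal, so a GenericType stream cannot be unioned with a stream that carries the case class type.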

      Here is an example:

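      import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
      import org.apache.flink.streaming.api.scala._
      import org.apache.flink.streaming.api.windowing.time.Time
      import org.apache.flink.util.Collector

      object Test {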
      
          def main(args: Array[String]): Unit = {
            val env = StreamExecutionEnvironment.getExecutionEnvironment
      
            val orderA: DataStream[Order] = env.fromCollection(Seq(
              Order(1L, "beer", 3),
              Order(1L, "diaper", 4),
              Order(3L, "rubber", 2)))
      
            val orderB: DataStream[Order] = env.fromCollection(Seq(
              new Order(2L, "pen", 3),
              new Order(2L, "rubber", 3),
              new Order(4L, "beer", 1)))
      
            val orderC: DataStream[Order] = orderA.keyBy(_.user)
              .intervalJoin(orderB.keyBy(_.user))
              .between(Time.seconds(0), Time.seconds(0))
              .process(new ProcessJoinFunction[Order, Order, Order] {
                override def processElement(left: Order, right: Order, ctx: ProcessJoinFunction[Order, Order, Order]#Context, out: Collector[Order]): Unit = {
                  out.collect(left)
                }})
      
            println("C: " + orderC.dataType.toString)
            println("B: " + orderB.dataType.toString)
      
            orderC.union(orderB).print()
      
            env.execute()
          }
      
          case class Order(user: Long, product: String, amount: Int)
      }

      Here is the Exception:

      Exception in thread "main" java.lang.IllegalArgumentException: Cannot union streams of different types: GenericType<com.manbuyun.awesome.flink.Test.Order> and com.manbuyun.awesome.flink.Test$Order(user: Long, product: String, amount: Integer)
       at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:219)
       at org.apache.flink.streaming.api.scala.DataStream.union(DataStream.scala:357)
       at com.manbuyun.awesome.flink.Test$.main(Test.scala:38)
       at com.manbuyun.awesome.flink.Test.main(Test.scala)
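
      The equality check behind this exception can be illustrated with a small, self-contained model. This is only a sketch and does not use Flink: TypeInfo, GenericInfo, CaseClassInfo, ToyStream and UnionCheckSketch are hypothetical names invented for illustration, and the descriptors are simplified stand-ins for Flink's TypeInformation. It shows that two streams holding the same element class are still rejected when one carries a generic descriptor and the other a structured one.

```scala
// Simplified stand-ins for type descriptors (hypothetical names, not Flink classes).
sealed trait TypeInfo

// Opaque fallback descriptor: only the class name is known.
final case class GenericInfo(className: String) extends TypeInfo {
  override def toString: String = s"GenericType<$className>"
}

// Structured descriptor: class name plus field names and field types.
final case class CaseClassInfo(className: String, fields: Seq[(String, String)]) extends TypeInfo {
  override def toString: String =
    fields.map { case (n, t) => s"$n: $t" }.mkString(s"$className(", ", ", ")")
}

// A stream reduced to its type descriptor and its elements.
final case class ToyStream[T](typeInfo: TypeInfo, elements: Seq[T]) {
  // Compare the descriptors first and reject a mismatch, as the description says union does.
  def union(other: ToyStream[T]): ToyStream[T] = {
    if (typeInfo != other.typeInfo) {
      throw new IllegalArgumentException(
        s"Cannot union streams of different types: $typeInfo and ${other.typeInfo}")
    }
    ToyStream(typeInfo, elements ++ other.elements)
  }
}

final case class Order(user: Long, product: String, amount: Int)

object UnionCheckSketch {
  val structured: TypeInfo =
    CaseClassInfo("Order", Seq("user" -> "Long", "product" -> "String", "amount" -> "Integer"))
  val generic: TypeInfo = GenericInfo("Order")

  // orderB keeps the structured descriptor; orderC models the join output that lost it.
  val orderB: ToyStream[Order] = ToyStream(structured, Seq(Order(2L, "pen", 3)))
  val orderC: ToyStream[Order] = ToyStream(generic, Seq(Order(1L, "beer", 3)))

  def main(args: Array[String]): Unit = {
    // Same element class, different descriptors: the union is rejected.
    val attempt = scala.util.Try(orderC.union(orderB))
    println(attempt.failed.map(_.getMessage).getOrElse("union succeeded"))

    // Matching descriptors: the union goes through.
    val ok = orderB.union(ToyStream(structured, Seq(Order(4L, "beer", 1))))
    println(ok.elements.size)
  }
}
```

      In this model the element values never take part in the check. Only the descriptors are compared, which is why the example above fails before env.execute() is reached.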

       

          People

            twalthr Timo Walther
            csbliss jinhai