Flink / FLINK-8828

Add collect method to DataStream / DataSet Scala API


    Details

      Description

      A collect function is a method that takes a partial function as its parameter, applies it to every element of the collection for which the partial function is defined, and returns a new collection of the results. Elements for which the partial function is not defined are dropped.

      It is available on all core Scala collection classes, as well as on Spark's RDD interface.
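For readers less familiar with the Scala standard library, the semantics can be sketched on an ordinary List: collect keeps exactly the elements for which the partial function is defined (isDefinedAt) and maps them through it. The helper collectViaFilter below is hypothetical, written only to make that equivalence explicit.

```scala
// A partial function defined only for even integers; it halves them.
val halveEvens: PartialFunction[Int, Int] = {
  case n if n % 2 == 0 => n / 2
}

// Hypothetical helper: spells out what collect does as two steps,
// keeping the elements where pf is defined, then applying pf to them.
def collectViaFilter[A, B](xs: List[A])(pf: PartialFunction[A, B]): List[B] =
  xs.filter(pf.isDefinedAt).map(pf)

val input = List(1, 2, 3, 4, 5, 6)

// Built-in collect: odd numbers are dropped, even numbers are halved.
val viaCollect = input.collect(halveEvens)
val viaFilterMap = collectViaFilter(input)(halveEvens)
```

Both values come out as List(1, 2, 3).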

      To see why it is useful, consider the following scenario:

      Given a DataStream that produces events of type Purchase and View,
      transform it into a stream of the amounts of all purchases over 1000 euros.

      Currently, an implementation might look like this:

      val x = dataStream
        .filter(_.isInstanceOf[Purchase])
        .map(_.asInstanceOf[Purchase])
        .filter(_.amount > 1000)
        .map(_.amount)

      Alternatively, you could write:

      dataStream.flatMap(_ match {
        case p: Purchase if p.amount > 1000 => Some(p.amount)
        case _ => None
      })
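The flatMap variant works because the match evaluates to an Option, which Scala can treat as a collection of zero or one elements. The standard library can derive exactly such an Option-returning function from a partial function via lift. A minimal sketch on a plain List, using a hypothetical hashtag-extracting partial function chosen only for illustration:

```scala
// Hypothetical partial function: defined only for strings starting with '#',
// for which it returns the text after the '#'.
val hashtag: PartialFunction[String, String] = {
  case s if s.startsWith("#") => s.drop(1)
}

// lift turns it into a total function returning Option:
// Some(result) where hashtag is defined, None everywhere else.
val lifted: String => Option[String] = hashtag.lift

val words = List("#flink", "hello", "#scala")

// Flattening the Options drops the None cases, mirroring the
// `case _ => None` branch of the flatMap variant.
val viaLift = words.flatMap(w => lifted(w))
```

Here viaLift is List("flink", "scala"), the same result words.collect(hashtag) produces.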

      With collect implemented, it could look like this:

      dataStream.collect {
        case p: Purchase if p.amount > 1000 => p.amount
      }
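One plausible way to offer such a method without touching the core classes is a Scala extension method that delegates to flatMap through lift. This is a hedged sketch: MiniStream is a hypothetical in-memory stand-in for DataStream, and CollectSyntax/CollectOps are illustrative names, not existing Flink API. In Flink's Scala API, flatMap also expects an implicit TypeInformation for the result type, so a real version would presumably need to carry that context bound as well.

```scala
// Hypothetical stand-in for a distributed stream: wraps an in-memory
// Vector and exposes only flatMap, the primitive collect is built on.
final class MiniStream[T](val elements: Vector[T]) {
  def flatMap[R](f: T => Iterable[R]): MiniStream[R] =
    new MiniStream[R](elements.flatMap(f))
}

object CollectSyntax {
  // Extension method: adds collect to any MiniStream via an implicit class.
  implicit class CollectOps[T](stream: MiniStream[T]) {
    def collect[R](pf: PartialFunction[T, R]): MiniStream[R] =
      // lift yields Some(result) where pf is defined and None elsewhere;
      // turning the Option into a List lets flatMap drop the Nones.
      stream.flatMap(t => pf.lift(t).toList)
  }
}

import CollectSyntax._

val numbers = new MiniStream(Vector(3, 1200, 40, 5000))
// Keeps only values above 1000 and doubles them.
val large = numbers.collect { case n if n > 1000 => n * 2 }
```

After this runs, large.elements is Vector(2400, 10000).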

      This is considerably easier to both read and write.
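To check that the three formulations really are interchangeable, they can be run side by side on an in-memory List instead of a DataStream. Purchase and View are hypothetical definitions (the issue does not show them), and their amount: Double and page: String fields are assumptions.

```scala
// Hypothetical event model; the field names and types are assumptions.
sealed trait Event
final case class Purchase(amount: Double) extends Event
final case class View(page: String) extends Event

val events: List[Event] =
  List(Purchase(500), View("home"), Purchase(1500), View("cart"), Purchase(2500))

// 1. filter / cast / filter / map, as in the current implementation.
val viaChain = events
  .filter(_.isInstanceOf[Purchase])
  .map(_.asInstanceOf[Purchase])
  .filter(_.amount > 1000)
  .map(_.amount)

// 2. flatMap with an Option-returning match.
val viaFlatMap = events.flatMap {
  case p: Purchase if p.amount > 1000 => Some(p.amount)
  case _                              => None
}

// 3. collect with a partial function.
val viaCollect = events.collect {
  case p: Purchase if p.amount > 1000 => p.amount
}
```

All three yield List(1500.0, 2500.0); only the collect version states the intent in a single step.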

              People

              • Assignee:
                Unassigned
                Reporter:
                jelmer1 Jelmer Kuperus
              • Votes:
                0
                Watchers:
                7