Flink / FLINK-4428

Method map/flatMapWithState may need an eviction policy


Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Resolved
    • Affects Version/s: 1.1.2
    • Fix Version/s: None
    • Component/s: API / DataStream
    • Labels: None

    Description

      I want to count the number of unique visitors to a website every day.
      If the number changes, I want to get the new number within 1 second,
      and nothing should be emitted if the number doesn't change. I implemented
      this with a time window of 1 day, a trigger that fires every 1 second,
      and flatMapWithState to filter out duplicate results.

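           import org.apache.flink.streaming.api.scala._
           import org.apache.flink.streaming.api.scala.extensions._
           import org.apache.flink.streaming.api.windowing.time.Time
           import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
           import org.apache.flink.streaming.api.windowing.windows.TimeWindow

           case class Visit(uuid: String, time: Long, platform: Int)
           case class WindowUv(platform: Int, uv: Long, windowStart: Long, windowEnd: Long)

           val env = StreamExecutionEnvironment.getExecutionEnvironment
           // val consumer: FlinkKafkaConsumer08[Visit] = ...  (defined elsewhere)

           val stream =
             env.addSource(consumer)
               .keyBy(_.platform)
               // One daily window per platform, re-fired every second
               .timeWindow(Time.days(1))
               .trigger(ContinuousProcessingTimeTrigger.of[TimeWindow](Time.seconds(1)))
               .applyWith((0, Set.empty[Int], 0L, 0L))(
                 // Collect visitor uuid hash codes (a hash collision would undercount)
                 foldFunction = {
                   case ((_, set, _, _), visit) =>
                     (visit.platform, set + visit.uuid.hashCode, 0L, 0L)
                 },
                 // Attach the window bounds to the accumulated set
                 windowFunction = {
                   case (_, window, results) =>
                     results.map {
                       case (platform, set, _, _) =>
                         (platform, set, window.getStart, window.getEnd)
                     }
                 }
               )
               .mapWith {
                 case (platform, set, windowStart, windowEnd) =>
                   WindowUv(platform, set.size, windowStart, windowEnd)
               }
               // Emit a result only when the UV count of a (platform, day) changes
               .keyBy(uv => (uv.platform, uv.windowStart))
               .flatMapWithState[WindowUv, Long] {
                 case (uv, Some(last)) if last == uv.uv =>
                   (Seq.empty, Some(last))
                 case (uv, _) =>
                   (Seq(uv), Some(uv.uv))
               }

           stream.print()
           env.execute("Boom")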
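
      The deduplication step at the end of the job can be modelled outside Flink
      to make the state growth visible. This is a plain-Scala stand-in, not
      Flink's API: `DedupModel` is a hypothetical name, and a mutable `Map`
      plays the role of the per-key state that flatMapWithState keeps for each
      (platform, windowStart) key.

```scala
import scala.collection.mutable

case class WindowUv(platform: Int, uv: Long, windowStart: Long, windowEnd: Long)

// Hypothetical plain-Scala model of the flatMapWithState step (not Flink code).
object DedupModel {
  // One entry per (platform, windowStart) key, like Flink's keyed state.
  // Nothing ever removes an entry, so finished days accumulate.
  val state = mutable.Map.empty[(Int, Long), Long]

  // Same rule as the job: emit only when the UV count for the key changes.
  def process(uv: WindowUv): Seq[WindowUv] = {
    val key = (uv.platform, uv.windowStart)
    state.get(key) match {
      case Some(last) if last == uv.uv => Seq.empty
      case _ =>
        state(key) = uv.uv
        Seq(uv)
    }
  }
}
```

      Feeding it updates for day 0 and then one update for day 1 leaves two
      entries in `state`; the day-0 entry is never read again, but it is never
      removed either.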
      

      The problem is that with flatMapWithState, the state for a given day is
      never updated or read again once that day has passed, yet it stays in
      memory forever and there is no way to evict it. So I think the state in
      map/flatMapWithState may need an eviction policy based on time or on
      global conditions, rather than only on the last message for the key (it
      is hard to tell whether a message is the last one at the moment it arrives).
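
      A time-based policy of the kind suggested above could look roughly like
      the following. This is a plain-Scala sketch under stated assumptions, not
      an existing Flink API: `ExpiringKeyedState`, `EvictingDedup`, the
      `ttlMillis` parameter and the explicit `now` timestamp are all
      hypothetical, and in a real job `now` would have to come from processing
      time or the watermark. Each entry records when it was last written; reads
      treat entries older than the TTL as absent, and `evictExpired` drops keys
      that are never touched again, such as a finished day's window.

```scala
import scala.collection.mutable

case class WindowUv(platform: Int, uv: Long, windowStart: Long, windowEnd: Long)

// Hypothetical keyed state whose entries expire `ttlMillis` after their last write.
final class ExpiringKeyedState[K, S](ttlMillis: Long) {
  // key -> (value, timestamp of the last write)
  private val entries = mutable.Map.empty[K, (S, Long)]

  // Returns the value only if it has not expired; an expired entry is dropped lazily.
  def get(key: K, now: Long): Option[S] =
    entries.get(key) match {
      case Some((value, written)) if now - written < ttlMillis => Some(value)
      case Some(_) =>
        entries.remove(key)
        None
      case None => None
    }

  def update(key: K, value: S, now: Long): Unit =
    entries(key) = (value, now)

  // Eagerly removes every expired entry, including keys that are never read again.
  def evictExpired(now: Long): Unit = {
    val expired = entries.iterator.collect {
      case (k, (_, written)) if now - written >= ttlMillis => k
    }.toList
    expired.foreach(k => entries.remove(k))
  }

  def size: Int = entries.size
}

// Hypothetical dedup step using the expiring state; `now` is supplied by the caller.
object EvictingDedup {
  val DayMillis: Long = 24L * 60 * 60 * 1000
  // Assumed policy: keep a (platform, windowStart) entry for one day after its last change.
  val state = new ExpiringKeyedState[(Int, Long), Long](DayMillis)

  def process(uv: WindowUv, now: Long): Seq[WindowUv] = {
    val key = (uv.platform, uv.windowStart)
    state.get(key, now) match {
      case Some(last) if last == uv.uv => Seq.empty
      case _ =>
        state.update(key, uv.uv, now)
        Seq(uv)
    }
  }
}
```

      Because `process` only writes when the count changes, an entry expires one
      TTL after the last change for its (platform, day) key. For this job,
      tying expiry to `windowEnd` instead would be another reasonable choice.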

People

    • Assignee: Unassigned
    • Reporter: Renkai Ge
    • Votes: 0
    • Watchers: 7
