Flink / FLINK-9164

times(#,#) quantifier does not seem to work


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 1.4.2
    • Fix Version/s: None
    • Component/s: Library / CEP
    • Labels: None
    • Environment: Windows 10 Pro 64-bit, Core i7-6820HQ @ 2.7 GHz, 16GB RAM, Flink 1.4.2, Scala client, Scala 2.11.12

    Description

      Consider the following code:

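      // Imports as packaged in Flink 1.4.x
      import org.apache.flink.cep.nfa.AfterMatchSkipStrategy
      import org.apache.flink.cep.scala.CEP
      import org.apache.flink.cep.scala.pattern.Pattern
      import org.apache.flink.streaming.api.TimeCharacteristic
      import org.apache.flink.streaming.api.scala._

      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      env.setParallelism(1)
      
      // Five events keyed by an ascending event timestamp
      val inputs = env.fromElements((1L, 'a'), (20L, 'a'), (22L, 'a'), (30L, 'a'), (40L, 'a'))
          .assignAscendingTimestamps(_._1)
      
      // Match one or two 'a' events, skipping past the last event of each match
      val pattern = Pattern.begin[(Long, Char)]("Start", AfterMatchSkipStrategy.skipPastLastEvent())
          .where(_._2 == 'a').times(1,2)
      
      CEP.pattern(inputs, pattern).select(_("Start")).addSink(println(_))
      
      env.execute("Test")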

       

      This prints:

      Buffer((1,a))
      Buffer((20,a))
      Buffer((22,a))
      Buffer((30,a))
      Buffer((40,a))

      whereas I would expect:

      Buffer((1,a), (20,a))
      Buffer((22,a), (30,a))
      Buffer((40,a))

      My goal is to match events in pairs where possible, and singly otherwise. Note that adding greedy mode makes no difference, but this may be due to https://issues.apache.org/jira/browse/FLINK-8914.
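
      To make the expected grouping concrete, here is a minimal plain-Scala sketch with no Flink or CEP involved. It only illustrates the "pairs where possible, otherwise alone" output described above. It is not a fix for the pattern, and the names PairUpSketch and pairUp are hypothetical, introduced purely for illustration.

```scala
// Plain Scala, no Flink: shows only the grouping the report expects.
object PairUpSketch {
  // Hypothetical helper: split events into consecutive pairs; a trailing
  // odd element ends up alone in its own group.
  def pairUp[A](events: Seq[A]): List[List[A]] =
    events.grouped(2).map(_.toList).toList

  def main(args: Array[String]): Unit = {
    // Same five events as in the report
    val events = Seq((1L, 'a'), (20L, 'a'), (22L, 'a'), (30L, 'a'), (40L, 'a'))
    pairUp(events).foreach(println)
    // prints:
    // List((1,a), (20,a))
    // List((22,a), (30,a))
    // List((40,a))
  }
}
```

      The groups print as List rather than Buffer, but their contents correspond to the expected output listed above.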

          People

            Assignee: Unassigned
            Reporter: Romain Revol (rrevol)
            Votes: 0
            Watchers: 2