Flink / FLINK-14763

Flink SQL CEP fails with parallelism greater than 1

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.10.0
    • Fix Version/s: 1.10.0
    • Component/s: Table SQL / API
    • Labels:
      None
    • Environment:

      Flink on YARN

      Flink 1.10

      Hadoop 3.0

      Kafka 2.2.0

      Description

      When I submit a CEP SQL query through the SQL Client with a parallelism greater than 1, it fails with the error below:

      java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=0, endKeyGroup=15} does not contain key group 16
          at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161)
          at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
          at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
          at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
          at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
          at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
          at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:216)
          at org.apache.flink.cep.operator.CepOperator.saveRegisterWatermarkTimer(CepOperator.java:285)
          at org.apache.flink.cep.operator.CepOperator.processElement(CepOperator.java:266)
          at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:163)
          at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:149)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:282)
          at org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:151)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:430)
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:696)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
          at java.lang.Thread.run(Thread.java:748)
      

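      It seems that some keys are assigned to the wrong TaskManager: the subtask that owns key groups 0 to 15 is asked to register an event-time timer for a key in key group 16.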
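To make the key-group numbers in the exception easier to read, here is a minimal sketch of the key-group arithmetic. It assumes the formulas used by Flink's `KeyGroupRangeAssignment` as I understand them, and a max parallelism of 128 (which I believe is the default lower bound); neither assumption is confirmed by this report. The class and method names are hypothetical and the code does not depend on Flink.

```java
// Hypothetical helper, not part of Flink: re-implements the assumed
// key-group range arithmetic so the numbers in the exception can be checked.
class KeyGroupSketch {

    // First key group (inclusive) owned by the given subtask.
    static int rangeStart(int maxParallelism, int parallelism, int subtask) {
        return (subtask * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group (inclusive) owned by the given subtask.
    static int rangeEnd(int maxParallelism, int parallelism, int subtask) {
        return ((subtask + 1) * maxParallelism - 1) / parallelism;
    }

    // Subtask index that records of the given key group are routed to.
    static int subtaskForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumption: default lower bound
        for (int parallelism : new int[] {8, 32}) {
            System.out.println("parallelism " + parallelism
                    + ": subtask 0 owns key groups "
                    + rangeStart(maxParallelism, parallelism, 0) + ".."
                    + rangeEnd(maxParallelism, parallelism, 0)
                    + ", key group 16 is routed to subtask "
                    + subtaskForKeyGroup(maxParallelism, parallelism, 16));
        }
    }
}
```

Under these assumptions, a subtask with range 0..15 corresponds to a parallelism of 8, where key group 16 belongs to subtask 1. With a parallelism of 32, subtask 0 would own only 0..3 and key group 16 would be routed to subtask 4. One possible reading is that the records were partitioned with a different parallelism (or max parallelism) than the one the CEP operator's state backend was set up with, but this is only a guess.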

       

      The SQL Client environment YAML is:

      execution:
        planner: blink
        type: streaming
        parallelism: 32
      ....
      - name: Ticker
        type: source-table
        update-mode: append
        connector:
          sink-partitioner: round-robin
          sink-partitioner-class: org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner
          property-version: 1
          type: kafka
          version: universal
          topic: test_part
          startup-mode: earliest-offset
          properties:
            - key: bootstrap.servers
              value: localhost:9092
            - key: group.id
              value: testGroup
        format:
          property-version: 1
          type: json
          derive-schema: true
        schema:
          - name: symbol
            type: VARCHAR
          - name: rtime
            type: TIMESTAMP
            rowtime:
              timestamps:
                type: from-field
                from: rowtime
              watermarks:
                type: periodic-bounded
                delay: 2000
          - name: price
            type: BIGINT
          - name: tax
            type: BIGINT
      
      

      The query is the one from the demo:

      SELECT *
      FROM Ticker
          MATCH_RECOGNIZE(
              PARTITION BY symbol
              ORDER BY rtime
              MEASURES
                  C.price AS lastPrice
              ONE ROW PER MATCH
              AFTER MATCH SKIP PAST LAST ROW
              PATTERN (A B* C)
              DEFINE
                  A AS A.price > 10,
                  B AS B.price < 15,
                  C AS C.price > 12
          )
      

      The data:

                         symbol                     rtime                     price                       tax
                            ACME          2011-11-11T10:00                        12                         1
                            ACME       2011-11-11T10:00:02                        19                         1
                            ACME       2011-11-11T10:00:01                        17                         2
                            ACME       2011-11-11T10:00:03                        21                         3
                            ACME       2011-11-11T10:00:04                        25                         2
                            ACME       2011-11-11T10:00:05                        18                         1
                            ACME       2011-11-11T10:00:06                        15                         1
                            ACME       2011-11-11T10:00:07                        14                         2
                            ACME       2011-11-11T10:00:08                        24                         2
                            ACME       2011-11-11T10:00:09                        25                         2
                            ACME       2011-11-11T10:00:10                        19                         1
      

            People

            • Assignee:
              Unassigned
              Reporter:
              yantianyu richt richt
            • Votes:
              0
              Watchers:
              2
