  1. Flink
  2. FLINK-4636

AbstractCEPPatternOperator fails to restore state

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.2.0, 1.1.2
    • Fix Version/s: None
    • Component/s: CEP
    • Labels:
      None

      Description

      The restoreState() method of the AbstractCEPPatternOperator restores a Java PriorityQueue. It first reads the number of elements to insert and then creates a PriorityQueue object with that number as its initial capacity. However, Java's PriorityQueue cannot be instantiated with an initial capacity of 0 (the constructor throws an IllegalArgumentException for any capacity below 1), and restoreState() does not check for this.
      In case of an empty queue, the PriorityQueue should be instantiated with an initial capacity of 1.

      See http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-with-CEPPatternOperator-when-taskmanager-is-killed-tp9024.html
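
      The failure can be reproduced outside Flink with the plain JDK class. The following is a minimal sketch, not the operator's code: `RestoreQueueSketch` and `newRestoredQueue` are hypothetical names chosen for illustration, and `Long` stands in for the operator's record type.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

public class RestoreQueueSketch {

    // Hypothetical helper, not part of Flink: creates a queue sized for the
    // number of restored entries, never passing a capacity below 1.
    static <T> PriorityQueue<T> newRestoredQueue(int entries, Comparator<? super T> comparator) {
        return new PriorityQueue<>(Math.max(1, entries), comparator);
    }

    public static void main(String[] args) {
        try {
            // An initial capacity of 0 is rejected by the JDK constructor.
            new PriorityQueue<Long>(0, Comparator.<Long>naturalOrder());
        } catch (IllegalArgumentException e) {
            System.out.println("initial capacity 0 was rejected");
        }

        // Clamping the capacity to 1 yields a usable, empty queue.
        PriorityQueue<Long> queue = newRestoredQueue(0, Comparator.<Long>naturalOrder());
        System.out.println("restored size: " + queue.size());
    }
}
```

      Clamping the capacity keeps the restore path identical for empty and non-empty snapshots, at the cost of allocating a one-element backing array in the empty case.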

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user jaxbihani opened a pull request:

          https://github.com/apache/flink/pull/2568

          FLINK-4636 Add boundary check for priorityqueue for cep operator

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [ ] General
            • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
            • The pull request addresses only one issue
            • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
            • Documentation has been added for new functionality
            • Old documentation affected by the pull request has been updated
            • JavaDoc for public methods has been added
          • [ ] Tests & Build
            • Functionality added by the pull request is covered by tests
            • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          When numberPriorityQueueEntries=0, creation of priority queue object
          fails as its constructor throws exception when size is passed as 0.
          We check for this condition and skip creating object as it doesn't serve
          any purpose in that case.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/jaxbihani/flink fix-abstractcep-op-restore-state

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2568.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2568


          commit 0b7c58b911e7942a4894c631d04de0d7944c6508
          Author: Jagadish Bihani <jagadish@helpshift.com>
          Date: 2016-09-29T14:29:38Z

          FLINK-4636 Add boundary check for priorityqueue for cep operator

          When numberPriorityQueueEntries=0, creation of priority queue object
          fails as its constructor throws exception when size is passed as 0.
          We check for this condition and skip creating object as it doesn't serve
          any purpose in that case.


          githubbot ASF GitHub Bot added a comment -

          Github user greghogan commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2568#discussion_r81155827

          --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java ---
          @@ -126,11 +126,12 @@ public void restoreState(FSDataInputStream state) throws Exception {

            int numberPriorityQueueEntries = ois.readInt();

          - priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>());
          -
          - for (int i = 0; i <numberPriorityQueueEntries; i++) {
          - StreamElement streamElement = streamRecordSerializer.deserialize(new DataInputViewStreamWrapper(ois));
          - priorityQueue.offer(streamElement.<IN>asRecord());
          + if(numberPriorityQueueEntries > 0) {
          + priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>());
          --- End diff --

          Initialize with size `Math.max(INITIAL_PRIORITY_QUEUE_CAPACITY, numberPriorityQueueEntries)`?
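
          A sketch of what this suggestion might look like in a restore path, using only JDK streams. This is an illustration under assumptions, not the operator's implementation: `QueueSnapshotSketch`, `snapshot` and `restore` are hypothetical names, `Long` replaces `StreamRecord<IN>`, and the value 11 for `INITIAL_PRIORITY_QUEUE_CAPACITY` is assumed because the constant's actual value is not shown in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.PriorityQueue;

public class QueueSnapshotSketch {

    // Assumed value for illustration only; the real constant is not shown here.
    static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;

    // Writes the element count followed by each element.
    static byte[] snapshot(PriorityQueue<Long> queue) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(queue.size());
            for (Long element : queue) {
                out.writeLong(element);
            }
        }
        return bytes.toByteArray();
    }

    // Reads the count first, then sizes the queue so the capacity is never below 1.
    static PriorityQueue<Long> restore(byte[] state) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(state))) {
            int entries = in.readInt();
            PriorityQueue<Long> queue =
                new PriorityQueue<>(Math.max(INITIAL_PRIORITY_QUEUE_CAPACITY, entries));
            for (int i = 0; i < entries; i++) {
                queue.offer(in.readLong());
            }
            return queue;
        }
    }

    public static void main(String[] args) throws IOException {
        // An empty snapshot restores to an empty queue instead of failing.
        PriorityQueue<Long> empty = new PriorityQueue<>();
        System.out.println("restored size: " + restore(snapshot(empty)).size());
    }
}
```

          With the `Math.max` guard the empty and non-empty cases share one code path, so no separate `if` branch is needed around the constructor.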

          githubbot ASF GitHub Bot added a comment -

          Github user jaxbihani commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2568#discussion_r81199485

          --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java ---
          @@ -126,11 +126,12 @@ public void restoreState(FSDataInputStream state) throws Exception {

            int numberPriorityQueueEntries = ois.readInt();

          - priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>());
          -
          - for (int i = 0; i <numberPriorityQueueEntries; i++) {
          - StreamElement streamElement = streamRecordSerializer.deserialize(new DataInputViewStreamWrapper(ois));
          - priorityQueue.offer(streamElement.<IN>asRecord());
          + if(numberPriorityQueueEntries > 0) {
          + priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>());
          --- End diff --

          If the number of objects read is 0, why would we want to create the object at all? That's unnecessary, and the condition check will save some CPU cycles.

          githubbot ASF GitHub Bot added a comment -

          Github user greghogan commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2568#discussion_r81200110

          --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java ---
          @@ -126,11 +126,12 @@ public void restoreState(FSDataInputStream state) throws Exception {

            int numberPriorityQueueEntries = ois.readInt();

          - priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>());
          -
          - for (int i = 0; i <numberPriorityQueueEntries; i++) {
          - StreamElement streamElement = streamRecordSerializer.deserialize(new DataInputViewStreamWrapper(ois));
          - priorityQueue.offer(streamElement.<IN>asRecord());
          + if(numberPriorityQueueEntries > 0) {
          + priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>());
          --- End diff --

          The queue is created in `open()`.

          dawidwys Dawid Wysakowicz added a comment - - edited

          I think it was resolved in FLINK-6032. Kostas Kloudas, do you agree?


            People

            • Assignee:
              jaxbihani Jagadish Bihani
              Reporter:
              fhueske Fabian Hueske