      Description

      After upgrading from Flink 1.2.0 to 1.2.1, I got the following error:
      ```
      07:54:52,395 ERROR org.apache.flink.api.common.io.DelimitedInputFormat - Unexpected problen while getting the file statistics for file 'mytestfile': -1
      java.lang.ArrayIndexOutOfBoundsException: -1
      at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:572)
      at org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:423)
      at org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:48)
      at org.apache.flink.optimizer.dag.DataSourceNode.computeOperatorSpecificDefaultEstimates(DataSourceNode.java:166)
      ```
      I have created a test repository that isolates the issue:
      https://github.com/physikerwelt/flinkReadTest
      The bug is also reproduced on Travis CI:
      https://travis-ci.org/physikerwelt/flinkReadTest

          Activity

          aljoscha Aljoscha Krettek added a comment -

          Fabian Hueske Could you please look at this? I think it could be caused by the changes around FLINK-5771.

          githubbot ASF GitHub Bot added a comment -

          GitHub user fhueske opened a pull request:

          https://github.com/apache/flink/pull/4088

          FLINK-6652 [core] Fix handling of delimiters split by buffers in DelimitedInputFormat

          This PR fixes a bug introduced by the fix for FLINK-5771 / PR #3316. That fix resets the read position if a character sequence matches only a prefix of a multi-char delimiter. The reset yields an `IndexOutOfBoundsException` if the prefix spans the boundary of the read buffer, because the reset position then lies before the start of the refilled buffer.

          This fix moves the prefix of the multi-char delimiter to the beginning of the read buffer once the read buffer has been completely read. If the read position has to be reset, the prefix is still in the read buffer and can be parsed again.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/fhueske/flink fixDelimIF

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4088.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4088


          commit 740a2f42788e1cb957ab1e79b9b67dcfb06b875d
          Author: Fabian Hueske <fhueske@apache.org>
          Date: 2017-06-07T21:01:06Z

          FLINK-6652 [core] Fix handling of delimiters split by buffers in DelimitedInputFormat
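
The carry-over described in this pull request can be sketched in isolation. The following is a minimal, self-contained illustration and not the code from `DelimitedInputFormat`: the class `SplitDelimiterSketch`, the method `readRecords`, and all variable names are hypothetical, and the sketch assumes that records are UTF-8 text and that the buffer is larger than the delimiter.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: splits a stream on a multi-byte delimiter using a small buffer.
class SplitDelimiterSketch {

    static List<String> readRecords(InputStream in, byte[] delim, int bufferSize) {
        if (delim.length == 0 || bufferSize <= delim.length) {
            throw new IllegalArgumentException("Buffer must be larger than a non-empty delimiter");
        }
        byte[] buf = new byte[bufferSize];
        int start = 0;    // first byte in buf not yet copied into the current record
        int pos = 0;      // next byte to examine
        int limit = 0;    // number of valid bytes in buf
        int matched = 0;  // delimiter bytes matched so far, i.e. buf[pos - matched .. pos)
        ByteArrayOutputStream record = new ByteArrayOutputStream();
        List<String> records = new ArrayList<>();

        while (true) {
            if (pos == limit) {
                // Buffer fully read: save the record bytes, then move the partially
                // matched delimiter prefix to the front so a later reset stays in bounds.
                record.write(buf, start, limit - matched - start);
                System.arraycopy(buf, limit - matched, buf, 0, matched);
                int n;
                try {
                    n = in.read(buf, matched, buf.length - matched);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                if (n < 0) {
                    record.write(buf, 0, matched); // a dangling prefix is ordinary data
                    break;
                }
                start = 0;
                pos = matched;
                limit = matched + n;
            } else if (buf[pos] == delim[matched]) {
                pos++;
                matched++;
                if (matched == delim.length) {
                    // Full delimiter found: emit everything before it.
                    record.write(buf, start, pos - matched - start);
                    records.add(new String(record.toByteArray(), StandardCharsets.UTF_8));
                    record.reset();
                    start = pos;
                    matched = 0;
                }
            } else if (matched > 0) {
                // False start: reset to the byte after the one where the match began.
                // Because the prefix was carried over, pos - matched is never negative.
                pos = pos - matched + 1;
                matched = 0;
            } else {
                pos++;
            }
        }
        if (record.size() > 0) {
            records.add(new String(record.toByteArray(), StandardCharsets.UTF_8));
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] delim = "||".getBytes(StandardCharsets.UTF_8);
        byte[] data = "ab||cd".getBytes(StandardCharsets.UTF_8);
        // With a 3-byte buffer the first fill is "ab|", so the delimiter is split.
        System.out.println(readRecords(new ByteArrayInputStream(data), delim, 3)); // prints [ab, cd]
    }
}
```

Without the `System.arraycopy` step, the reset `pos - matched + 1` would be computed against a freshly refilled buffer in which the prefix no longer exists, which is the kind of out-of-range index reported in the stack trace.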


          githubbot ASF GitHub Bot added a comment -

          Github user alpinegizmo commented on the issue:

          https://github.com/apache/flink/pull/4088

          FYI, the reference solutions to the batch training exercises are failing, and it looks to me like it's because of this issue.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4088#discussion_r122929197

          --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---
          @@ -246,6 +246,9 @@ public void setDelimiter(String delimiter) {
                throw new IllegalArgumentException("Delimiter must not be null");
              }
              this.delimiter = delimiter.getBytes(getCharset());
          +   if (this.bufferSize > 0 && this.delimiter.length >= this.bufferSize) {
          --- End diff --

          Since the initial value for `bufferSize` is `-1`, doesn't this mean that the delimiter can never be set without calling `setBufferSize`? Would it make sense to move this check into `open()`?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4088#discussion_r122929860

          --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---
          @@ -269,6 +272,9 @@ public void setBufferSize(int bufferSize) {
              if (bufferSize < 1) {
                throw new IllegalArgumentException("Buffer size must be at least 1.");
              }
          +   if (bufferSize <= delimiter.length) {
          --- End diff --

          We should also adjust the condition above so that `bufferSize` must be greater than one.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/4088

          Thanks for the review @zentol!
          I moved the check to `initBuffers()` which is called from `open()`.
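
One way to picture the outcome of this review exchange is a deferred check: the setters only store values, and the relation between buffer size and delimiter length is validated once both are final. The sketch below is an illustration under assumptions, not the merged Flink code. `DelimitedConfigSketch` and `DEFAULT_BUFFER_SIZE` are hypothetical, the lower bound of 2 in `setBufferSize` is one reading of the review comment above, and only the `-1` initial value and the `initBuffers()` name come from this thread.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of validating buffer size against delimiter length at open time.
class DelimitedConfigSketch {

    // Assumed fallback used when no buffer size was set; the value is illustrative only.
    private static final int DEFAULT_BUFFER_SIZE = 4096;

    private byte[] delimiter = {'\n'};
    private int bufferSize = -1; // -1 means "not set", as noted in the review

    void setDelimiter(String delimiter) {
        if (delimiter == null) {
            throw new IllegalArgumentException("Delimiter must not be null");
        }
        this.delimiter = delimiter.getBytes(StandardCharsets.UTF_8);
    }

    void setBufferSize(int bufferSize) {
        // Assumption: a buffer must hold more than a one-byte delimiter, hence at least 2.
        if (bufferSize < 2) {
            throw new IllegalArgumentException("Buffer size must be at least 2.");
        }
        this.bufferSize = bufferSize;
    }

    // Called when reading starts, so the setters can run in any order.
    byte[] initBuffers() {
        int size = bufferSize > 0 ? bufferSize : DEFAULT_BUFFER_SIZE;
        if (size <= delimiter.length) {
            throw new IllegalArgumentException(
                "Buffer size must be greater than the length of the delimiter.");
        }
        return new byte[size];
    }

    public static void main(String[] args) {
        DelimitedConfigSketch config = new DelimitedConfigSketch();
        config.setDelimiter("||");  // no buffer size set yet, and no exception
        config.setBufferSize(3);
        System.out.println(config.initBuffers().length); // prints 3
    }
}
```

Checking in one place avoids the ordering question raised in the review: neither setter needs to know whether the other has been called.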

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/4088

          +1.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4088

          fhueske Fabian Hueske added a comment -

          Fixed for 1.2.2 with c709dce63d27ad06658aeba356f851b832f4d093
          Fixed for 1.3.1 with 088232c2e715f0ba49785c0e1168a93430668caa
          Fixed for 1.4.0 with be662bf7ebcefb289988a24392104c3385029568


            People

            • Assignee:
              fhueske Fabian Hueske
              Reporter:
              physikerwelt Moritz Schubotz