FLINK-6271: NumericBetweenParametersProvider NullPointer

    Details

      Description

      Creating a NumericBetweenParametersProvider with fetchSize=1000, min=0 and max=999 fails with a NullPointerException.
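
      The reported values describe a range that fits into a single split. A minimal sketch of that arithmetic follows, assuming min and max are inclusive bounds (as in a SQL BETWEEN) and that max - min + 1 does not overflow a long. The class and method names are hypothetical and are not taken from the Flink sources.

```java
final class SplitCount {
    /**
     * Number of [start, end] ranges needed to cover the inclusive interval
     * [min, max] when each range holds at most fetchSize values.
     * Hypothetical helper for illustration only.
     */
    static long splitCount(long fetchSize, long min, long max) {
        if (fetchSize <= 0 || min > max) {
            throw new IllegalArgumentException("need fetchSize > 0 and min <= max");
        }
        // Inclusive bounds, so the interval holds (max - min + 1) values.
        long elements = max - min + 1;
        // Ceiling division without floating point.
        return (elements + fetchSize - 1) / fetchSize;
    }
}
```

      With fetchSize=1000, min=0 and max=999 this yields exactly one split, which is the case the issue reports as failing.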

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user fpompermaier opened a pull request:

          https://github.com/apache/flink/pull/3686

          FLINK-6271 [jdbc] Fix nullPointer when there's a single split

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [X] General
            • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
            • The pull request addresses only one issue
            • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [X] Documentation
            • Documentation has been added for new functionality
            • Old documentation affected by the pull request has been updated
            • JavaDoc for public methods has been added
          • [X] Tests & Build
            • Functionality added by the pull request is covered by tests
            • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/fpompermaier/flink FLINK-6271

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3686.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3686



          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3686#discussion_r110127056

          — Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java —
          @@ -180,6 +180,52 @@ public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws
          jdbcInputFormat.closeInputFormat();
          Assert.assertEquals(testData.length, recordCount);
          }
          + @Test
          + public void testJDBCInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOException, InstantiationException, IllegalAccessException {
          + final Long min = new Long(JDBCTestBase.testData[0][0] + "");
          + final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length - 1][0] + "");
          + final long fetchSize = max + 1;//generate a single split
          + ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
          + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
          + .setDrivername(DRIVER_CLASS)
          + .setDBUrl(DB_URL)
          + .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
          + .setRowTypeInfo(rowTypeInfo)
          + .setParametersProvider(pramProvider)
          + .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
          + .finish();
          +
          + jdbcInputFormat.openInputFormat();
          + InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
          + //assert that a single split was generated
          + Assert.assertEquals(1, splits.length);
          + int recordCount = 0;
          + Row row = new Row(5);
          + for (int i = 0; i < splits.length; i++) {
          + jdbcInputFormat.open(splits[i]);
          + while (!jdbcInputFormat.reachedEnd()) {
          + Row next = jdbcInputFormat.nextRecord(row);
          + if (next == null) {
          + break;
          + }
          + if(next.getField(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass());}
          + if(next.getField(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.getField(1).getClass());}
          + if(next.getField(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.getField(2).getClass());}
          + if(next.getField(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.getField(3).getClass());}
          + if(next.getField(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.getField(4).getClass());}
          +
          + for (int x = 0; x < 5; x++) {
          + if(testData[recordCount][x]!=null) {
          — End diff –

          missing space after if

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3686#discussion_r110127016

          — Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java —
          @@ -180,6 +180,52 @@ public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws
          jdbcInputFormat.closeInputFormat();
          Assert.assertEquals(testData.length, recordCount);
          }
          + @Test
          + public void testJDBCInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOException, InstantiationException, IllegalAccessException {
          + final Long min = new Long(JDBCTestBase.testData[0][0] + "");
          + final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length - 1][0] + "");
          + final long fetchSize = max + 1;//generate a single split
          + ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
          + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
          + .setDrivername(DRIVER_CLASS)
          + .setDBUrl(DB_URL)
          + .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
          + .setRowTypeInfo(rowTypeInfo)
          + .setParametersProvider(pramProvider)
          + .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
          + .finish();
          +
          + jdbcInputFormat.openInputFormat();
          + InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
          + //assert that a single split was generated
          + Assert.assertEquals(1, splits.length);
          + int recordCount = 0;
          + Row row = new Row(5);
          + for (int i = 0; i < splits.length; i++) {
          + jdbcInputFormat.open(splits[i]);
          + while (!jdbcInputFormat.reachedEnd()) {
          + Row next = jdbcInputFormat.nextRecord(row);
          + if (next == null) {
          + break;
          + }
          + if(next.getField(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass());}
          — End diff –

          Spaces are missing after `if` and around `!=`; please also move the assert onto a separate line.
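
          Applied to the first of the checks, the requested formatting could look like the sketch below. To keep it compilable on its own, a plain Object[] stands in for the Row returned by nextRecord() and a local assertEquals stands in for JUnit's Assert.assertEquals; both stand-ins and the class name are assumptions made for illustration, not code from the pull request.

```java
final class FormattedFieldCheck {
    // Local stand-in for JUnit's Assert.assertEquals(String, Object, Object).
    static void assertEquals(String message, Object expected, Object actual) {
        if (!expected.equals(actual)) {
            throw new AssertionError(message);
        }
    }

    // fields stands in for the Row produced by the input format.
    static void checkFirstField(Object[] fields) {
        // Space after "if", spaces around "!=", assertion on its own line.
        if (fields[0] != null) {
            assertEquals("Field 0 should be int", Integer.class, fields[0].getClass());
        }
    }
}
```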

          githubbot ASF GitHub Bot added a comment -

          Github user fpompermaier commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3686#discussion_r110159791

          — Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java —
          @@ -180,6 +180,52 @@ public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws
          jdbcInputFormat.closeInputFormat();
          Assert.assertEquals(testData.length, recordCount);
          }
          + @Test
          + public void testJDBCInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOException, InstantiationException, IllegalAccessException {
          + final Long min = new Long(JDBCTestBase.testData[0][0] + "");
          + final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length - 1][0] + "");
          + final long fetchSize = max + 1;//generate a single split
          + ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
          + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
          + .setDrivername(DRIVER_CLASS)
          + .setDBUrl(DB_URL)
          + .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
          + .setRowTypeInfo(rowTypeInfo)
          + .setParametersProvider(pramProvider)
          + .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
          + .finish();
          +
          + jdbcInputFormat.openInputFormat();
          + InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
          + //assert that a single split was generated
          + Assert.assertEquals(1, splits.length);
          + int recordCount = 0;
          + Row row = new Row(5);
          + for (int i = 0; i < splits.length; i++) {
          + jdbcInputFormat.open(splits[i]);
          + while (!jdbcInputFormat.reachedEnd()) {
          + Row next = jdbcInputFormat.nextRecord(row);
          + if (next == null) {
          + break;
          + }
          + if(next.getField(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass());}
          + if(next.getField(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.getField(1).getClass());}
          + if(next.getField(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.getField(2).getClass());}
          + if(next.getField(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.getField(3).getClass());}
          + if(next.getField(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.getField(4).getClass());}
          +
          + for (int x = 0; x < 5; x++) {
          + if(testData[recordCount][x]!=null) {
          — End diff –

          How should I validate the parameters? Is there a helper class, or should I just throw an IllegalArgumentException? Is there an existing example of parameter handling I can use as a reference?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3686#discussion_r110160658

          — Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java —
          @@ -180,6 +180,52 @@ public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws
          jdbcInputFormat.closeInputFormat();
          Assert.assertEquals(testData.length, recordCount);
          }
          + @Test
          + public void testJDBCInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOException, InstantiationException, IllegalAccessException {
          + final Long min = new Long(JDBCTestBase.testData[0][0] + "");
          + final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length - 1][0] + "");
          + final long fetchSize = max + 1;//generate a single split
          + ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
          + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
          + .setDrivername(DRIVER_CLASS)
          + .setDBUrl(DB_URL)
          + .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
          + .setRowTypeInfo(rowTypeInfo)
          + .setParametersProvider(pramProvider)
          + .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
          + .finish();
          +
          + jdbcInputFormat.openInputFormat();
          + InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
          + //assert that a single split was generated
          + Assert.assertEquals(1, splits.length);
          + int recordCount = 0;
          + Row row = new Row(5);
          + for (int i = 0; i < splits.length; i++) {
          + jdbcInputFormat.open(splits[i]);
          + while (!jdbcInputFormat.reachedEnd()) {
          + Row next = jdbcInputFormat.nextRecord(row);
          + if (next == null) {
          + break;
          + }
          + if(next.getField(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass());}
          + if(next.getField(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.getField(1).getClass());}
          + if(next.getField(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.getField(2).getClass());}
          + if(next.getField(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.getField(3).getClass());}
          + if(next.getField(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.getField(4).getClass());}
          +
          + for (int x = 0; x < 5; x++) {
          + if(testData[recordCount][x]!=null) {
          — End diff –

          Use the Flink `Preconditions` class, specifically `checkArgument()`.
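
          A rough sketch of what such argument checks might look like follows. So that it compiles without Flink on the classpath, it defines a local checkArgument that mirrors the behaviour of `Preconditions.checkArgument(boolean, Object)` in org.apache.flink.util (throwing an IllegalArgumentException when the condition is false). The class name, the specific conditions and the messages are assumptions for illustration, not the code that was merged.

```java
final class ValidatedRange {
    final long fetchSize;
    final long min;
    final long max;

    // Local stand-in for Flink's Preconditions.checkArgument(boolean, Object).
    static void checkArgument(boolean condition, Object errorMessage) {
        if (!condition) {
            throw new IllegalArgumentException(String.valueOf(errorMessage));
        }
    }

    ValidatedRange(long fetchSize, long min, long max) {
        // Hypothetical checks: reject inputs that cannot describe a range.
        checkArgument(fetchSize > 0, "Fetch size must be greater than 0.");
        checkArgument(min <= max, "Min must not be greater than max.");
        this.fetchSize = fetchSize;
        this.min = min;
        this.max = max;
    }
}
```

          In the real class the local helper would be dropped in favour of the Flink import, so that invalid input fails at construction time with a descriptive message.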

          githubbot ASF GitHub Bot added a comment -

          Github user fpompermaier commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3686#discussion_r110164047

          — Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java —
          @@ -180,6 +180,52 @@ public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws
          jdbcInputFormat.closeInputFormat();
          Assert.assertEquals(testData.length, recordCount);
          }
          + @Test
          + public void testJDBCInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOException, InstantiationException, IllegalAccessException {
          + final Long min = new Long(JDBCTestBase.testData[0][0] + "");
          + final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length - 1][0] + "");
          + final long fetchSize = max + 1;//generate a single split
          + ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
          + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
          + .setDrivername(DRIVER_CLASS)
          + .setDBUrl(DB_URL)
          + .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
          + .setRowTypeInfo(rowTypeInfo)
          + .setParametersProvider(pramProvider)
          + .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
          + .finish();
          +
          + jdbcInputFormat.openInputFormat();
          + InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
          + //assert that a single split was generated
          + Assert.assertEquals(1, splits.length);
          + int recordCount = 0;
          + Row row = new Row(5);
          + for (int i = 0; i < splits.length; i++) {
          + jdbcInputFormat.open(splits[i]);
          + while (!jdbcInputFormat.reachedEnd()) {
          + Row next = jdbcInputFormat.nextRecord(row);
          + if (next == null) {
          + break;
          + }
          + if(next.getField(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass());}
          + if(next.getField(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.getField(1).getClass());}
          + if(next.getField(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.getField(2).getClass());}
          + if(next.getField(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.getField(3).getClass());}
          + if(next.getField(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.getField(4).getClass());}
          +
          + for (int x = 0; x < 5; x++) {
          + if(testData[recordCount][x]!=null) {
          — End diff –

          I should have addressed all your comments. The only thing I didn't implement is the check that min >= 0 and max > 0, because this does not hold in general (e.g. SELECT * FROM ACCOUNTS WHERE amount BETWEEN -100 AND 0).
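
          To illustrate why a sign check would be too strict, here is a small sketch that splits an inclusive interval with negative bounds into BETWEEN-style [start, end] pairs. It is a simplified stand-in rather than the provider's actual code: the class and method names are hypothetical, and it assumes the bounds are far enough from the limits of long that start + fetchSize does not overflow.

```java
import java.util.ArrayList;
import java.util.List;

final class NegativeBoundsRanges {
    /** Splits the inclusive interval [min, max] into ranges of at most fetchSize values. */
    static List<long[]> ranges(long fetchSize, long min, long max) {
        if (fetchSize <= 0 || min > max) {
            throw new IllegalArgumentException("need fetchSize > 0 and min <= max");
        }
        List<long[]> result = new ArrayList<>();
        for (long start = min; start <= max; start += fetchSize) {
            // Clamp the last range so it never runs past max.
            long end = Math.min(start + fetchSize - 1, max);
            result.add(new long[] {start, end});
        }
        return result;
    }
}
```

          Calling ranges(50, -100, 0) produces three pairs that together cover -100 through 0, so only fetchSize > 0 and min <= max need to hold for the split to be well defined.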

          githubbot ASF GitHub Bot added a comment -

          Github user fpompermaier commented on the issue:

          https://github.com/apache/flink/pull/3686

          Is it ok...? Could this be merged?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3686#discussion_r110337050

          — Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java —
          @@ -162,11 +162,11 @@ public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws
          if (next == null) {
          break;
          }
          - if(next.getField(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass());}
          - if(next.getField(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.getField(1).getClass());}
          - if(next.getField(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.getField(2).getClass());}
          - if(next.getField(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.getField(3).getClass());}
          - if(next.getField(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.getField(4).getClass());}
          + if (next.getField(0) != null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass());}
              • End diff –

          you missed putting the assertions on a new line.
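A sketch of the formatting the reviewer is asking for, with each assertion on its own line inside the `if` block. To keep it runnable without JUnit, `assertEquals` here is a hypothetical local stand-in for `org.junit.Assert.assertEquals`, and `checkFieldTypes` taking an `Object[]` row is an assumed simplification of the test's `next.getField(i)` calls.

```java
public class AssertionFormattingSketch {

    // Hypothetical stand-in for org.junit.Assert.assertEquals(String, Object, Object),
    // used only so that this sketch compiles without a JUnit dependency.
    static void assertEquals(String message, Object expected, Object actual) {
        if (!expected.equals(actual)) {
            throw new AssertionError(message + ": expected " + expected + " but was " + actual);
        }
    }

    // Null fields are skipped; non-null fields must have the expected Java type.
    public static void checkFieldTypes(Object[] fields) {
        if (fields[0] != null) {
            assertEquals("Field 0 should be int", Integer.class, fields[0].getClass());
        }
        if (fields[1] != null) {
            assertEquals("Field 1 should be String", String.class, fields[1].getClass());
        }
        if (fields[2] != null) {
            assertEquals("Field 2 should be String", String.class, fields[2].getClass());
        }
        if (fields[3] != null) {
            assertEquals("Field 3 should be float", Double.class, fields[3].getClass());
        }
        if (fields[4] != null) {
            assertEquals("Field 4 should be int", Integer.class, fields[4].getClass());
        }
    }

    public static void main(String[] args) {
        // Made-up sample row; field 2 is null and is therefore skipped.
        checkFieldTypes(new Object[] {1, "title", null, 11.11, 4});
        System.out.println("all field type checks passed");
    }
}
```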

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3686

          The approach is good, though; once we sort out the formatting we're good to go, imo.

          githubbot ASF GitHub Bot added a comment -

          Github user fpompermaier commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3686#discussion_r110337693

          --- Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---
          @@ -162,11 +162,11 @@ public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws
          if (next == null) {
              break;
          }
          - if(next.getField(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass());}
          - if(next.getField(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.getField(1).getClass());}
          - if(next.getField(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.getField(2).getClass());}
          - if(next.getField(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.getField(3).getClass());}
          - if(next.getField(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.getField(4).getClass());}
          + if (next.getField(0) != null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass());}
          --- End diff --

          I didn't know that Flink enforces checkstyle at compile time. Is there an official Eclipse formatter somewhere that I can use to verify the code at development time?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3686#discussion_r110338259

          --- Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---
          @@ -162,11 +162,11 @@ public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws
          if (next == null) {
              break;
          }
          - if(next.getField(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass());}
          - if(next.getField(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.getField(1).getClass());}
          - if(next.getField(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.getField(2).getClass());}
          - if(next.getField(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.getField(3).getClass());}
          - if(next.getField(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.getField(4).getClass());}
          + if (next.getField(0) != null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass());}
          --- End diff --

          Sorry, I don't know anything about Eclipse. The easiest way to run the checkstyle check is to execute `mvn verify -DskipTests` in the module that you're changing.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3686

          merging.

          Zentol Chesnay Schepler added a comment -

          705938e5965a98b17bd6ba3f1e06728a35e4f8a9

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3686

          githubbot ASF GitHub Bot added a comment -

          Github user patricklucas commented on the issue:

          https://github.com/apache/flink/pull/3686

          @fpompermaier FYI your author email address on this commit was "f.pompermaier@gmai.com" (missing 'l'). You might have a typo in your gitconfig?

          githubbot ASF GitHub Bot added a comment -

          Github user fpompermaier commented on the issue:

          https://github.com/apache/flink/pull/3686

          You were right @patricklucas, I've now corrected the email address in my .gitconfig. Thanks!


            People

            • Assignee: Flavio Pompermaier (f.pompermaier)
            • Reporter: Flavio Pompermaier (f.pompermaier)
            • Votes: 0
            • Watchers: 2
