FLINK-4311

TableInputFormat fails when reused on next split

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.0.3
    • Fix Version/s: 1.2.0, 1.1.3
    • Component/s: None
    • Labels: None

      Description

      We have written a batch job that reads data from HBase using the TableInputFormat.

      We have found that this class sometimes fails with this exception:

      java.lang.RuntimeException: java.util.concurrent.RejectedExecutionException: Task org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
      at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:208)
      at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
      at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295)
      at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160)
      at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:155)
      at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821)
      at org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:152)
      at org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:47)
      at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: java.util.concurrent.RejectedExecutionException: Task org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
      at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
      at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
      at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
      at org.apache.hadoop.hbase.client.ResultBoundedCompletionService.submit(ResultBoundedCompletionService.java:142)
      at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.addCallsForCurrentReplica(ScannerCallableWithReplicas.java:269)
      at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:165)
      at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59)
      at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
      ... 10 more

      As you can see, the ThreadPoolExecutor had already been terminated at this point.

      We tracked it down to the fact that

      1. the configure method opens the table,
      2. the open method obtains the result scanner, and
      3. the close method closes the table.

      If a second split arrives on the same instance, the open method will fail because the table has already been closed.
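
      To make the lifecycle concrete, here is a minimal sketch (a hypothetical driver loop, not Flink's actual DataSourceTask code) of the call sequence a single input format instance must survive, and where this one breaks:

          // Hedged sketch: 'format', 'parameters' and the splits are assumed given.
          void runSplits(TableInputFormat<Tuple1<String>> format, Configuration parameters,
                         TableInputSplit split1, TableInputSplit split2) throws IOException {
              format.configure(parameters); // TableInputFormat opens the HTable here
              format.open(split1);          // obtains a ResultScanner from the open table
              format.close();               // bug: closes the HTable itself, not just the scanner
              format.open(split2);          // fails: the table's client thread pool is already terminated
          }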

      We also found that the exact error varies with the HBase version in use. I have also seen this exception:

      Caused by: java.io.IOException: hconnection-0x19d37183 closed
      at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1146)
      at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:300)
      ... 37 more

      I found that the documentation of the InputFormat interface clearly states:

      IMPORTANT NOTE: Input formats must be written such that an instance can be opened again after it was closed. That is due to the fact that the input format is used for potentially multiple splits. After a split is done, the format's close function is invoked and, if another split is available, the open function is invoked afterwards for the next split.

      It appears that this specific InputFormat has not been checked against this constraint.


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user nielsbasjes opened a pull request:

          https://github.com/apache/flink/pull/2330

          FLINK-4311 Fixed several problems in TableInputFormat

          Question: Do you guys want a unit test for this?
          In HBase itself I have done this in the past, yet it required a large chunk of additional software to start and stop an HBase minicluster during the unit tests.
          I.e. pull in this thing:
          https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/FilterTestingCluster.java
          and then do something like this:
          https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestScanRowPrefix.java

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/nielsbasjes/flink FLINK-4311

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2330.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2330


          commit 5c3d53c810f8df6d5544685ef3f1004c46541daf
          Author: Niels Basjes <nbasjes@bol.com>
          Date: 2016-08-03T12:54:34Z

          FLINK-4311 TableInputFormat can handle reuse for next input split

          commit 8696f5e257c7434d62e662c4c97f4ede2da5411b
          Author: Niels Basjes <nbasjes@bol.com>
          Date: 2016-08-03T12:56:01Z

          FLINK-4311 Cannot override a static member function.


          githubbot ASF GitHub Bot added a comment -

          Github user nielsbasjes commented on the issue:

          https://github.com/apache/flink/pull/2330

          Oh damn,
          I just noticed a major issue in this: in order to create the input splits, the table needs to be available *before* the call to the `open` method.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/2330

          maybe you can move the table initialization into `openInputFormat()` (called once before all splits) and close it in `closeInputFormat()` (called once after all splits).
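
          For illustration, a hedged sketch of the suggested restructuring (assuming the `throws IOException` signatures discussed below; field and helper names are taken from the PR but the exact code may differ):

              @Override
              public void openInputFormat() throws IOException {
                  // called once per task, before the first split is opened
                  table = createTable();
              }

              @Override
              public void closeInputFormat() throws IOException {
                  // called once per task, after the last split is closed
                  if (table != null) {
                      table.close();
                      table = null;
                  }
              }

          With per-split resources (the `ResultScanner`) acquired in `open()` and released in `close()`, the instance can then be reused for any number of splits.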

          githubbot ASF GitHub Bot added a comment -

          Github user nielsbasjes commented on the issue:

          https://github.com/apache/flink/pull/2330

          Yes, that is indeed the right place to do this.
          Bummer this method does not allow throwing exceptions.

          githubbot ASF GitHub Bot added a comment -

          Github user nielsbasjes commented on the issue:

          https://github.com/apache/flink/pull/2330

          Now I see why I missed these two; they are newer than the 1.0.3 I was working with.
          Is it a good idea to add `throws IOException` to these two in RichInputFormat?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/2330

          I would say yes, since `open()` and `close()` can also throw an `IOException`.
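
          The change under discussion would make `RichInputFormat` look roughly like this (a sketch, not the final committed code; imports and existing members omitted):

              public abstract class RichInputFormat<OT, T extends InputSplit> implements InputFormat<OT, T> {

                  public void openInputFormat() throws IOException {
                      // do nothing by default; subclasses may acquire per-task resources here
                  }

                  public void closeInputFormat() throws IOException {
                      // do nothing by default; subclasses may release per-task resources here
                  }
              }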

          githubbot ASF GitHub Bot added a comment -

          Github user nielsbasjes commented on the issue:

          https://github.com/apache/flink/pull/2330

          Note that this version still assumes that the single instance will only see multiple splits for the same table. Is that a safe assumption?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r73381826

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java —

          @@ -328,7 +328,11 @@ public void run() {
           	synchronized (checkpointLock) {
           		LOG.info("Reader terminated, and exiting...");

          -		this.format.closeInputFormat();
          +		try {
          +			this.format.closeInputFormat();
          +		} catch (IOException e) {
          +			// Ignoring
          +		}

          — End diff –

          it would be good to log the exception

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r73382033

          — Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java —

          @@ -237,7 +244,7 @@ private void logSplitInfo(String action, TableInputSplit split) {
           	 * End key of the region
           	 * @return true, if this region needs to be included as part of the input (default).
           	 */
          -	private static boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
          +	protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {

          — End diff –

          why are you changing this?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/2330

          I don't know, and it seems the InputFormat itself doesn't know either. If we go by the previous implementation then yes, there will only be one table. However, based on the comment on line 64: `// abstract methods allow for multiple table and scanners in the same job`, we have to conclude that there can be different tables.

          I'd be curious what @twalthr thinks about this.

          githubbot ASF GitHub Bot added a comment -

          Github user nielsbasjes commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r73480826

          — Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java —

          @@ -237,7 +244,7 @@ private void logSplitInfo(String action, TableInputSplit split) {
           	 * End key of the region
           	 * @return true, if this region needs to be included as part of the input (default).
           	 */
          -	private static boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
          +	protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {

          — End diff –

          According to the documentation, this function is intended to be overridden in a subclass. You cannot override a static method (and especially not a private one).
          http://stackoverflow.com/questions/2223386/why-doesnt-java-allow-overriding-of-static-methods
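
          A small illustration of the language rule, with hypothetical classes: a static method is bound at compile time to the declared type, so a subclass can only hide it, never override it (and a private method is not visible to the subclass at all).

              class Base {
                  static String who() { return "Base"; }
              }

              class Sub extends Base {
                  static String who() { return "Sub"; } // hides Base.who(), does not override it
              }

              Base b = new Sub();
              System.out.println(b.who()); // static dispatch on the declared type: prints "Base"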

          githubbot ASF GitHub Bot added a comment -

          Github user nielsbasjes commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r73482199

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java —

          @@ -328,7 +328,11 @@ public void run() {
           	synchronized (checkpointLock) {
           		LOG.info("Reader terminated, and exiting...");

          -		this.format.closeInputFormat();
          +		try {
          +			this.format.closeInputFormat();
          +		} catch (IOException e) {
          +			// Ignoring
          +		}

          — End diff –

          Done

          githubbot ASF GitHub Bot added a comment -

          Github user nielsbasjes commented on the issue:

          https://github.com/apache/flink/pull/2330

          I had another look at the "multiple tables" question. The name of the table comes from the `getTableName()` method that is to be implemented by the subclass. I consider it extremely unlikely that multiple calls to that method in a single instance will yield different table names.
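
          The subclass contract in question looks roughly like this (a hedged sketch with a hypothetical table name; the abstract methods are those of TableInputFormat):

              public class MyTableInputFormat extends TableInputFormat<Tuple1<String>> {
                  @Override
                  protected String getTableName() {
                      return "my_table"; // a constant, so repeated calls cannot yield different names
                  }

                  @Override
                  protected Scan getScanner() {
                      return new Scan();
                  }

                  @Override
                  protected Tuple1<String> mapResultToTuple(Result r) {
                      return new Tuple1<>(Bytes.toString(r.value()));
                  }
              }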

          githubbot ASF GitHub Bot added a comment -

          Github user nielsbasjes commented on the issue:

          https://github.com/apache/flink/pull/2330

          Question: Is this change good?
          Or do you have more things that I need to change before it can be committed?

          githubbot ASF GitHub Bot added a comment -

          Github user nielsbasjes commented on the issue:

          https://github.com/apache/flink/pull/2330

          I will add a unit test for this.

          githubbot ASF GitHub Bot added a comment -

          Github user nielsbasjes commented on the issue:

          https://github.com/apache/flink/pull/2330

          I made a few serious attempts to create a unit test that fires up the HBaseMiniCluster ... and failed.

          githubbot ASF GitHub Bot added a comment -

          Github user nielsbasjes commented on the issue:

          https://github.com/apache/flink/pull/2330

          I managed to resolve the problems with running these unit tests; they were caused by Guava version conflicts.
          Now an HBaseMiniCluster is started, a table with multiple regions is created, and the TableInputFormat is used to read the rows back. By setting the parallelism to 1, the same TableInputFormat instance is used for multiple regions and succeeds (the problem this all started with).

          Please review.

          githubbot ASF GitHub Bot added a comment -

          Github user nielsbasjes commented on the issue:

          https://github.com/apache/flink/pull/2330

          The current version has a problem building the shaded jars.
          It runs into an infinite loop creating the dependency-reduced-pom.xml, as described here:

          *Shade Plugin gets stuck in infinite loop building dependency reduced POM* https://issues.apache.org/jira/browse/MSHADE-148
          Although all my versions are newer than the fix described there, I still see the problem.
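
          For reference, the workaround most often cited for MSHADE-148 is to stop the shade plugin from generating that file at all (a sketch; whether it is acceptable for this build is untested):

              <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <configuration>
                  <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
              </plugin>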

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r79469573

          — Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java —

          @@ -131,37 +153,27 @@ public T nextRecord(T reuse) throws IOException {
           	}

           	@Override
          -	public void open(TableInputSplit split) throws IOException {
          -		if (split == null) {
          -			throw new IOException("Input split is null!");
          -		}
          -		if (table == null) {
          -			throw new IOException("No HTable provided!");
          -		}
          -		if (scan == null){
          -			throw new IOException("No Scan instance provided");
          +	public void close() throws IOException {
          +		LOG.info("Closing split (scanned {} rows)", scannedRows);
          +		this.lastRow = null;
          +		try {
          +			if(resultScanner !=null) {
          +				this.resultScanner.close();
          +			}
          +		} finally {
          +			this.resultScanner = null;
           		}
          -
          -		logSplitInfo("opening", split);
          -		scan.setStartRow(split.getStartRow());
          -		lastRow = split.getEndRow();
          -		scan.setStopRow(lastRow);
          -
          -		this.rs = table.getScanner(scan);
          -		this.endReached = false;
          -		this.scannedRows = 0;
           	}

           	@Override
          -	public void close() throws IOException {
          -		if(rs!=null) {
          -			this.rs.close();
          -		}
          -		if(table!=null){
          -			this.table.close();
          +	public void closeInputFormat() throws IOException {
          +		try {
          +			if(table!=null) {

          — End diff –

          insert spaces

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r79469513

          — Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java —

          @@ -131,37 +153,27 @@ public T nextRecord(T reuse) throws IOException {
           	}

           	@Override
          -	public void open(TableInputSplit split) throws IOException {
          -		if (split == null) {
          -			throw new IOException("Input split is null!");
          -		}
          -		if (table == null) {
          -			throw new IOException("No HTable provided!");
          -		}
          -		if (scan == null){
          -			throw new IOException("No Scan instance provided");
          +	public void close() throws IOException {
          +		LOG.info("Closing split (scanned {} rows)", scannedRows);
          +		this.lastRow = null;
          +		try {
          +			if(resultScanner !=null) {

          — End diff –

          insert space between `!=` and `null`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r79468879

          — Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java —

          @@ -67,15 +67,24 @@
           	protected abstract T mapResultToTuple(Result r);

           	/**
          -	 * creates a {@link Scan} object and a {@link HTable} connection
          -	 *
          -	 * @param parameters
          +	 * Creates a {@link Scan} object and opens the {@link HTable} connection.
          +	 * These are opened here because they are needed in the createInputSplits
          +	 * which is called before the openInputFormat method.
          +	 * So the connection is opened in {@link #configure(Configuration)}
          +	 * and closed in {@link #closeInputFormat()}.
          +	 * @param parameters The configuration that is to be used
           	 * @see Configuration
           	 */
           	@Override
           	public void configure(Configuration parameters) {
          -		this.table = createTable();
          -		this.scan = getScanner();
          +		table = createTable();
          +		scan = getScanner();
           	}
          +
          +	/**
          +	 * Do nothing.
          +	 */
          +	@Override
          +	public void openInputFormat() throws IOException {

          — End diff –

          No need to override this method.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r79477333

          — Diff: flink-batch-connectors/flink-hbase/src/test/resources/hbase-site.xml —

          @@ -1,43 +0,0 @@
          -<?xml version="1.0"?>
          -<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
          -<!--
          -/**
          - *
          - * Licensed to the Apache Software Foundation (ASF) under one
          - * or more contributor license agreements. See the NOTICE file
          - * distributed with this work for additional information
          - * regarding copyright ownership. The ASF licenses this file
          - * to you under the Apache License, Version 2.0 (the
          - * "License"); you may not use this file except in compliance
          - * with the License. You may obtain a copy of the License at
          - *
          - *     http://www.apache.org/licenses/LICENSE-2.0
          - *
          - * Unless required by applicable law or agreed to in writing, software
          - * distributed under the License is distributed on an "AS IS" BASIS,
          - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          - * See the License for the specific language governing permissions and
          - * limitations under the License.
          - */
          --->
          -<configuration>

          — End diff –

          I'm not so familiar with HBase. Is the config no longer required for HBase 1.1.2, or why did you remove it?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r79472506

          — Diff: flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormat.java —

          @@ -0,0 +1,112 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + *     http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.addons.hbase;
          +
          +import org.apache.flink.api.common.functions.MapFunction;
          +import org.apache.flink.api.java.DataSet;
          +import org.apache.flink.api.java.ExecutionEnvironment;
          +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
          +import org.apache.flink.api.java.tuple.Tuple1;
          +import org.apache.hadoop.hbase.TableName;
          +import org.apache.hadoop.hbase.client.HTable;
          +import org.apache.hadoop.hbase.client.Put;
          +import org.apache.hadoop.hbase.client.Result;
          +import org.apache.hadoop.hbase.client.Scan;
          +import org.junit.Assert;
          +import org.junit.Before;
          +import org.junit.Test;
          +
          +import java.io.IOException;
          +import java.util.ArrayList;
          +import java.util.List;
          +
          +import static org.junit.Assert.assertTrue;
          +
          +public class TestTableInputFormat extends HBaseTestingClusterAutostarter {
          +	private static final String TEST_TABLE_NAME = "TableInputFormatTestTable";
          +	private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes();
          +	private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes();
          +
          +	// These are the row ids AND also the values we will put in the test table
          +	private static final String[] ROW_IDS = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"};
          +
          +	@Before
          +	public void createTestTable() throws IOException {
          +		TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
          +		byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), "6".getBytes(), "9".getBytes()};
          +		createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys);
          +		HTable table = openTable(tableName);
          +
          +		for (String rowId : ROW_IDS) {
          +			byte[] rowIdBytes = rowId.getBytes();
          +			Put p = new Put(rowIdBytes);
          +			// Use the rowId as the value to facilitate the testing better
          +			p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, rowIdBytes);
          +			table.put(p);
          +		}
          +
          +		table.close();
          +	}
          +
          +	class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {
          +		@Override
          +		protected Scan getScanner() {
          +			return new Scan();
          +		}
          +
          +		@Override
          +		protected String getTableName() {
          +			return TEST_TABLE_NAME;
          +		}
          +
          +		@Override
          +		protected Tuple1<String> mapResultToTuple(Result r) {
          +			return new Tuple1<>(new String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
          +		}
          +	}
          +
          +	@Test
          +	public void testTableInputFormat() {

          — End diff –

          Can we make this test a bit more lightweight and not execute a Flink program?
          Instead we could test the interface methods of the InputFormat, such as:

          • createInputSplits
          • configure
          • open
          • nextRecord
          • close

          etc.

          if you split the test into several methods, please make sure that HBase is only initialized once with `@BeforeClass`.
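
          For illustration, a hedged sketch of such an interface-level test, reusing the names from the test above and the `closeInputFormat()` added in this PR (imports omitted; `Configuration` is org.apache.flink.configuration.Configuration):

              @Test
              public void testReuseAcrossSplits() throws IOException {
                  InputFormatForTestTable format = new InputFormatForTestTable();
                  format.configure(new Configuration());

                  List<String> values = new ArrayList<>();
                  for (TableInputSplit split : format.createInputSplits(1)) {
                      format.open(split); // must succeed for every split, not just the first
                      Tuple1<String> reuse = new Tuple1<>();
                      while (!format.reachedEnd()) {
                          Tuple1<String> t = format.nextRecord(reuse);
                          if (t != null) {
                              values.add(t.f0);
                          }
                      }
                      format.close();
                  }
                  format.closeInputFormat();

                  Assert.assertEquals(ROW_IDS.length, values.size());
              }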

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r79470876

          — Diff: flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormat.java —
          @@ -0,0 +1,112 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.addons.hbase;
          +
          +import org.apache.flink.api.common.functions.MapFunction;
          +import org.apache.flink.api.java.DataSet;
          +import org.apache.flink.api.java.ExecutionEnvironment;
          +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
          +import org.apache.flink.api.java.tuple.Tuple1;
          +import org.apache.hadoop.hbase.TableName;
          +import org.apache.hadoop.hbase.client.HTable;
          +import org.apache.hadoop.hbase.client.Put;
          +import org.apache.hadoop.hbase.client.Result;
          +import org.apache.hadoop.hbase.client.Scan;
          +import org.junit.Assert;
          +import org.junit.Before;
          +import org.junit.Test;
          +
          +import java.io.IOException;
          +import java.util.ArrayList;
          +import java.util.List;
          +
          +import static org.junit.Assert.assertTrue;
          +
          +public class TestTableInputFormat extends HBaseTestingClusterAutostarter {
          — End diff –

          Long-running integration tests have to follow the `*ITCase` naming pattern; this will cause them to be executed in Maven's verify phase.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r79469241

          — Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java —

          @@ -93,32 +102,45 @@ private HTable createTable() {
           	}

           	@Override
          +	public void open(TableInputSplit split) throws IOException {
          +		if (split == null) {
          +			throw new IOException("Input split is null!");
          +		}
          +
          +		logSplitInfo("opening", split);
          +		scan.setStartRow(split.getStartRow());
          +		lastRow = split.getEndRow();
          +		scan.setStopRow(lastRow);
          +
          +		resultScanner = table.getScanner(scan);
          +		endReached = false;
          +		scannedRows = 0;
          +	}
          +
          +	@Override
           	public boolean reachedEnd() throws IOException {
           		return this.endReached;
           	}

           	@Override
           	public T nextRecord(T reuse) throws IOException {
          -		if (this.rs == null){
          +		if (this.resultScanner == null) {
           			throw new IOException("No table result scanner provided!");
           		}
           		try{

          — End diff –

          insert space

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r79477173

          — Diff: flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties —

          @@ -15,9 +15,16 @@
           # specific language governing permissions and limitations
           # under the License.

          -log4j.rootLogger=${hadoop.root.logger}
          -hadoop.root.logger=INFO,console
          -log4j.appender.console=org.apache.log4j.ConsoleAppender
          -log4j.appender.console.target=System.err
          -log4j.appender.console.layout=org.apache.log4j.PatternLayout
          -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
          +log4j.rootLogger=DEBUG, stdout, file
          +log4j.appender.stdout=org.apache.log4j.ConsoleAppender
          +log4j.appender.stdout.Target=System.out
          +log4j.appender.stdout.threshold=INFO
          +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
          +log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %-5p %30c{1}:%4L - %m%n
          +## file appender

          — End diff –

          Can you remove the file appender configuration? It creates a 3.5 MB file and makes the test heavier. We are suffering from long build times and try to keep new tests as lightweight as possible. Thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r79469197

          — Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java —

          @@ -93,32 +102,45 @@ private HTable createTable() {
           	}

           	@Override
          +	public void open(TableInputSplit split) throws IOException {
          +		if (split == null) {
          +			throw new IOException("Input split is null!");
          +		}
          +
          +		logSplitInfo("opening", split);
          +		scan.setStartRow(split.getStartRow());
          +		lastRow = split.getEndRow();
          +		scan.setStopRow(lastRow);
          +
          +		resultScanner = table.getScanner(scan);
          +		endReached = false;
          +		scannedRows = 0;
          +	}
          +
          +	@Override
           	public boolean reachedEnd() throws IOException {
           		return this.endReached;
           	}

           	@Override
           	public T nextRecord(T reuse) throws IOException {
          -		if (this.rs == null){
          +		if (this.resultScanner == null){

          — End diff –

          add a space before `{`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r79791053

          — Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java —
          @@ -67,18 +66,23 @@
          protected abstract T mapResultToTuple(Result r);
          — End diff –

          Can we remove the confusing comment `"abstract methods allow for multiple table and scanners in the same job"` and add JavaDocs to all abstract methods that describe what is expected from their implementation?

          A `TableInputFormat` instance should only scan a single table. In case more tables need to be read, each could be read with a separate `TableInputFormat` instance and the output of those could be unioned if needed.
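
          A hedged sketch of that pattern, with hypothetical `OrdersInputFormat` and `InvoicesInputFormat` subclasses reading one table each:

              ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

              // one TableInputFormat instance per HBase table...
              DataSet<Tuple1<String>> orders = env.createInput(new OrdersInputFormat());
              DataSet<Tuple1<String>> invoices = env.createInput(new InvoicesInputFormat());

              // ...unioned when the rows need to be processed together
              DataSet<Tuple1<String>> all = orders.union(invoices);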

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r79791290

          — Diff: flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormatITCase.java —
          @@ -0,0 +1,112 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.addons.hbase;
          +
          +import org.apache.flink.api.common.functions.MapFunction;
          +import org.apache.flink.api.java.DataSet;
          +import org.apache.flink.api.java.ExecutionEnvironment;
          +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
          +import org.apache.flink.api.java.tuple.Tuple1;
          +import org.apache.hadoop.hbase.TableName;
          +import org.apache.hadoop.hbase.client.HTable;
          +import org.apache.hadoop.hbase.client.Put;
          +import org.apache.hadoop.hbase.client.Result;
          +import org.apache.hadoop.hbase.client.Scan;
          +import org.junit.Assert;
          +import org.junit.Before;
          +import org.junit.Test;
          +
          +import java.io.IOException;
          +import java.util.ArrayList;
          +import java.util.List;
          +
          +import static org.junit.Assert.assertTrue;
          +
          +public class TestTableInputFormatITCase extends HBaseTestingClusterAutostarter {
          — End diff –

          Please rename to `TableInputFormatITCase`.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r79791682

          — Diff: flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormatITCase.java —
          @@ -0,0 +1,112 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.addons.hbase;
          +
          +import org.apache.flink.api.common.functions.MapFunction;
          +import org.apache.flink.api.java.DataSet;
          +import org.apache.flink.api.java.ExecutionEnvironment;
          +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
          +import org.apache.flink.api.java.tuple.Tuple1;
          +import org.apache.hadoop.hbase.TableName;
          +import org.apache.hadoop.hbase.client.HTable;
          +import org.apache.hadoop.hbase.client.Put;
          +import org.apache.hadoop.hbase.client.Result;
          +import org.apache.hadoop.hbase.client.Scan;
          +import org.junit.Assert;
          +import org.junit.Before;
          +import org.junit.Test;
          +
          +import java.io.IOException;
          +import java.util.ArrayList;
          +import java.util.List;
          +
          +import static org.junit.Assert.assertTrue;
          +
          +public class TestTableInputFormatITCase extends HBaseTestingClusterAutostarter {
+ private static final String TEST_TABLE_NAME = "TableInputFormatTestTable";
+ private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes();
+ private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes();
+
+ // These are the row ids AND also the values we will put in the test table
+ private static final String[] ROW_IDS = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"};
+
+ @Before
+ public void createTestTable() throws IOException {
+ TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
+ byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), "6".getBytes(), "9".getBytes()};
+ createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys);
+ HTable table = openTable(tableName);
+
+ for (String rowId : ROW_IDS) {
+ byte[] rowIdBytes = rowId.getBytes();
+ Put p = new Put(rowIdBytes);
+ // Use the rowId as the value to facilitate the testing better
+ p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, rowIdBytes);
+ table.put(p);
+ }
+
+ table.close();
+ }
+
+ class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {
+ @Override
+ protected Scan getScanner() {
+ return new Scan();
+ }
+
+ @Override
+ protected String getTableName() {
+ return TEST_TABLE_NAME;
+ }
+
+ @Override
+ protected Tuple1<String> mapResultToTuple(Result r) {
+ return new Tuple1<>(new String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
+ }
+ }
+
+ @Test
+ public void testTableInputFormat() {
+ ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+ environment.setParallelism(1);
+
+ DataSet<String> resultDataSet =
+ environment.createInput(new InputFormatForTestTable()).map(new MapFunction<Tuple1<String>, String>() {
+ @Override
+ public String map(Tuple1<String> value) throws Exception {
+ return value.f0;
+ }
+ });
+
+ List<String> resultSet = new ArrayList<>();
+ resultDataSet.output(new LocalCollectionOutputFormat<>(resultSet));
+
+ try {
+ environment.execute("HBase InputFormat Test");
+ } catch (Exception e) {
+ Assert.fail("HBase InputFormat test failed. " + e.getMessage());
+ }
+
+ for (String rowId : ROW_IDS) {
          — End diff –

          Please add a check that `ROW_IDS` and `resultSet` have the same size to ensure that each record is read exactly once.
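
For example, a minimal sketch of such a check, using the `Assert` import already present in the test (the message wording is just a suggestion):

// Each of the ROW_IDS must be read exactly once, so the sizes must match.
Assert.assertEquals("Unexpected number of records read",
	ROW_IDS.length, resultSet.size());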

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r79791990

          — Diff: flink-batch-connectors/flink-hbase/src/test/resources/hbase-site.xml —
          @@ -1,43 +0,0 @@
          -<?xml version="1.0"?>
          -<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<!--
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
--->
-<configuration>
— End diff –

          I see, thanks.
          I think so far this file has been used as a template. Not sure how valuable it is to have.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r79791171

          — Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java —
          @@ -67,18 +66,23 @@
          protected abstract T mapResultToTuple(Result r);

/**
- * creates a {@link Scan} object and a {@link HTable} connection
+ * Creates a {@link Scan} object and opens the {@link HTable} connection.
+ * These are opened here because they are needed in the createInputSplits
+ * which is called before the openInputFormat method.
+ * So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}.
 *
- * @param parameters
+ * @param parameters The configuration that is to be used
 * @see Configuration
 */
@Override
public void configure(Configuration parameters) {
- this.table = createTable();
- this.scan = getScanner();
+ table = createTable();
+ scan = getScanner();
— End diff –

          Can you add a check that `table` and `scan` are properly initialized, i.e., `!= null`?

          githubbot ASF GitHub Bot added a comment -

          Github user nielsbasjes commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r79816123

          — Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java —
          @@ -67,18 +66,23 @@
          protected abstract T mapResultToTuple(Result r);

/**
- * creates a {@link Scan} object and a {@link HTable} connection
+ * Creates a {@link Scan} object and opens the {@link HTable} connection.
+ * These are opened here because they are needed in the createInputSplits
+ * which is called before the openInputFormat method.
+ * So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}.
 *
- * @param parameters
+ * @param parameters The configuration that is to be used
 * @see Configuration
 */
@Override
public void configure(Configuration parameters) {
- this.table = createTable();
- this.scan = getScanner();
+ table = createTable();
+ scan = getScanner();
— End diff –

Done. Yet because `configure()` doesn't have a way to fail nicely, I added those checks to the other methods.
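
A minimal sketch of that pattern, assuming the field names from the diff above (where exactly the guards ended up in the merged code may differ):

// configure() cannot throw a checked exception, so the guards live in
// methods that declare IOException, such as open() and createInputSplits().
@Override
public void open(TableInputSplit split) throws IOException {
	if (table == null) {
		throw new IOException("The HBase table has not been opened! Call configure() first.");
	}
	if (scan == null) {
		throw new IOException("getScanner() returned null! Call configure() first.");
	}
	// ... continue with the scanner setup shown earlier
}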

          githubbot ASF GitHub Bot added a comment -

          Github user nielsbasjes commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r79816134

          — Diff: flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormatITCase.java —
          @@ -0,0 +1,112 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.addons.hbase;
          +
          +import org.apache.flink.api.common.functions.MapFunction;
          +import org.apache.flink.api.java.DataSet;
          +import org.apache.flink.api.java.ExecutionEnvironment;
          +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
          +import org.apache.flink.api.java.tuple.Tuple1;
          +import org.apache.hadoop.hbase.TableName;
          +import org.apache.hadoop.hbase.client.HTable;
          +import org.apache.hadoop.hbase.client.Put;
          +import org.apache.hadoop.hbase.client.Result;
          +import org.apache.hadoop.hbase.client.Scan;
          +import org.junit.Assert;
          +import org.junit.Before;
          +import org.junit.Test;
          +
          +import java.io.IOException;
          +import java.util.ArrayList;
          +import java.util.List;
          +
          +import static org.junit.Assert.assertTrue;
          +
          +public class TestTableInputFormatITCase extends HBaseTestingClusterAutostarter {
          — End diff –

          Done

          githubbot ASF GitHub Bot added a comment -

          Github user nielsbasjes commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2330#discussion_r79816145

          — Diff: flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormatITCase.java —
          @@ -0,0 +1,112 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.addons.hbase;
          +
          +import org.apache.flink.api.common.functions.MapFunction;
          +import org.apache.flink.api.java.DataSet;
          +import org.apache.flink.api.java.ExecutionEnvironment;
          +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
          +import org.apache.flink.api.java.tuple.Tuple1;
          +import org.apache.hadoop.hbase.TableName;
          +import org.apache.hadoop.hbase.client.HTable;
          +import org.apache.hadoop.hbase.client.Put;
          +import org.apache.hadoop.hbase.client.Result;
          +import org.apache.hadoop.hbase.client.Scan;
          +import org.junit.Assert;
          +import org.junit.Before;
          +import org.junit.Test;
          +
          +import java.io.IOException;
          +import java.util.ArrayList;
          +import java.util.List;
          +
          +import static org.junit.Assert.assertTrue;
          +
          +public class TestTableInputFormatITCase extends HBaseTestingClusterAutostarter {
+ private static final String TEST_TABLE_NAME = "TableInputFormatTestTable";
+ private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes();
+ private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes();
+
+ // These are the row ids AND also the values we will put in the test table
+ private static final String[] ROW_IDS = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"};
+
+ @Before
+ public void createTestTable() throws IOException {
+ TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
+ byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), "6".getBytes(), "9".getBytes()};
+ createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys);
+ HTable table = openTable(tableName);
+
+ for (String rowId : ROW_IDS) {
+ byte[] rowIdBytes = rowId.getBytes();
+ Put p = new Put(rowIdBytes);
+ // Use the rowId as the value to facilitate the testing better
+ p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, rowIdBytes);
+ table.put(p);
+ }
+
+ table.close();
+ }
+
+ class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {
+ @Override
+ protected Scan getScanner() {
+ return new Scan();
+ }
+
+ @Override
+ protected String getTableName() {
+ return TEST_TABLE_NAME;
+ }
+
+ @Override
+ protected Tuple1<String> mapResultToTuple(Result r) {
+ return new Tuple1<>(new String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
+ }
+ }
+
+ @Test
+ public void testTableInputFormat() {
+ ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+ environment.setParallelism(1);
+
+ DataSet<String> resultDataSet =
+ environment.createInput(new InputFormatForTestTable()).map(new MapFunction<Tuple1<String>, String>() {
+ @Override
+ public String map(Tuple1<String> value) throws Exception {
+ return value.f0;
+ }
+ });
+
+ List<String> resultSet = new ArrayList<>();
+ resultDataSet.output(new LocalCollectionOutputFormat<>(resultSet));
+
+ try {
+ environment.execute("HBase InputFormat Test");
+ } catch (Exception e) {
+ Assert.fail("HBase InputFormat test failed. " + e.getMessage());
+ }
+
+ for (String rowId : ROW_IDS) {
          — End diff –

          Done

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2330

          Hi @nielsbasjes, thanks for fixing and cleaning up the `TableInputFormat`.
          This PR is good to merge.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2330

Hi @nielsbasjes, I just posted to the dev mailing list and proposed to update the HBase dependency to 1.2.3 (as FLINK-2765 (https://issues.apache.org/jira/browse/FLINK-2765) suggests). By the end of the week we should have a decision, and I will merge this PR to the master branch.

In the meantime, I will merge the fixed TableInputFormat changes to the Flink 1.1 branch and revert all breaking changes (pom.xml, RichInputFormat, hbase-site.xml, tests, ...).
For Flink 1.2.0 we want to keep these changes.

          Thanks, Fabian

          fhueske Fabian Hueske added a comment -

          Merged for Flink 1.1 as 98b399d4b4ddc9ab5d01e40dcb9ab0889f0d1067

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2330

I noticed that building this PR for hadoop1 (`mvn clean install -Dhadoop.profile=1`) fails:

          > The following artifacts could not be resolved: org.apache.hadoop:hadoop-hdfs:jar:tests:1.2.1, org.apache.hbase:hbase-hadoop2-compat:jar:tests:0.98.11-hadoop1: Could not find artifact org.apache.hadoop:hadoop-hdfs:jar:tests:1.2.1 in central

          I'm not a Maven guru. Is it possible to disable compiling and executing the tests for the hadoop1 profile?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2330

          I'll propose to drop the hadoop1 builds on the dev ML.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2330

          I disabled tests for the hadoop1 profile.
          Will build the PR one more time and merge if everything passes.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2330

          Merging

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2330

          fhueske Fabian Hueske added a comment -

          Fixed for 1.2.0 with 3f8727921e944d1d89714f5885c2de63681d51b2

          Thanks for the fix Niels Basjes!


People

• Assignee:
  nielsbasjes Niels Basjes
• Reporter:
  nielsbasjes Niels Basjes
• Votes:
  0
• Watchers:
  3
