Flink / FLINK-4311

TableInputFormat fails when reused on next split


Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.0.3
    • Fix Version/s: 1.1.3, 1.2.0
    • None
    • None

    Description

      We have written a batch job that reads data from HBase using the TableInputFormat.

      We have found that this class sometimes fails with this exception:

      java.lang.RuntimeException: java.util.concurrent.RejectedExecutionException: Task org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
      at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:208)
      at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
      at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295)
      at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160)
      at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:155)
      at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821)
      at org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:152)
      at org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:47)
      at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: java.util.concurrent.RejectedExecutionException: Task org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
      at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
      at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
      at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
      at org.apache.hadoop.hbase.client.ResultBoundedCompletionService.submit(ResultBoundedCompletionService.java:142)
      at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.addCallsForCurrentReplica(ScannerCallableWithReplicas.java:269)
      at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:165)
      at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59)
      at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
      ... 10 more

      As the trace shows, the ThreadPoolExecutor had already been terminated at this point.

      We tracked it down to the following sequence:

      1. the configure method opens the table
      2. the open method obtains the result scanner
      3. the close method closes the table.

      If a second split arrives on the same instance then the open method will fail because the table has already been closed.
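
The lifecycle problem described above can be reproduced without HBase. The following is a minimal sketch, not the actual TableInputFormat code and not necessarily how the issue was fixed in Flink. The names SplitReuseSketch, SplitFormat, BrokenFormat, ReopenableFormat and runSplits are hypothetical, and a plain ExecutorService stands in for the HBase table and its internal thread pool. It assumes the runtime calls configure once and then open/close once per split, as the report describes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class SplitReuseSketch {

    /** Hypothetical stand-in for the input format lifecycle; not Flink's interface. */
    interface SplitFormat {
        void configure();
        void open(int split) throws Exception;
        void close();
    }

    /** Mirrors the reported pattern: resource acquired in configure(), released in close(). */
    static class BrokenFormat implements SplitFormat {
        private ExecutorService pool; // stands in for the HBase table and its thread pool

        public void configure() {
            pool = Executors.newSingleThreadExecutor();
        }

        public void open(int split) throws Exception {
            // For the second split the pool is already shut down, so submit() is rejected.
            pool.submit(() -> split).get();
        }

        public void close() {
            pool.shutdown();
        }
    }

    /** One possible remedy: acquire the resource in open() so every split gets a fresh one. */
    static class ReopenableFormat implements SplitFormat {
        private ExecutorService pool;

        public void configure() {
            // Only configuration would be stored here; nothing that close() tears down.
        }

        public void open(int split) throws Exception {
            pool = Executors.newSingleThreadExecutor();
            pool.submit(() -> split).get();
        }

        public void close() {
            if (pool != null) {
                pool.shutdown();
                pool = null;
            }
        }
    }

    /** Drives one instance through several splits and counts how many opened successfully. */
    static int runSplits(SplitFormat format, int splits) throws Exception {
        format.configure();
        int opened = 0;
        for (int i = 0; i < splits; i++) {
            try {
                format.open(i);
                opened++;
            } catch (RejectedExecutionException e) {
                break; // same exception type as in the stack trace above
            } finally {
                format.close();
            }
        }
        return opened;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("broken: " + runSplits(new BrokenFormat(), 2));
        System.out.println("reopenable: " + runSplits(new ReopenableFormat(), 2));
    }
}
```

In this sketch the broken variant opens only the first of two splits, while the reopenable variant opens both. Moving the resource acquisition into open is just one way to satisfy the reopen requirement; keeping the resource alive across splits and releasing it elsewhere would be another.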

      We also found that the exact error depends on the HBase version in use. I have also seen this exception:

      Caused by: java.io.IOException: hconnection-0x19d37183 closed
      at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1146)
      at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:300)
      ... 37 more

      The documentation of the InputFormat interface clearly states:

      IMPORTANT NOTE: Input formats must be written such that an instance can be opened again after it was closed. That is due to the fact that the input format is used for potentially multiple splits. After a split is done, the format's close function is invoked and, if another split is available, the open function is invoked afterwards for the next split.

      It appears that this specific InputFormat has not been checked against this constraint.

          People

            Assignee: Niels Basjes (nielsbasjes)
            Reporter: Niels Basjes (nielsbasjes)