  1. Spark
  2. SPARK-17544

Timeout waiting for connection from pool, DataFrame Reader's not closing S3 connections?


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Invalid
    • Affects Version/s: 2.0.0
    • Fix Version/s: None
    • Component/s: Input/Output
    • Environment: Amazon EMR, S3, Scala

    Description

      I have an application that loops through a text file to find files in S3, reads them in, performs some ETL processes, and then writes them out. This works for around 80 iterations, and then I get this:

      16/09/14 18:58:23 INFO S3NativeFileSystem: Opening 's3://webpt-emr-us-west-2-hadoop/edw/master/fdbk_ideavote/20160907/part-m-00000.avro' for reading
      16/09/14 18:59:13 INFO AmazonHttpClient: Unable to execute HTTP request: Timeout waiting for connection from pool
      com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
      	at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:226)
      	at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:195)
      	at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
      	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy37.getConnection(Unknown Source)
      	at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:423)
      	at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
      	at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
      	at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
      	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
      	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
      	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
      	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
      	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
      	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
      	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
      	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:991)
      	at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:212)
      	at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
      	at com.sun.proxy.$Proxy36.retrieveMetadata(Unknown Source)
      	at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:780)
      	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1428)
      	at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.exists(EmrFileSystem.java:313)
      	at org.apache.spark.sql.execution.datasources.DataSource.hasMetadata(DataSource.scala:289)
      	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:324)
      	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$18.apply(DataSource.scala:452)
      	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$18.apply(DataSource.scala:458)
      	at scala.util.Try$.apply(Try.scala:192)
      	at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:451)
      	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
      	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
      	at com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:26)
      	at com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:26)
      	at com.webpt.SparkMaxwell$$anonfun$main$1.apply(SparkMaxwell.scala:116)
      	at com.webpt.SparkMaxwell$$anonfun$main$1.apply(SparkMaxwell.scala:105)
      	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
      	at com.webpt.SparkMaxwell$.main(SparkMaxwell.scala:105)
      	at com.webpt.SparkMaxwell.main(SparkMaxwell.scala)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
      	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
      	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
      	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
      	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)  
      

      I don't use Amazon's API directly; I only use DataFrameReaders and DataFrameWriters. I suspect that previous connections aren't being closed, so after a fixed number of iterations the connection pool is exhausted.
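
To make the suspicion concrete, here is a toy model of a bounded pool whose leases are never returned. It is only a sketch under stated assumptions and does not reproduce Spark's or EMRFS's actual connection handling: `ToyPool`, `firstTimeout`, the pool size of 80, and the 10 ms wait are all hypothetical, chosen to echo the "around 80" figure in the report. Since the issue was resolved as Invalid, it should not be read as a confirmed diagnosis.

```scala
import java.util.concurrent.{Semaphore, TimeUnit}

// Toy stand-in for an HTTP connection pool (hypothetical, not the EMRFS pool):
// a fixed number of permits, with a bounded wait when none are free.
final class ToyPool(size: Int, waitMillis: Long) {
  private val permits = new Semaphore(size)

  // Returns true if a connection was leased within the wait window.
  def lease(): Boolean = permits.tryAcquire(waitMillis, TimeUnit.MILLISECONDS)

  // Hands a leased connection back to the pool.
  def release(): Unit = permits.release()
}

object PoolExhaustionSketch {
  // Performs up to `iterations` leases and returns the 1-based iteration at
  // which a lease first timed out, or None if every lease succeeded.
  def firstTimeout(pool: ToyPool, iterations: Int, closeAfterUse: Boolean): Option[Int] =
    (1 to iterations).find { _ =>
      val leased = pool.lease()
      if (leased && closeAfterUse) pool.release()
      !leased
    }

  def main(args: Array[String]): Unit = {
    // Assumed numbers for illustration only.
    val leaky = firstTimeout(new ToyPool(80, 10L), 100, closeAfterUse = false)
    val tidy  = firstTimeout(new ToyPool(80, 10L), 100, closeAfterUse = true)
    println(s"leaky: $leaky, tidy: $tidy") // prints "leaky: Some(81), tidy: None"
  }
}
```

In this model the loop fails on the first iteration after the pool size is reached, which is the "works for N loops, then times out" pattern one would expect if leases were leaking. Whether that is what actually happens in the reported job is not established here.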

          People

            Assignee: Unassigned
            Reporter: Brady Auen
            Votes: 0
            Watchers: 3
