/**
 * [[CvHttpClient]] implementation backed by a pooled Apache `CloseableHttpClient`.
 *
 * Requests are resolved against `cfg.urlRoot`, executed with the credentials from `cfg`
 * (optionally pre-emptively, see `cfg.preemptiveAuthentication`) and timed under a probe named
 * after the client and the HTTP method. Failures raised while executing a request are not
 * propagated: they are logged and returned as an `ExceptionResult`.
 *
 * The instance owns its connection pool and HTTP client; both are released in [[destroy]].
 *
 * @param cfg    client configuration (URL root, pool size, timeouts, credentials, ...);
 *               validated during construction via `cfg.validate`
 * @param probes probes handed to the `Probing` mix-in
 */
class CvHttpClientImpl(
  val cfg: CvHttpClientImplConfiguration,
  val probes: Probes
) extends CvHttpClient with DisposableBean with Probing {

  import CvHttpClientImpl._

  /**
   * Executes `httpRequest` against `cfg.urlRoot` and returns the outcome.
   *
   * @param httpRequest request description; `Get`, `Post`, `Put` and `Delete` are supported, and
   *                    requests that are also `Headers` have their headers added
   * @return the `HttpResponse` built by the response handler, or an `ExceptionResult` wrapping
   *         any `Exception` raised while executing the request
   */
  def execute(httpRequest: HttpRequest): Result = {
    val httpUriRequest = buildRequest(httpRequest)

    httpRequest match {
      case withHeaders: Headers =>
        for ((k, v) <- withHeaders.headers) httpUriRequest.addHeader(k, v)
      case _ =>
    }

    val localContext = buildContext(httpUriRequest)

    time("cvhttpclient-%s-%s".format(cfg.httpClientName, httpUriRequest.getMethod).toLowerCase) {
      try {
        trace("[%s] Executing HTTP %s %s".format(cfg.httpClientName, httpUriRequest.getMethod, httpUriRequest.getURI))
        httpUriRequest.setParams(httpParams)
        httpClient.execute(httpUriRequest, responseHandler, localContext)
      } catch {
        case ex: Exception =>
          warn("[%s] Executing HTTP %s %s failed with exception".format(cfg.httpClientName, httpUriRequest.getMethod, httpUriRequest.getURI), ex)
          ExceptionResult(ex)
      }
    }
  }

  /** Translates the request description into the matching Apache request for `cfg.urlRoot + url`. */
  private def buildRequest(httpRequest: HttpRequest): org.apache.http.client.methods.HttpUriRequest = {
    val url = cfg.urlRoot.toString + httpRequest.url
    httpRequest match {
      case _: Get =>
        new HttpGet(url)
      case post: Post =>
        val httpPost = new HttpPost(url)
        // The entity is only set when the request carries a body.
        for (body <- post.body) httpPost.setEntity(new StringEntity(body, post.mimeType.orNull, null))
        httpPost
      case put: Put =>
        val httpPut = new HttpPut(url)
        for (body <- put.body) httpPut.setEntity(new StringEntity(body, put.mimeType.orNull, null))
        httpPut
      case _: Delete =>
        new HttpDelete(url)
    }
  }

  /**
   * Creates the per-request context. When `cfg.preemptiveAuthentication` is set, the target host
   * is registered in an auth cache with a BASIC scheme so that credentials are sent pre-emptively.
   */
  private def buildContext(request: org.apache.http.client.methods.HttpUriRequest): BasicHttpContext = {
    val context = new BasicHttpContext
    if (cfg.preemptiveAuthentication) {
      val uri = request.getURI
      val basicAuthCache = new BasicAuthCache
      basicAuthCache.put(new HttpHost(uri.getHost, uri.getPort, uri.getScheme), new BasicScheme)
      context.setAttribute(ClientContext.AUTH_CACHE, basicAuthCache)
    }
    context
  }

  /** Converts the raw Apache response into the project's `HttpResponse`. */
  private val responseHandler = new ResponseHandler[HttpResponse] {
    def handleResponse(response: org.apache.http.HttpResponse): HttpResponse = {
      val statusLine = response.getStatusLine
      // Body-less responses (e.g. 204 No Content) have a null entity, which EntityUtils.toString
      // rejects; report an empty body instead of failing the whole request.
      val body = Option(response.getEntity)
        .map(entity => EntityUtils.toString(entity))
        .getOrElse("")
      // When a header name occurs more than once, the last value wins.
      val headers = response.getAllHeaders.map(h => h.getName -> h.getValue).toMap
      HttpResponse(statusLine.getStatusCode, statusLine.getReasonPhrase, body, headers)
    }
  }

  // Validate the configuration before any pool or client is created from it.
  cfg.validate

  private val clientConnectionManager = {
    val manager = new PoolingHttpClientConnectionManager
    // Per-route limit equals the total limit.
    manager.setMaxTotal(cfg.connectionPoolSize)
    manager.setDefaultMaxPerRoute(cfg.connectionPoolSize)
    manager
  }

  /** Parameters applied to every outgoing request (user agent and timeouts). */
  private val httpParams = {
    import org.apache.http.params.CoreConnectionPNames._
    import org.apache.http.params.CoreProtocolPNames._
    val params = new BasicHttpParams
    params.setParameter(USER_AGENT, "CLEARVISION HTTP CLIENT [%s]".format(cfg.httpClientName))
    params.setParameter(SO_TIMEOUT, cfg.socketTimeout)
    params.setParameter(CONNECTION_TIMEOUT, cfg.connectionTimeout)
    DefaultHttpClient.setDefaultHttpParams(params)
    params
  }

  // Retry count 1; second argument (requestSentRetryEnabled) is false.
  private val httpRequestRetryHandler = new DefaultHttpRequestRetryHandler(1, false)

  private val basicCredentialsProvider = {
    val provider = new BasicCredentialsProvider
    provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(cfg.username, cfg.password))
    provider
  }

  private val httpClient = {
    info("[%s] Start".format(cfg.httpClientName))
    val client: CloseableHttpClient = HttpClients.custom()
      .setConnectionManager(clientConnectionManager)
      .setRetryHandler(httpRequestRetryHandler)
      .setDefaultCredentialsProvider(basicCredentialsProvider)
      // Both eviction options start a background thread that is only stopped by client.close().
      .evictExpiredConnections()
      .evictIdleConnections(5, TimeUnit.SECONDS)
      .build()
    client
  }

  /**
   * Releases the HTTP client and its connection pool.
   *
   * Closing the client stops the background connection-evictor thread started by the eviction
   * options used when building it; the pool is shut down afterwards even if closing the client
   * fails.
   */
  def destroy(): Unit = {
    info("[%s] Shutdown".format(cfg.httpClientName))
    try {
      httpClient.close()
    } finally {
      clientConnectionManager.shutdown()
    }
  }
}

/** Companion holding the logger for [[CvHttpClientImpl]]; its members are imported by the class. */
object CvHttpClientImpl extends LazyLogging {
  val logger = Logger.getLogger(classOf[CvHttpClientImpl])
}