From f10363b3166cc17b05993ad356abfc87068b97e0 Mon Sep 17 00:00:00 2001 From: Mikhail Bautin Date: Thu, 20 Sep 2012 19:46:54 +0000 Subject: [PATCH 2/2] [jira] [HBASE-6732] [89-fb] Reduce scope of synchronized block in HBaseClient.Connection#sendParam Author: michalgr Summary: sendParam method synchronizes on out stream (so that only one thread writes to socket). Right now it prepares (eg. compress) message under this lock as well. Lock should be taken only for sending. Test Plan: Run unit tests in map reduce. Some tests failed. Some of them failed when repeated on my computer (TestLogSplitOnMasterFailover, TestDistributedLogSplitting). These tests failed when tested against trunk as well. Reviewers: mbautin, kranganathan Reviewed By: kranganathan CC: Karthik, stack, Kannan, Liyin Differential Revision: https://reviews.facebook.net/D5259 git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/0.89-fb@1388181 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/hadoop/hbase/ipc/HBaseClient.java | 83 ++++++++++---------- 1 files changed, 41 insertions(+), 42 deletions(-) diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java index 90ea2b4..7d23f0e 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java @@ -510,55 +510,54 @@ public class HBaseClient { DataOutputStream outOS = null; Compressor compressor = null; try { - //noinspection SynchronizeOnNonFinalField - synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC - if (LOG.isDebugEnabled()) - LOG.debug(getName() + " sending #" + call.id); + if (LOG.isDebugEnabled()) + LOG.debug(getName() + " sending #" + call.id); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - uncompressedOS = new DataOutputStream(baos); - outOS = uncompressedOS; - try { - // 1. write the call id uncompressed - uncompressedOS.writeInt(call.id); - // 2. write RPC options uncompressed - if (call.version >= HBaseServer.VERSION_RPCOPTIONS) { - call.options.write(outOS); - } - // preserve backwards compatibility - if (call.options.getTxCompression() != Compression.Algorithm.NONE) { - // 3. setup the compressor - compressor = call.options.getTxCompression().getCompressor(); - OutputStream compressedOutputStream = + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + uncompressedOS = new DataOutputStream(baos); + outOS = uncompressedOS; + try { + // 1. write the call id uncompressed + uncompressedOS.writeInt(call.id); + // 2. write RPC options uncompressed + if (call.version >= HBaseServer.VERSION_RPCOPTIONS) { + call.options.write(outOS); + } + // preserve backwards compatibility + if (call.options.getTxCompression() != Compression.Algorithm.NONE) { + // 3. setup the compressor + compressor = call.options.getTxCompression().getCompressor(); + OutputStream compressedOutputStream = call.options.getTxCompression().createCompressionStream( - uncompressedOS, compressor, 0); - outOS = new DataOutputStream(compressedOutputStream); - } - // 4. write the output params with the correct compression type - call.param.write(outOS); - outOS.flush(); - baos.flush(); - call.startTime = System.currentTimeMillis(); - } catch (IOException e) { - LOG.error("Failed to prepare request in in-mem buffers!", e); - markClosed(e); + uncompressedOS, compressor, 0); + outOS = new DataOutputStream(compressedOutputStream); } - byte[] data = baos.toByteArray(); - int dataLength = data.length; - try { + // 4. write the output params with the correct compression type + call.param.write(outOS); + outOS.flush(); + baos.flush(); + call.startTime = System.currentTimeMillis(); + } catch (IOException e) { + LOG.error("Failed to prepare request in in-mem buffers!", e); + markClosed(e); + } + byte[] data = baos.toByteArray(); + int dataLength = data.length; + try { + synchronized (this.out) { out.writeInt(dataLength); //first put the data length writeToSocket(out, data, 0, dataLength); out.flush(); - } catch (IOException e) { - // It is not easy to get an exception here. - // The read is what always fails. Write gets accepted into - // the socket buffer. If the connection is already dead, even - // then read gets called first and fails first. - IOException rewrittenException = - new SyncFailedException("Failed to write to peer"); - rewrittenException.initCause(e); - markClosed(rewrittenException); } + } catch (IOException e) { + // It is not easy to get an exception here. + // The read is what always fails. Write gets accepted into + // the socket buffer. If the connection is already dead, even + // then read gets called first and fails first. + IOException rewrittenException = + new SyncFailedException("Failed to write to peer"); + rewrittenException.initCause(e); + markClosed(rewrittenException); } } finally { //the buffer is just an in-memory buffer, but it is still polite to -- 1.7.0.4