diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java new file mode 100644 index 0000000..35ba064 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java @@ -0,0 +1,48 @@ +package org.apache.kafka.clients; + +import org.apache.kafka.common.requests.RequestSend; + +/** + * A request that hasn't been fully processed yet + */ +public final class ClientRequest { + private final long createdMs; + private final boolean expectResponse; + private final RequestSend request; + private final Object attachment; + + /** + * @param createdMs The unix timestamp in milliseconds for the time at which this request was created. + * @param expectResponse Should we expect a response message or is this request complete once it is sent? + * @param request The request + * @param attachment Associated data with the request + */ + public ClientRequest(long createdMs, boolean expectResponse, RequestSend request, Object attachment) { + this.createdMs = createdMs; + this.attachment = attachment; + this.request = request; + this.expectResponse = expectResponse; + } + + @Override + public String toString() { + return "ClientRequest(expectResponse=" + expectResponse + ", attachment=" + attachment + ", request=" + request + ")"; + } + + public boolean expectResponse() { + return expectResponse; + } + + public RequestSend request() { + return request; + } + + public Object attachment() { + return attachment; + } + + public long createdTime() { + return createdMs; + } + +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java new file mode 100644 index 0000000..3d082ba --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java @@ -0,0 +1,44 @@ +package org.apache.kafka.clients; + +import org.apache.kafka.common.protocol.types.Struct; + +public class ClientResponse { + + private final long received; + private final boolean disconnected; + private final ClientRequest request; + private final Struct responseBody; + + public ClientResponse(ClientRequest request, long received, boolean disconnected, Struct responseBody) { + super(); + this.received = received; + this.disconnected = disconnected; + this.request = request; + this.responseBody = responseBody; + } + + public long receivedTime() { + return received; + } + + public boolean wasDisconnected() { + return disconnected; + } + + public ClientRequest request() { + return request; + } + + public Struct responseBody() { + return responseBody; + } + + public boolean hasResponse() { + return responseBody != null; + } + + public long requestLatencyMs() { + return receivedTime() - this.request.createdTime(); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java new file mode 100644 index 0000000..dfa1fab --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -0,0 +1,88 @@ +package org.apache.kafka.clients; + +import java.util.HashMap; +import java.util.Map; + +/** + * The state of our connection to each node in the cluster.
+ * + */ +final class ClusterConnectionStates { + private final long reconnectBackoffMs; + private final Map<Integer, NodeConnectionState> nodeState; + + public ClusterConnectionStates(long reconnectBackoffMs) { + this.reconnectBackoffMs = reconnectBackoffMs; + this.nodeState = new HashMap<Integer, NodeConnectionState>(); + } + + /** + * Return true iff we can currently initiate a new connection to the given node. This will be the case if we are not + * connected and haven't attempted to connect for at least the minimum reconnection backoff period. + * @param node The node id to check + * @param nowMs The current time in milliseconds + * @return true if we can initiate a new connection + */ + public boolean canConnect(int node, long nowMs) { + NodeConnectionState state = nodeState.get(node); + if (state == null) + return true; + else + return state.state == ConnectionState.DISCONNECTED && nowMs - state.lastConnectAttemptMs > this.reconnectBackoffMs; + } + + /** + * Enter the connecting state for the given node. + * @param node The id of the node we are connecting to + * @param nowMs The current time. + */ + public void connecting(int node, long nowMs) { + nodeState.put(node, new NodeConnectionState(ConnectionState.CONNECTING, nowMs)); + } + + /** + * Return true iff we have a connection to the given node + * @param node The id of the node to check + */ + public boolean isConnected(int node) { + NodeConnectionState state = nodeState.get(node); + return state != null && state.state == ConnectionState.CONNECTED; + } + + /** + * Return true iff we are in the process of connecting to the given node + * @param node The id of the node + */ + public boolean isConnecting(int node) { + NodeConnectionState state = nodeState.get(node); + return state != null && state.state == ConnectionState.CONNECTING; + } + + /** + * Enter the connected state for the given node + * @param node The node we have connected to + */ + public void connected(int node) { + nodeState(node).state = ConnectionState.CONNECTED; + } + + /** + * Enter the disconnected state for the given node + * @param node The node we have disconnected from + */ + public void disconnected(int node) { + nodeState(node).state = ConnectionState.DISCONNECTED; + } + + /** + * Get the state of our connection to the given node + * @param node The id of the node + * @return The state of our connection + */ + private NodeConnectionState nodeState(int node) { + NodeConnectionState state = this.nodeState.get(node); + if (state == null) + throw new IllegalStateException("No entry found for node " + node); + return state; + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java new file mode 100644 index 0000000..a0c327b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java @@ -0,0 +1,8 @@ +package org.apache.kafka.clients; + +/** + * The states of a node connection + */ +enum ConnectionState { + DISCONNECTED, CONNECTING, CONNECTED +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java new file mode 100644 index 0000000..0bcb0a5 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -0,0 +1,92 @@ +package org.apache.kafka.clients; + +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; + +/** + * The set of requests which have
been sent or are being sent but haven't yet received a response + */ +final class InFlightRequests { + + private final Map<Integer, Deque<ClientRequest>> requests = new HashMap<Integer, Deque<ClientRequest>>(); + + /** + * Add the given request to the queue for the node it was directed to + */ + public void add(ClientRequest request) { + Deque<ClientRequest> reqs = this.requests.get(request.request().destination()); + if (reqs == null) { + reqs = new ArrayDeque<ClientRequest>(); + this.requests.put(request.request().destination(), reqs); + } + reqs.addFirst(request); + } + + /** + * Get the request queue for the given node + */ + private Deque<ClientRequest> requestQueue(int node) { + Deque<ClientRequest> reqs = requests.get(node); + if (reqs == null || reqs.isEmpty()) + throw new IllegalStateException("Response from server for which there are no in-flight requests."); + return reqs; + } + + /** + * Get the oldest request (the one that will be completed next) for the given node + */ + public ClientRequest completeNext(int node) { + return requestQueue(node).pollLast(); + } + + /** + * Get the last request we sent to the given node (but don't remove it from the queue) + * @param node The node id + */ + public ClientRequest lastSent(int node) { + return requestQueue(node).peekFirst(); + } + + public ClientRequest completeLastSent(int node) { + return requestQueue(node).pollFirst(); + } + + /** + * Can we send more requests to this node? + * + * @param node Node in question + * @return true iff we have no requests still being sent to the given node + */ + public boolean canSendMore(int node) { + Deque<ClientRequest> queue = requests.get(node); + return queue == null || queue.isEmpty() || queue.peekFirst().request().complete(); + } + + /** + * Clear out all the in-flight requests for the given node and return them + * + * @param node The node + * @return All the in-flight requests for that node that have been removed + */ + public Iterable<ClientRequest> clearAll(int node) { + Deque<ClientRequest> reqs = requests.get(node); + if (reqs == null) { + return Collections.emptyList(); + } else { + return requests.remove(node); + } + } + + /** + * Count all in-flight requests + */ + public int totalInFlightRequests() { + int total = 0; + for (Deque<ClientRequest> deque : this.requests.values()) + total += deque.size(); + return total; + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java new file mode 100644 index 0000000..5e0a81e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -0,0 +1,355 @@ +package org.apache.kafka.clients; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.kafka.clients.producer.internals.Metadata; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.network.NetworkSend; +import org.apache.kafka.common.network.Selectable; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.requests.RequestSend; +import org.apache.kafka.common.requests.ResponseHeader; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +
+/** + * A network client for asynchronous request/response network i/o. This is an internal class used to implement the + * user-facing producer and consumer clients. + *
+ * This class is not thread-safe + */ +public class NetworkClient { + + private static final Logger log = LoggerFactory.getLogger(NetworkClient.class); + + /* the selector used to perform network i/o */ + private final Selectable selector; + + /* the current cluster metadata */ + private final Metadata metadata; + + /* the state of each nodes connection */ + private final ClusterConnectionStates connectionStates; + + /* the set of requests currently being sent or awaiting a response */ + private final InFlightRequests inFlightRequests; + + /* the socket send buffer size in bytes */ + private final int socketSendBuffer; + + /* the socket receive size buffer in bytes */ + private final int socketReceiveBuffer; + + /* the client id used to identify this client in requests to the server */ + private final String clientId; + + /* the current correlation id to use when sending requests to servers */ + private int correlation; + + /* the current node to attempt to use for metadata requests (will round-robin over nodes) */ + private int metadataFetchNodeIndex; + + /* true iff there is a metadata request that has been sent and for which we have not yet received a response */ + private boolean metadataFetchInProgress; + + public NetworkClient(Selectable selector, + Metadata metadata, + String clientId, + long reconnectBackoffMs, + int socketSendBuffer, + int socketReceiveBuffer) { + this.selector = selector; + this.metadata = metadata; + this.clientId = clientId; + this.inFlightRequests = new InFlightRequests(); + this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs); + this.socketSendBuffer = socketSendBuffer; + this.socketReceiveBuffer = socketReceiveBuffer; + this.correlation = 0; + this.metadataFetchNodeIndex = 0; + this.metadataFetchInProgress = false; + } + + /** + * Begin connecting to the given node, return true if we are already connected and ready to send to that node. + * @param node The node to check + * @param now The current timestamp + * @return True if we are ready to send to the given node + */ + public boolean ready(Node node, long now) { + if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { + return true; + } else if (connectionStates.canConnect(node.id(), now)) { + // we don't have a connection to this node right now, make one + initiateConnect(node, now); + } + return false; + } + + /** + * Initiate the given requests and check for any new responses, waiting up to the specified time. Requests can only + * be sent for ready nodes. + * @param requests The requests to initiate + * @param timeoutMs The maximum amount of time to wait for responses if there are none immediately + * @param now The current time in milliseconds + * @return The list of responses received + */ + public List poll(List requests, long timeoutMs, long now) { + Cluster cluster = metadata.fetch(); + + // should we update our metadata? 
+ List sends = new ArrayList(); + maybeUpdateMetadata(cluster, sends, now); + + for (int i = 0; i < requests.size(); i++) { + ClientRequest request = requests.get(i); + this.inFlightRequests.add(request); + sends.add(request.request()); + } + + // do the I/O + try { + this.selector.poll(timeoutMs, sends); + } catch (IOException e) { + log.error("Unexpected error during I/O in producer network thread", e); + } + + List responses = new ArrayList(); + handleCompletedSends(responses, now); + handleCompletedReceives(responses, now); + handleDisconnections(responses, now); + handleConnections(); + + return responses; + } + + /** + * Get the number of in-flight requests + */ + public int inFlightRequests() { + return this.inFlightRequests.totalInFlightRequests(); + } + + /** + * Generate a request header for the given API key + * @param key The api key + * @return A request header with the appropriate client id and correlation id + */ + public RequestHeader nextRequestHeader(ApiKeys key) { + return new RequestHeader(key.id, clientId, correlation++); + } + + /** + * Interrupt the client if it is blocked waiting on I/O. + */ + public void wakeup() { + this.selector.wakeup(); + } + + /** + * Close the network client + */ + public void close() { + this.selector.close(); + } + + /** + * Handle any completed request send. In particular if no response is expected consider the request complete. + * @param responses The list of responses to update + * @param now The current time + */ + private void handleCompletedSends(List responses, long now) { + // if no response is expected then when the send is completed, return it + for (NetworkSend send : this.selector.completedSends()) { + ClientRequest request = this.inFlightRequests.lastSent(send.destination()); + if (!request.expectResponse()) { + this.inFlightRequests.completeLastSent(send.destination()); + responses.add(new ClientResponse(request, now, false, null)); + } + } + } + + /** + * Handle any completed receives and update the response list with the responses received. 
+ * @param responses The list of responses to update + * @param now The current time + */ + private void handleCompletedReceives(List responses, long now) { + for (NetworkReceive receive : this.selector.completedReceives()) { + int source = receive.source(); + ClientRequest req = inFlightRequests.completeNext(source); + ResponseHeader header = ResponseHeader.parse(receive.payload()); + short apiKey = req.request().header().apiKey(); + Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload()); + correlate(req.request().header(), header); + if (apiKey == ApiKeys.METADATA.id) { + handleMetadataResponse(req.request().header(), body, now); + } else { + // need to add body/header to response here + responses.add(new ClientResponse(req, now, false, body)); + } + } + } + + private void handleMetadataResponse(RequestHeader header, Struct body, long now) { + this.metadataFetchInProgress = false; + MetadataResponse response = new MetadataResponse(body); + Cluster cluster = response.cluster(); + // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being + // created which means we will get errors and no nodes until it exists + if (cluster.nodes().size() > 0) + this.metadata.update(cluster, now); + else + log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId()); + } + + /** + * Handle any disconnected connections + * @param responses The list of responses that completed with the disconnection + * @param now The current time + */ + private void handleDisconnections(List responses, long now) { + for (int node : this.selector.disconnected()) { + connectionStates.disconnected(node); + log.debug("Node {} disconnected.", node); + for (ClientRequest request : this.inFlightRequests.clearAll(node)) { + log.trace("Cancelled request {} due to node {} being disconnected", request, node); + ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey()); + if (requestKey == ApiKeys.METADATA) + metadataFetchInProgress = false; + responses.add(new ClientResponse(request, now, true, null)); + } + } + // we got a disconnect so we should probably refresh our metadata and see if that broker is dead + if (this.selector.disconnected().size() > 0) + this.metadata.forceUpdate(); + } + + /** + * Record any newly completed connections + */ + private void handleConnections() { + for (Integer id : this.selector.connected()) { + log.debug("Completed connection to node {}", id); + this.connectionStates.connected(id); + } + } + + /** + * Validate that the response corresponds to the request we expect or else explode + */ + private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) { + if (requestHeader.correlationId() != responseHeader.correlationId()) + throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId() + + ") does not match request (" + + requestHeader.correlationId() + + ")"); + } + + /** + * Create a metadata request for the given topics + */ + private ClientRequest metadataRequest(long nowMs, int node, Set topics) { + MetadataRequest metadata = new MetadataRequest(new ArrayList(topics)); + RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct()); + return new ClientRequest(nowMs, true, send, null); + } + + /** + * Add a metadata request to the list of sends if we need to make one + */ + private void maybeUpdateMetadata(Cluster cluster, List sends, long nowMs) { + if 
(this.metadataFetchInProgress || !metadata.needsUpdate(nowMs)) + return; + + Node node = selectMetadataDestination(cluster); + if (node == null) + return; + + if (connectionStates.isConnected(node.id())) { + Set topics = metadata.topics(); + this.metadataFetchInProgress = true; + ClientRequest metadataRequest = metadataRequest(nowMs, node.id(), topics); + log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); + sends.add(metadataRequest.request()); + this.inFlightRequests.add(metadataRequest); + } else if (connectionStates.canConnect(node.id(), nowMs)) { + // we don't have a connection to this node right now, make one + initiateConnect(node, nowMs); + } + } + + /** + * Find a good node to make a metadata request to. This method will first look for a node that has an existing + * connection and no outstanding requests. If there are no such nodes it will look for a node with no outstanding + * requests. + * @return A node with no requests currently being sent or null if no such node exists + */ + private Node selectMetadataDestination(Cluster cluster) { + List nodes = cluster.nodes(); + + // first look for a node to which we are connected and have no outstanding requests + boolean connectionInProgress = false; + for (int i = 0; i < nodes.size(); i++) { + Node node = nodes.get(metadataNodeIndex(i, nodes.size())); + if (connectionStates.isConnected(node.id()) && this.inFlightRequests.canSendMore(node.id())) { + this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size()); + return node; + } else if (connectionStates.isConnecting(node.id())) { + connectionInProgress = true; + } + } + + // if we have a connection that is being established now, just wait for that don't make another + if (connectionInProgress) + return null; + + // okay, no luck, pick a random unused node + for (int i = 0; i < nodes.size(); i++) { + Node node = nodes.get(metadataNodeIndex(i, nodes.size())); + if (this.inFlightRequests.canSendMore(node.id())) { + this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size()); + return node; + } + } + + return null; // we failed to find a good destination + } + + /** + * Get the index in the node list of the node to use for the metadata request + */ + private int metadataNodeIndex(int offset, int size) { + return Utils.abs(offset + this.metadataFetchNodeIndex) % size; + } + + /** + * Initiate a connection to the given node + */ + private void initiateConnect(Node node, long nowMs) { + try { + log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port()); + selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); + this.connectionStates.connecting(node.id(), nowMs); + } catch (IOException e) { + /* attempt failed, we'll try again after the backoff */ + connectionStates.disconnected(node.id()); + /* maybe the problem is our metadata, update it */ + metadata.forceUpdate(); + log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e); + } + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java b/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java new file mode 100644 index 0000000..3eda043 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java @@ -0,0 +1,19 @@ +package org.apache.kafka.clients; + +/** + * The state of our connection to a node + */ +final class NodeConnectionState { + + ConnectionState state; + long 
lastConnectAttemptMs; + + public NodeConnectionState(ConnectionState state, long lastConnectAttempt) { + this.state = state; + this.lastConnectAttemptMs = lastConnectAttempt; + } + + public String toString() { + return "NodeState(" + state + ", " + lastConnectAttemptMs + ")"; + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index f1def50..7c2bbef 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -21,6 +21,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.producer.internals.FutureRecordMetadata; import org.apache.kafka.clients.producer.internals.Metadata; import org.apache.kafka.clients.producer.internals.Partitioner; @@ -120,17 +121,20 @@ public class KafkaProducer implements Producer { time); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); - this.sender = new Sender(new Selector(this.metrics, time), + + NetworkClient client = new NetworkClient(new Selector(this.metrics, time), + this.metadata, + clientId, + config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), + config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), + config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG)); + this.sender = new Sender(client, this.metadata, this.accumulator, - clientId, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), - config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), config.getInt(ProducerConfig.RETRIES_CONFIG), config.getInt(ProducerConfig.TIMEOUT_CONFIG), - config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), - config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), this.metrics, new SystemTime()); this.ioThread = new KafkaThread("kafka-producer-network-thread", this.sender, true); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 3e83ae0..6d74c7c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -12,11 +12,16 @@ */ package org.apache.kafka.clients.producer.internals; -import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.util.*; - +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -29,19 +34,12 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; -import org.apache.kafka.common.network.NetworkReceive; -import org.apache.kafka.common.network.NetworkSend; -import 
org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.requests.MetadataRequest; -import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.ProduceResponse; -import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.RequestSend; -import org.apache.kafka.common.requests.ResponseHeader; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; @@ -56,16 +54,13 @@ public class Sender implements Runnable { private static final Logger log = LoggerFactory.getLogger(Sender.class); /* the state of each nodes connection */ - private final NodeStates nodeStates; + private final NetworkClient client; /* the record accumulator that batches records */ private final RecordAccumulator accumulator; - /* the selector used to perform network i/o */ - private final Selectable selector; - - /* the client id used to identify this client in requests to the server */ - private final String clientId; + /* the metadata for the client */ + private final Metadata metadata; /* the maximum request size to attempt to send to the server */ private final int maxRequestSize; @@ -79,66 +74,33 @@ public class Sender implements Runnable { /* the number of times to retry a failed request before giving up */ private final int retries; - /* the socket send buffer size in bytes */ - private final int socketSendBuffer; - - /* the socket receive size buffer in bytes */ - private final int socketReceiveBuffer; - - /* the set of currently in-flight requests awaiting a response from the server */ - private final InFlightRequests inFlightRequests; - - /* a reference to the current Cluster instance */ - private final Metadata metadata; - /* the clock instance used for getting the time */ private final Time time; - /* the current node to attempt to use for metadata requests (will round-robin over nodes) */ - private int metadataFetchNodeIndex; - - /* the current correlation id to use when sending requests to servers */ - private int correlation; - - /* true iff there is a metadata request that has been sent and for which we have not yet received a response */ - private boolean metadataFetchInProgress; - /* true while the sender thread is still running */ private volatile boolean running; /* metrics */ private final SenderMetrics sensors; - public Sender(Selectable selector, + public Sender(NetworkClient client, Metadata metadata, RecordAccumulator accumulator, - String clientId, int maxRequestSize, - long reconnectBackoffMs, short acks, int retries, int requestTimeout, - int socketSendBuffer, - int socketReceiveBuffer, Metrics metrics, Time time) { - this.nodeStates = new NodeStates(reconnectBackoffMs); + this.client = client; this.accumulator = accumulator; - this.selector = selector; - this.maxRequestSize = maxRequestSize; this.metadata = metadata; - this.clientId = clientId; + this.maxRequestSize = maxRequestSize; this.running = true; this.requestTimeout = requestTimeout; this.acks = acks; this.retries = retries; - this.socketSendBuffer = socketSendBuffer; - this.socketReceiveBuffer = socketReceiveBuffer; - this.inFlightRequests = new InFlightRequests(); - this.correlation = 0; - this.metadataFetchInProgress = false; this.time = time; - this.metadataFetchNodeIndex = new 
Random().nextInt(); this.sensors = new SenderMetrics(metrics); } @@ -168,10 +130,7 @@ public class Sender implements Runnable { } catch (Exception e) { log.error("Uncaught error in kafka producer I/O thread: ", e); } - } while (this.accumulator.hasUnsent() || this.inFlightRequests.totalInFlightRequests() > 0); - - // close all the connections - this.selector.close(); + } while (this.accumulator.hasUnsent() || this.client.inFlightRequests() > 0); log.debug("Shutdown of Kafka producer I/O thread has completed."); } @@ -181,115 +140,34 @@ public class Sender implements Runnable { * * @param nowMs The current POSIX time in milliseconds */ - public void run(long nowMs) { + public void run(long now) { Cluster cluster = metadata.fetch(); // get the list of partitions with data ready to send - Set ready = this.accumulator.ready(cluster, nowMs); - - // should we update our metadata? - List sends = new ArrayList(); - maybeUpdateMetadata(cluster, sends, nowMs); + Set ready = this.accumulator.ready(cluster, now); - // prune the list of ready nodes to eliminate any that we aren't ready to send yet - Set sendable = processReadyNode(ready, nowMs); + // remove any nodes we aren't ready to send to + for (Node node : ready) { + if (!this.client.ready(node, now)) + ready.remove(node); + } // create produce requests - Map> batches = this.accumulator.drain(cluster, sendable, this.maxRequestSize, nowMs); - List requests = generateProduceRequests(batches, nowMs); + Map> batches = this.accumulator.drain(cluster, ready, this.maxRequestSize, now); + List requests = createProduceRequests(batches, now); sensors.updateProduceRequestMetrics(requests); if (ready.size() > 0) { - log.trace("Partitions with complete batches: {}", ready); - log.trace("Partitions ready to initiate a request: {}", sendable); + log.trace("Nodes with data ready to send: {}", ready); log.trace("Created {} produce requests: {}", requests.size(), requests); } - for (int i = 0; i < requests.size(); i++) { - InFlightRequest request = requests.get(i); - this.inFlightRequests.add(request); - sends.add(request.request); - } - - // do the I/O - try { - this.selector.poll(100L, sends); - } catch (IOException e) { - log.error("Unexpected error during I/O in producer network thread", e); - } - - // handle responses, connections, and disconnections - handleSends(this.selector.completedSends()); - handleResponses(this.selector.completedReceives(), nowMs); - handleDisconnects(this.selector.disconnected(), nowMs); - handleConnects(this.selector.connected()); - } - - /** - * Add a metadata request to the list of sends if we need to make one - */ - private void maybeUpdateMetadata(Cluster cluster, List sends, long nowMs) { - if (this.metadataFetchInProgress || !metadata.needsUpdate(nowMs)) - return; - - Node node = selectMetadataDestination(cluster); - if (node == null) - return; - - if (nodeStates.isConnected(node.id())) { - Set topics = metadata.topics(); - this.metadataFetchInProgress = true; - InFlightRequest metadataRequest = metadataRequest(nowMs, node.id(), topics); - log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); - sends.add(metadataRequest.request); - this.inFlightRequests.add(metadataRequest); - } else if (nodeStates.canConnect(node.id(), nowMs)) { - // we don't have a connection to this node right now, make one - initiateConnect(node, nowMs); - } - } - - /** - * Find a good node to make a metadata request to. This method will first look for a node that has an existing - * connection and no outstanding requests. 
If there are no such nodes it will look for a node with no outstanding - * requests. - * @return A node with no requests currently being sent or null if no such node exists - */ - private Node selectMetadataDestination(Cluster cluster) { - List nodes = cluster.nodes(); - - // first look for a node to which we are connected and have no outstanding requests - boolean connectionInProgress = false; - for (int i = 0; i < nodes.size(); i++) { - Node node = nodes.get(metadataNodeIndex(i, nodes.size())); - if (nodeStates.isConnected(node.id()) && this.inFlightRequests.canSendMore(node.id())) { - this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size()); - return node; - } else if (nodeStates.isConnecting(node.id())) { - connectionInProgress = true; - } - } - - // if we have a connection that is being established now, just wait for that don't make another - if (connectionInProgress) - return null; - - // okay, no luck, pick a random unused node - for (int i = 0; i < nodes.size(); i++) { - Node node = nodes.get(metadataNodeIndex(i, nodes.size())); - if (this.inFlightRequests.canSendMore(node.id())) { - this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size()); - return node; - } + List responses = this.client.poll(requests, 100L, now); + for (ClientResponse response : responses) { + if (response.wasDisconnected()) + handleDisconnect(response, now); + else + handleResponse(response, now); } - - return null; // we failed to find a good destination - } - - /** - * Get the index in the node list of the node to use for the metadata request - */ - private int metadataNodeIndex(int offset, int size) { - return Utils.abs(offset + this.metadataFetchNodeIndex) % size; } /** @@ -301,161 +179,42 @@ public class Sender implements Runnable { this.wakeup(); } - /** - * Process the set of destination nodes with data ready to send. - * - * 1) If we have an unknown leader node, force refresh the metadata. 
- * 2) If we have a connection to the appropriate node, add - * it to the returned set; - * 3) If we have not a connection yet, initialize one - */ - private Set processReadyNode(Set ready, long nowMs) { - Set sendable = new HashSet(ready.size()); - for (Node node : ready) { - if (node == null) { - // we don't know about this topic/partition or it has no leader, re-fetch metadata - metadata.forceUpdate(); - } else if (nodeStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { - sendable.add(node); - } else if (nodeStates.canConnect(node.id(), nowMs)) { - // we don't have a connection to this node right now, make one - initiateConnect(node, nowMs); - } - } - return sendable; - } - - /** - * Initiate a connection to the given node - */ - private void initiateConnect(Node node, long nowMs) { - try { - log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port()); - selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); - this.nodeStates.connecting(node.id(), nowMs); - } catch (IOException e) { - /* attempt failed, we'll try again after the backoff */ - nodeStates.disconnected(node.id()); - /* maybe the problem is our metadata, update it */ - metadata.forceUpdate(); - log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e); - } - } - - /** - * Handle any closed connections - */ - private void handleDisconnects(List disconnects, long nowMs) { - // clear out the in-flight requests for the disconnected broker - for (int node : disconnects) { - nodeStates.disconnected(node); - log.debug("Node {} disconnected.", node); - for (InFlightRequest request : this.inFlightRequests.clearAll(node)) { - log.trace("Cancelled request {} due to node {} being disconnected", request, node); - ApiKeys requestKey = ApiKeys.forId(request.request.header().apiKey()); - switch (requestKey) { - case PRODUCE: - int correlation = request.request.header().correlationId(); - for (RecordBatch batch : request.batches.values()) - completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, nowMs); - break; - case METADATA: - metadataFetchInProgress = false; - break; - default: - throw new IllegalArgumentException("Unexpected api key id: " + requestKey.id); - } - } - } - // we got a disconnect so we should probably refresh our metadata and see if that broker is dead - if (disconnects.size() > 0) - this.metadata.forceUpdate(); - } - - /** - * Record any connections that completed in our node state - */ - private void handleConnects(List connects) { - for (Integer id : connects) { - log.debug("Completed connection to node {}", id); - this.nodeStates.connected(id); - } - } - - /** - * Process completed sends - */ - public void handleSends(List sends) { - /* if acks = 0 then the request is satisfied once sent */ - for (NetworkSend send : sends) { - Deque requests = this.inFlightRequests.requestQueue(send.destination()); - InFlightRequest request = requests.peekFirst(); - log.trace("Completed send of request to node {}: {}", request.request.destination(), request.request); - if (!request.expectResponse) { - requests.pollFirst(); - if (request.request.header().apiKey() == ApiKeys.PRODUCE.id) { - for (RecordBatch batch : request.batches.values()) { - batch.done(-1L, Errors.NONE.exception()); - this.accumulator.deallocate(batch); - } - } - } - } - } - - /** - * Handle responses from the server - */ - private void handleResponses(List receives, long nowMs) { - for 
(NetworkReceive receive : receives) { - int source = receive.source(); - InFlightRequest req = inFlightRequests.nextCompleted(source); - ResponseHeader header = ResponseHeader.parse(receive.payload()); - short apiKey = req.request.header().apiKey(); - Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload()); - correlate(req.request.header(), header); - if (req.request.header().apiKey() == ApiKeys.PRODUCE.id) { - log.trace("Received produce response from node {} with correlation id {}", source, req.request.header().correlationId()); - handleProduceResponse(req, req.request.header(), body, nowMs); - } else if (req.request.header().apiKey() == ApiKeys.METADATA.id) { - log.trace("Received metadata response response from node {} with correlation id {}", source, req.request.header() - .correlationId()); - handleMetadataResponse(req.request.header(), body, nowMs); - } else { - throw new IllegalStateException("Unexpected response type: " + req.request.header().apiKey()); - } - this.sensors.recordLatency(receive.source(), nowMs - req.createdMs); - } - - } - - private void handleMetadataResponse(RequestHeader header, Struct body, long nowMs) { - this.metadataFetchInProgress = false; - MetadataResponse response = new MetadataResponse(body); - Cluster cluster = response.cluster(); - // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being - // created which means we will get errors and no nodes until it exists - if (cluster.nodes().size() > 0) - this.metadata.update(cluster, nowMs); - else - log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId()); + private void handleDisconnect(ClientResponse response, long now) { + log.trace("Cancelled request {} due to node {} being disconnected", response, response.request().request().destination()); + int correlation = response.request().request().header().correlationId(); + @SuppressWarnings("unchecked") + Map responseBatches = (Map) response.request().attachment(); + for (RecordBatch batch : responseBatches.values()) + completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, now); } /** * Handle a produce response */ - private void handleProduceResponse(InFlightRequest request, RequestHeader header, Struct body, long nowMs) { - ProduceResponse pr = new ProduceResponse(body); - for (Map responses : pr.responses().values()) { - for (Map.Entry entry : responses.entrySet()) { - TopicPartition tp = entry.getKey(); - ProduceResponse.PartitionResponse response = entry.getValue(); - Errors error = Errors.forCode(response.errorCode); - if (error.exception() instanceof InvalidMetadataException) - metadata.forceUpdate(); - RecordBatch batch = request.batches.get(tp); - completeBatch(batch, error, response.baseOffset, header.correlationId(), nowMs); + private void handleResponse(ClientResponse response, long now) { + int correlationId = response.request().request().header().correlationId(); + log.trace("Received produce response from node {} with correlation id {}", + response.request().request().destination(), + correlationId); + @SuppressWarnings("unchecked") + Map batches = (Map) response.request().attachment(); + // if we have a response, parse it + if (response.hasResponse()) { + ProduceResponse produceResponse = new ProduceResponse(response.responseBody()); + for (Map responses : produceResponse.responses().values()) { + for (Map.Entry entry : responses.entrySet()) { + TopicPartition tp = entry.getKey(); + 
ProduceResponse.PartitionResponse partResp = entry.getValue(); + Errors error = Errors.forCode(partResp.errorCode); + RecordBatch batch = batches.get(tp); + completeBatch(batch, error, partResp.baseOffset, correlationId, now); + } } + this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs()); + } else { + // this is the acks = 0 case, just complete all requests + for (RecordBatch batch : batches.values()) + completeBatch(batch, Errors.NONE, -1L, correlationId, now); } } @@ -467,7 +226,7 @@ public class Sender implements Runnable { * @param correlationId The correlation id for the request * @param nowMs The current POSIX time stamp in milliseconds */ - private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long correlationId, long nowMs) { + private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long correlationId, long now) { if (error != Errors.NONE && canRetry(batch, error)) { // retry log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}", @@ -475,7 +234,7 @@ public class Sender implements Runnable { batch.topicPartition, this.retries - batch.attempts - 1, error); - this.accumulator.reenqueue(batch, nowMs); + this.accumulator.reenqueue(batch, now); this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount); } else { // tell the user the result of their request @@ -484,6 +243,8 @@ public class Sender implements Runnable { if (error != Errors.NONE) this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount); } + if (error.exception() instanceof InvalidMetadataException) + metadata.forceUpdate(); } /** @@ -494,30 +255,10 @@ public class Sender implements Runnable { } /** - * Validate that the response corresponds to the request we expect or else explode - */ - private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) { - if (requestHeader.correlationId() != responseHeader.correlationId()) - throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId() + - ") does not match request (" + - requestHeader.correlationId() + - ")"); - } - - /** - * Create a metadata request for the given topics - */ - private InFlightRequest metadataRequest(long nowMs, int node, Set topics) { - MetadataRequest metadata = new MetadataRequest(new ArrayList(topics)); - RequestSend send = new RequestSend(node, header(ApiKeys.METADATA), metadata.toStruct()); - return new InFlightRequest(nowMs, true, send, null); - } - - /** * Transfer the record batches into a list of produce requests on a per-node basis */ - private List generateProduceRequests(Map> collated, long nowMs) { - List requests = new ArrayList(collated.size()); + private List createProduceRequests(Map> collated, long nowMs) { + List requests = new ArrayList(collated.size()); for (Map.Entry> entry : collated.entrySet()) requests.add(produceRequest(nowMs, entry.getKey(), acks, requestTimeout, entry.getValue())); return requests; @@ -526,7 +267,7 @@ public class Sender implements Runnable { /** * Create a produce request from the given record batches */ - private InFlightRequest produceRequest(long nowMs, int destination, short acks, int timeout, List batches) { + private ClientRequest produceRequest(long nowMs, int destination, short acks, int timeout, List batches) { Map batchesByPartition = new HashMap(); Map> batchesByTopic = new HashMap>(); for (RecordBatch batch : batches) { @@ -560,184 +301,15 @@ public class 
Sender implements Runnable { } produce.set("topic_data", topicDatas.toArray()); - RequestSend send = new RequestSend(destination, header(ApiKeys.PRODUCE), produce); - return new InFlightRequest(nowMs, acks != 0, send, batchesByPartition); - } - - private RequestHeader header(ApiKeys key) { - return new RequestHeader(key.id, clientId, correlation++); + RequestSend send = new RequestSend(destination, this.client.nextRequestHeader(ApiKeys.PRODUCE), produce); + return new ClientRequest(nowMs, acks != 0, send, batchesByPartition); } /** * Wake up the selector associated with this send thread */ public void wakeup() { - this.selector.wakeup(); - } - - /** - * The states of a node connection - */ - private static enum ConnectionState { - DISCONNECTED, CONNECTING, CONNECTED - } - - /** - * The state of a node - */ - private static final class NodeState { - private ConnectionState state; - private long lastConnectAttemptMs; - - public NodeState(ConnectionState state, long lastConnectAttempt) { - this.state = state; - this.lastConnectAttemptMs = lastConnectAttempt; - } - - public String toString() { - return "NodeState(" + state + ", " + lastConnectAttemptMs + ")"; - } - } - - private static class NodeStates { - private final long reconnectBackoffMs; - private final Map nodeState; - - public NodeStates(long reconnectBackoffMs) { - this.reconnectBackoffMs = reconnectBackoffMs; - this.nodeState = new HashMap(); - } - - public boolean canConnect(int node, long nowMs) { - NodeState state = nodeState.get(node); - if (state == null) - return true; - else - return state.state == ConnectionState.DISCONNECTED && nowMs - state.lastConnectAttemptMs > this.reconnectBackoffMs; - } - - public void connecting(int node, long nowMs) { - nodeState.put(node, new NodeState(ConnectionState.CONNECTING, nowMs)); - } - - public boolean isConnected(int node) { - NodeState state = nodeState.get(node); - return state != null && state.state == ConnectionState.CONNECTED; - } - - public boolean isConnecting(int node) { - NodeState state = nodeState.get(node); - return state != null && state.state == ConnectionState.CONNECTING; - } - - public void connected(int node) { - nodeState(node).state = ConnectionState.CONNECTED; - } - - public void disconnected(int node) { - nodeState(node).state = ConnectionState.DISCONNECTED; - } - - private NodeState nodeState(int node) { - NodeState state = this.nodeState.get(node); - if (state == null) - throw new IllegalStateException("No entry found for node " + node); - return state; - } - } - - /** - * An request that hasn't been fully processed yet - */ - private static final class InFlightRequest { - public long createdMs; - public boolean expectResponse; - public Map batches; - public RequestSend request; - - /** - * @param createdMs The unix timestamp in milliseonds for the time at which this request was created. - * @param expectResponse Should we expect a response message or is this request complete once it is sent? 
- * @param request The request - * @param batches The record batches contained in the request if it is a produce request - */ - public InFlightRequest(long createdMs, boolean expectResponse, RequestSend request, Map batches) { - this.createdMs = createdMs; - this.batches = batches; - this.request = request; - this.expectResponse = expectResponse; - } - - @Override - public String toString() { - return "InFlightRequest(expectResponse=" + expectResponse + ", batches=" + batches + ", request=" + request + ")"; - } - } - - /** - * A set of outstanding request queues for each node that have not yet received responses - */ - private static final class InFlightRequests { - private final Map> requests = new HashMap>(); - - /** - * Add the given request to the queue for the node it was directed to - */ - public void add(InFlightRequest request) { - Deque reqs = this.requests.get(request.request.destination()); - if (reqs == null) { - reqs = new ArrayDeque(); - this.requests.put(request.request.destination(), reqs); - } - reqs.addFirst(request); - } - - public Deque requestQueue(int node) { - Deque reqs = requests.get(node); - if (reqs == null || reqs.isEmpty()) - throw new IllegalStateException("Response from server for which there are no in-flight requests."); - return reqs; - } - - /** - * Get the oldest request (the one that that will be completed next) for the given node - */ - public InFlightRequest nextCompleted(int node) { - return requestQueue(node).pollLast(); - } - - /** - * Can we send more requests to this node? - * - * @param node Node in question - * @return true iff we have no requests still being sent to the given node - */ - public boolean canSendMore(int node) { - Deque queue = requests.get(node); - return queue == null || queue.isEmpty() || queue.peekFirst().request.complete(); - } - - /** - * Clear out all the in-flight requests for the given node and return them - * - * @param node The node - * @return All the in-flight requests for that node that have been removed - */ - public Iterable clearAll(int node) { - Deque reqs = requests.get(node); - if (reqs == null) { - return Collections.emptyList(); - } else { - return requests.remove(node); - } - } - - public int totalInFlightRequests() { - int total = 0; - for (Deque deque : this.requests.values()) - total += deque.size(); - return total; - } + this.client.wakeup(); } /** @@ -762,11 +334,11 @@ public class Sender implements Runnable { this.queueTimeSensor = metrics.sensor("queue-time"); this.queueTimeSensor.add("record-queue-time-avg", - "The average time in ms record batches spent in the record accumulator.", - new Avg()); + "The average time in ms record batches spent in the record accumulator.", + new Avg()); this.queueTimeSensor.add("record-queue-time-max", - "The maximum time in ms record batches spent in the record accumulator.", - new Max()); + "The maximum time in ms record batches spent in the record accumulator.", + new Max()); this.requestTimeSensor = metrics.sensor("request-time"); this.requestTimeSensor.add("request-latency-avg", "The average request latency in ms", new Avg()); @@ -787,7 +359,7 @@ public class Sender implements Runnable { this.metrics.addMetric("requests-in-flight", "The current number of in-flight requests awaiting a response.", new Measurable() { public double measure(MetricConfig config, long nowMs) { - return inFlightRequests.totalInFlightRequests(); + return client.inFlightRequests(); } }); metrics.addMetric("metadata-age", "The age in seconds of the current producer metadata being used.", new 
Measurable() { @@ -820,14 +392,15 @@ public class Sender implements Runnable { } } - public void updateProduceRequestMetrics(List requests) { + public void updateProduceRequestMetrics(List requests) { long nowMs = time.milliseconds(); for (int i = 0; i < requests.size(); i++) { - InFlightRequest request = requests.get(i); + ClientRequest request = requests.get(i); int records = 0; - if (request.batches != null) { - for (RecordBatch batch : request.batches.values()) { + if (request.attachment() != null) { + Map responseBatches = (Map) request.attachment(); + for (RecordBatch batch : responseBatches.values()) { // register all per-topic metrics at once String topic = batch.topicPartition.topic(); @@ -859,7 +432,8 @@ public class Sender implements Runnable { this.retrySensor.record(count, nowMs); String topicRetryName = "topic." + topic + ".record-retries"; Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName); - if (topicRetrySensor != null) topicRetrySensor.record(count, nowMs); + if (topicRetrySensor != null) + topicRetrySensor.record(count, nowMs); } public void recordErrors(String topic, int count) { @@ -867,7 +441,8 @@ public class Sender implements Runnable { this.errorSensor.record(count, nowMs); String topicErrorName = "topic." + topic + ".record-errors"; Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName); - if (topicErrorSensor != null) topicErrorSensor.record(count, nowMs); + if (topicErrorSensor != null) + topicErrorSensor.record(count, nowMs); } public void recordLatency(int node, long latency) { @@ -876,7 +451,8 @@ public class Sender implements Runnable { if (node >= 0) { String nodeTimeName = "node-" + node + ".latency"; Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName); - if (nodeRequestTime != null) nodeRequestTime.record(latency, nowMs); + if (nodeRequestTime != null) + nodeRequestTime.record(latency, nowMs); } } }
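
For context, a rough usage sketch (not part of the patch) of how a caller such as Sender.run(now) is expected to drive the new classes: check readiness, wrap each RequestSend in a ClientRequest with its context attached, hand the batch to poll(), and dispatch on the returned ClientResponses. The readyNodes, produceStruct, batches and acks values below are assumed caller state; only the NetworkClient, ClientRequest, ClientResponse and RequestSend calls come from the code above.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.kafka.clients.ClientRequest;
    import org.apache.kafka.clients.ClientResponse;
    import org.apache.kafka.clients.NetworkClient;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.protocol.types.Struct;
    import org.apache.kafka.common.requests.RequestSend;

    public final class NetworkClientUsageSketch {
        // One pass of the send loop: hand ready requests to the client, then handle whatever came back.
        public static void pollOnce(NetworkClient client, List<Node> readyNodes, Struct produceStruct,
                                    Object batches, short acks, long now) {
            List<ClientRequest> requests = new ArrayList<ClientRequest>();
            for (Node node : readyNodes) {
                // ready() returns false (and starts connecting) if no usable connection exists yet
                if (!client.ready(node, now))
                    continue;
                RequestSend send = new RequestSend(node.id(),
                                                   client.nextRequestHeader(ApiKeys.PRODUCE),
                                                   produceStruct);
                // the attachment rides along with the request so the response handler can recover its context
                requests.add(new ClientRequest(now, acks != 0, send, batches));
            }
            for (ClientResponse response : client.poll(requests, 100L, now)) {
                if (response.wasDisconnected()) {
                    // connection dropped: fail the attached batches with a retriable error
                } else if (response.hasResponse()) {
                    Struct body = response.responseBody(); // e.g. parse with new ProduceResponse(body)
                } else {
                    // acks == 0 case: no response body, the completed send is the only acknowledgement
                }
            }
        }
    }

The attachment argument is what lets Sender drop its old InFlightRequest.batches field: the per-partition record batches travel through the client as an opaque Object and are cast back in handleResponse() and handleDisconnect().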