diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java new file mode 100644 index 0000000..d2ed3ac --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java @@ -0,0 +1,48 @@ +package org.apache.kafka.clients; + +import org.apache.kafka.common.requests.RequestSend; + +/** + * A request that hasn't been fully processed yet + */ +public final class ClientRequest { + private final long createdMs; + private final boolean expectResponse; + private final RequestSend request; + private final Object attachment; + + /** + * @param createdMs The unix timestamp in milliseconds for the time at which this request was created. + * @param expectResponse Should we expect a response message or is this request complete once it is sent? + * @param request The request + * @param attachment Associated data with the request + */ + public ClientRequest(long createdMs, boolean expectResponse, RequestSend request, Object attachment) { + this.createdMs = createdMs; + this.attachment = attachment; + this.request = request; + this.expectResponse = expectResponse; + } + + @Override + public String toString() { + return "ClientRequest(expectResponse=" + expectResponse + ", payload=" + attachment + ", request=" + request + ")"; + } + + public boolean expectResponse() { + return expectResponse; + } + + public RequestSend request() { + return request; + } + + public Object attachment() { + return attachment; + } + + public long createdTime() { + return createdMs; + } + +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java new file mode 100644 index 0000000..e1b5ae3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java @@ -0,0 +1,56 @@ +package org.apache.kafka.clients; + +import org.apache.kafka.common.protocol.types.Struct; 
+ +public class ClientResponse { + + private final long received; + private final boolean disconnected; + private final ClientRequest request; + private final Struct responseBody; + + public ClientResponse(ClientRequest request, long received, boolean disconnected, Struct responseBody) { + super(); + this.received = received; + this.disconnected = disconnected; + this.request = request; + this.responseBody = responseBody; + } + + public long receivedTime() { + return received; + } + + public boolean wasDisconnected() { + return disconnected; + } + + public ClientRequest request() { + return request; + } + + public Struct responseBody() { + return responseBody; + } + + public boolean hasResponse() { + return responseBody != null; + } + + public long requestLatencyMs() { + return receivedTime() - this.request.createdTime(); + } + + @Override + public String toString() { + return "ClientResponse(received=" + received + + ", disconnected=" + + disconnected + + ", request=" + + request + + ", responseBody=" + + responseBody + + ")"; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java new file mode 100644 index 0000000..dfa1fab --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -0,0 +1,88 @@ +package org.apache.kafka.clients; + +import java.util.HashMap; +import java.util.Map; + +/** + * The state of our connection to each node in the cluster. + * + */ +final class ClusterConnectionStates { + private final long reconnectBackoffMs; + private final Map<Integer, NodeConnectionState> nodeState; + + public ClusterConnectionStates(long reconnectBackoffMs) { + this.reconnectBackoffMs = reconnectBackoffMs; + this.nodeState = new HashMap<Integer, NodeConnectionState>(); + } + + /** + * Return true iff we can currently initiate a new connection to the given node. 
This will be the case if we are not + * connected and haven't been connected for at least the minimum reconnection backoff period. + * @param node The node id to check + * @param nowMs The current time in MS + * @return true if we can initiate a new connection + */ + public boolean canConnect(int node, long nowMs) { + NodeConnectionState state = nodeState.get(node); + if (state == null) + return true; + else + return state.state == ConnectionState.DISCONNECTED && nowMs - state.lastConnectAttemptMs > this.reconnectBackoffMs; + } + + /** + * Enter the connecting state for the given node. + * @param node The id of the node we are connecting to + * @param nowMs The current time. + */ + public void connecting(int node, long nowMs) { + nodeState.put(node, new NodeConnectionState(ConnectionState.CONNECTING, nowMs)); + } + + /** + * Return true iff we have a connection to the given node + * @param node The id of the node to check + */ + public boolean isConnected(int node) { + NodeConnectionState state = nodeState.get(node); + return state != null && state.state == ConnectionState.CONNECTED; + } + + /** + * Return true iff we are in the process of connecting to the given node + * @param node The id of the node + */ + public boolean isConnecting(int node) { + NodeConnectionState state = nodeState.get(node); + return state != null && state.state == ConnectionState.CONNECTING; + } + + /** + * Enter the connected state for the given node + * @param node The node we have connected to + */ + public void connected(int node) { + nodeState(node).state = ConnectionState.CONNECTED; + } + + /** + * Enter the disconnected state for the given node + * @param node The node we have disconnected from + */ + public void disconnected(int node) { + nodeState(node).state = ConnectionState.DISCONNECTED; + } + + /** + * Get the state of our connection to the given node + * @param node The id of the node + * @return The state of our connection + */ + private NodeConnectionState nodeState(int 
node) { + NodeConnectionState state = this.nodeState.get(node); + if (state == null) + throw new IllegalStateException("No entry found for node " + node); + return state; + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java new file mode 100644 index 0000000..a0c327b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java @@ -0,0 +1,8 @@ +package org.apache.kafka.clients; + +/** + * The states of a node connection + */ +enum ConnectionState { + DISCONNECTED, CONNECTING, CONNECTED +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java new file mode 100644 index 0000000..c35e68e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -0,0 +1,98 @@ +package org.apache.kafka.clients; + +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; + +/** + * The set of requests which have been sent or are being sent but haven't yet received a response + */ +final class InFlightRequests { + + private final int maxInFlightRequestsPerConnection; + private final Map<Integer, Deque<ClientRequest>> requests = new HashMap<Integer, Deque<ClientRequest>>(); + + public InFlightRequests(int maxInFlightRequestsPerConnection) { + this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection; + } + + /** + * Add the given request to the queue for the node it was directed to + */ + public void add(ClientRequest request) { + Deque<ClientRequest> reqs = this.requests.get(request.request().destination()); + if (reqs == null) { + reqs = new ArrayDeque<ClientRequest>(); + this.requests.put(request.request().destination(), reqs); + } + reqs.addFirst(request); + } + + /** + * Get the request queue for the given node + */ + private Deque<ClientRequest> requestQueue(int node) { + Deque<ClientRequest> reqs = requests.get(node); + if (reqs == null || reqs.isEmpty()) + throw new IllegalStateException("Response from server for which there are no in-flight requests."); + return reqs; + } + + /** + * Get the oldest request (the one that that will be completed next) for the given node + */ + public ClientRequest completeNext(int node) { + return requestQueue(node).pollLast(); + } + + /** + * Get the last request we sent to the given node (but don't remove it from the queue) + * @param node The node id + */ + public ClientRequest lastSent(int node) { + return requestQueue(node).peekFirst(); + } + + public ClientRequest completeLastSent(int node) { + return requestQueue(node).pollFirst(); + } + + /** + * Can we send more requests to this node? + * + * @param node Node in question + * @return true iff we have no requests still being sent to the given node + */ + public boolean canSendMore(int node) { + Deque<ClientRequest> queue = requests.get(node); + return queue == null || queue.isEmpty() || + (queue.peekFirst().request().complete() && queue.size() < this.maxInFlightRequestsPerConnection); + } + + /** + * Clear out all the in-flight requests for the given node and return them + * + * @param node The node + * @return All the in-flight requests for that node that have been removed + */ + public Iterable<ClientRequest> clearAll(int node) { + Deque<ClientRequest> reqs = requests.get(node); + if (reqs == null) { + return Collections.emptyList(); + } else { + return requests.remove(node); + } + } + + /** + * Count all in-flight requests + */ + public int totalInFlightRequests() { + int total = 0; + for (Deque<ClientRequest> deque : this.requests.values()) + total += deque.size(); + return total; + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java new file mode 100644 index 0000000..68c4a29 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java @@ -0,0 +1,51 @@ +package 
org.apache.kafka.clients; + +import java.util.List; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.RequestHeader; + +/** + * The interface for {@link KafkaClient} + */ +public interface KafkaClient { + + /** + * Initiate a connection to the given node (if necessary), and return true if already connected. + * @param node The node to connect to. + * @param now The current time + */ + public boolean ready(Node node, long now); + + /** + * Initiate the sending of the given requests and return any completed responses. Requests can only be sent on ready + * connections. + * @param requests The requests to send + * @param timeout The maximum amount of time to wait for responses in ms + * @param now The current time in ms + */ + public List<ClientResponse> poll(List<ClientRequest> requests, long timeout, long now); + + /** + * The number of currently in-flight requests for which we have not yet returned a response + */ + public int inFlightRequests(); + + /** + * Generate a request header for the next request + * @param key The API key of the request + */ + public RequestHeader nextRequestHeader(ApiKeys key); + + /** + * Wake up the client if it is currently blocked waiting for I/O + */ + public void wakeup(); + + /** + * Close the client and disconnect from all nodes + */ + public void close(); + +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java new file mode 100644 index 0000000..6f9a4a5 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -0,0 +1,375 @@ +package org.apache.kafka.clients; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.kafka.clients.producer.internals.Metadata; +import org.apache.kafka.common.Cluster; +import 
org.apache.kafka.common.Node; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.network.NetworkSend; +import org.apache.kafka.common.network.Selectable; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.requests.RequestSend; +import org.apache.kafka.common.requests.ResponseHeader; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A network client for asynchronous request/response network i/o. This is an internal class used to implement the + * user-facing producer and consumer clients. + *

+ * This class is not thread-safe! + */ +public class NetworkClient implements KafkaClient { + + private static final Logger log = LoggerFactory.getLogger(NetworkClient.class); + + /* the selector used to perform network i/o */ + private final Selectable selector; + + /* the current cluster metadata */ + private final Metadata metadata; + + /* the state of each nodes connection */ + private final ClusterConnectionStates connectionStates; + + /* the set of requests currently being sent or awaiting a response */ + private final InFlightRequests inFlightRequests; + + /* the socket send buffer size in bytes */ + private final int socketSendBuffer; + + /* the socket receive size buffer in bytes */ + private final int socketReceiveBuffer; + + /* the client id used to identify this client in requests to the server */ + private final String clientId; + + /* the current correlation id to use when sending requests to servers */ + private int correlation; + + /* the current node to attempt to use for metadata requests (will round-robin over nodes) */ + private int metadataFetchNodeIndex; + + /* true iff there is a metadata request that has been sent and for which we have not yet received a response */ + private boolean metadataFetchInProgress; + + public NetworkClient(Selectable selector, + Metadata metadata, + String clientId, + int maxInFlightRequestsPerConnection, + long reconnectBackoffMs, + int socketSendBuffer, + int socketReceiveBuffer) { + this.selector = selector; + this.metadata = metadata; + this.clientId = clientId; + this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection); + this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs); + this.socketSendBuffer = socketSendBuffer; + this.socketReceiveBuffer = socketReceiveBuffer; + this.correlation = 0; + this.metadataFetchNodeIndex = 0; + this.metadataFetchInProgress = false; + } + + /** + * Begin connecting to the given node, return true if we are already connected and ready to 
send to that node. + * @param node The node to check + * @param now The current timestamp + * @return True if we are ready to send to the given node + */ + @Override + public boolean ready(Node node, long now) { + if (isReady(node.id())) { + return true; + } else if (connectionStates.canConnect(node.id(), now)) { + // we don't have a connection to this node right now, make one + initiateConnect(node, now); + } + return false; + } + + /** + * Check if the node with the given id is ready to send more requests. + * @param nodeId The node id + * @return true if the node is ready + */ + private boolean isReady(int nodeId) { + return connectionStates.isConnected(nodeId) && inFlightRequests.canSendMore(nodeId); + } + + /** + * Initiate the given requests and check for any new responses, waiting up to the specified time. Requests can only + * be sent for ready nodes. + * @param requests The requests to initiate + * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately + * @param now The current time in milliseconds + * @return The list of responses received + */ + @Override + public List<ClientResponse> poll(List<ClientRequest> requests, long timeout, long now) { + Cluster cluster = metadata.fetch(); + + // should we update our metadata? 
+ List<NetworkSend> sends = new ArrayList<NetworkSend>(); + maybeUpdateMetadata(cluster, sends, now); + + for (int i = 0; i < requests.size(); i++) { + ClientRequest request = requests.get(i); + int node = request.request().destination(); + if (!isReady(node)) + throw new IllegalStateException("Attempt to send a request to node " + node + " which is not ready."); + + this.inFlightRequests.add(request); + sends.add(request.request()); + } + + // do the I/O + try { + this.selector.poll(timeout, sends); + } catch (IOException e) { + log.error("Unexpected error during I/O in producer network thread", e); + } + + List<ClientResponse> responses = new ArrayList<ClientResponse>(); + handleCompletedSends(responses, now); + handleCompletedReceives(responses, now); + handleDisconnections(responses, now); + handleConnections(); + + return responses; + } + + /** + * Get the number of in-flight requests + */ + @Override + public int inFlightRequests() { + return this.inFlightRequests.totalInFlightRequests(); + } + + /** + * Generate a request header for the given API key + * @param key The api key + * @return A request header with the appropriate client id and correlation id + */ + @Override + public RequestHeader nextRequestHeader(ApiKeys key) { + return new RequestHeader(key.id, clientId, correlation++); + } + + /** + * Interrupt the client if it is blocked waiting on I/O. + */ + @Override + public void wakeup() { + this.selector.wakeup(); + } + + /** + * Close the network client + */ + @Override + public void close() { + this.selector.close(); + } + + /** + * Handle any completed request send. In particular if no response is expected consider the request complete. 
+ * @param responses The list of responses to update + * @param now The current time + */ + private void handleCompletedSends(List<ClientResponse> responses, long now) { + // if no response is expected then when the send is completed, return it + for (NetworkSend send : this.selector.completedSends()) { + ClientRequest request = this.inFlightRequests.lastSent(send.destination()); + if (!request.expectResponse()) { + this.inFlightRequests.completeLastSent(send.destination()); + responses.add(new ClientResponse(request, now, false, null)); + } + } + } + + /** + * Handle any completed receives and update the response list with the responses received. + * @param responses The list of responses to update + * @param now The current time + */ + private void handleCompletedReceives(List<ClientResponse> responses, long now) { + for (NetworkReceive receive : this.selector.completedReceives()) { + int source = receive.source(); + ClientRequest req = inFlightRequests.completeNext(source); + ResponseHeader header = ResponseHeader.parse(receive.payload()); + short apiKey = req.request().header().apiKey(); + Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload()); + correlate(req.request().header(), header); + if (apiKey == ApiKeys.METADATA.id) { + handleMetadataResponse(req.request().header(), body, now); + } else { + // need to add body/header to response here + responses.add(new ClientResponse(req, now, false, body)); + } + } + } + + private void handleMetadataResponse(RequestHeader header, Struct body, long now) { + this.metadataFetchInProgress = false; + MetadataResponse response = new MetadataResponse(body); + Cluster cluster = response.cluster(); + // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being + // created which means we will get errors and no nodes until it exists + if (cluster.nodes().size() > 0) + this.metadata.update(cluster, now); + else + log.trace("Ignoring empty metadata response with correlation id 
{}.", header.correlationId()); + } + + /** + * Handle any disconnected connections + * @param responses The list of responses that completed with the disconnection + * @param now The current time + */ + private void handleDisconnections(List<ClientResponse> responses, long now) { + for (int node : this.selector.disconnected()) { + connectionStates.disconnected(node); + log.debug("Node {} disconnected.", node); + for (ClientRequest request : this.inFlightRequests.clearAll(node)) { + log.trace("Cancelled request {} due to node {} being disconnected", request, node); + ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey()); + if (requestKey == ApiKeys.METADATA) + metadataFetchInProgress = false; + responses.add(new ClientResponse(request, now, true, null)); + } + } + // we got a disconnect so we should probably refresh our metadata and see if that broker is dead + if (this.selector.disconnected().size() > 0) + this.metadata.forceUpdate(); + } + + /** + * Record any newly completed connections + */ + private void handleConnections() { + for (Integer id : this.selector.connected()) { + log.debug("Completed connection to node {}", id); + this.connectionStates.connected(id); + } + } + + /** + * Validate that the response corresponds to the request we expect or else explode + */ + private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) { + if (requestHeader.correlationId() != responseHeader.correlationId()) + throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId() + + ") does not match request (" + + requestHeader.correlationId() + + ")"); + } + + /** + * Create a metadata request for the given topics + */ + private ClientRequest metadataRequest(long nowMs, int node, Set<String> topics) { + MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics)); + RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct()); + return new ClientRequest(nowMs, true, send, null); 
+ } + + /** + * Add a metadata request to the list of sends if we need to make one + */ + private void maybeUpdateMetadata(Cluster cluster, List<NetworkSend> sends, long nowMs) { + if (this.metadataFetchInProgress || !metadata.needsUpdate(nowMs)) + return; + + Node node = selectMetadataDestination(cluster); + if (node == null) + return; + + if (connectionStates.isConnected(node.id())) { + Set<String> topics = metadata.topics(); + this.metadataFetchInProgress = true; + ClientRequest metadataRequest = metadataRequest(nowMs, node.id(), topics); + log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); + sends.add(metadataRequest.request()); + this.inFlightRequests.add(metadataRequest); + } else if (connectionStates.canConnect(node.id(), nowMs)) { + // we don't have a connection to this node right now, make one + initiateConnect(node, nowMs); + } + } + + /** + * Find a good node to make a metadata request to. This method will first look for a node that has an existing + * connection and no outstanding requests. If there are no such nodes it will look for a node with no outstanding + * requests. 
+ * @return A node with no requests currently being sent or null if no such node exists + */ + private Node selectMetadataDestination(Cluster cluster) { + List<Node> nodes = cluster.nodes(); + + // first look for a node to which we are connected and have no outstanding requests + boolean connectionInProgress = false; + for (int i = 0; i < nodes.size(); i++) { + Node node = nodes.get(metadataNodeIndex(i, nodes.size())); + if (connectionStates.isConnected(node.id()) && this.inFlightRequests.canSendMore(node.id())) { + this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size()); + return node; + } else if (connectionStates.isConnecting(node.id())) { + connectionInProgress = true; + } + } + + // if we have a connection that is being established now, just wait for that don't make another + if (connectionInProgress) + return null; + + // okay, no luck, pick a random unused node + for (int i = 0; i < nodes.size(); i++) { + Node node = nodes.get(metadataNodeIndex(i, nodes.size())); + if (this.inFlightRequests.canSendMore(node.id())) { + this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size()); + return node; + } + } + + return null; // we failed to find a good destination + } + + /** + * Get the index in the node list of the node to use for the metadata request + */ + private int metadataNodeIndex(int offset, int size) { + return Utils.abs(offset + this.metadataFetchNodeIndex) % size; + } + + /** + * Initiate a connection to the given node + */ + private void initiateConnect(Node node, long nowMs) { + try { + log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port()); + selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); + this.connectionStates.connecting(node.id(), nowMs); + } catch (IOException e) { + /* attempt failed, we'll try again after the backoff */ + connectionStates.disconnected(node.id()); + /* maybe the problem is our metadata, update it 
*/ + metadata.forceUpdate(); + log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e); + } + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java b/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java new file mode 100644 index 0000000..3eda043 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java @@ -0,0 +1,19 @@ +package org.apache.kafka.clients; + +/** + * The state of our connection to a node + */ +final class NodeConnectionState { + + ConnectionState state; + long lastConnectAttemptMs; + + public NodeConnectionState(ConnectionState state, long lastConnectAttempt) { + this.state = state; + this.lastConnectAttemptMs = lastConnectAttempt; + } + + public String toString() { + return "NodeState(" + state + ", " + lastConnectAttemptMs + ")"; + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index d15562a..c766c4d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -21,6 +21,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.producer.internals.FutureRecordMetadata; import org.apache.kafka.clients.producer.internals.Metadata; import org.apache.kafka.clients.producer.internals.Partitioner; @@ -120,18 +121,21 @@ public class KafkaProducer implements Producer { time); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); - this.sender = new Sender(new Selector(this.metrics, time), + + 
NetworkClient client = new NetworkClient(new Selector(this.metrics, time), + this.metadata, + clientId, + config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), + config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), + config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), + config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG)); + this.sender = new Sender(client, this.metadata, this.accumulator, - clientId, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), - config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), config.getInt(ProducerConfig.RETRIES_CONFIG), config.getInt(ProducerConfig.TIMEOUT_CONFIG), - config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), - config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), - config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), this.metrics, new SystemTime()); this.ioThread = new KafkaThread("kafka-producer-network-thread", this.sender, true); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 9b1f565..24c998c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -12,11 +12,15 @@ */ package org.apache.kafka.clients.producer.internals; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.util.*; - +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -29,19 +33,11 @@ import 
org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; -import org.apache.kafka.common.network.NetworkReceive; -import org.apache.kafka.common.network.NetworkSend; -import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.requests.MetadataRequest; -import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.ProduceResponse; -import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.RequestSend; -import org.apache.kafka.common.requests.ResponseHeader; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; @@ -56,16 +52,13 @@ public class Sender implements Runnable { private static final Logger log = LoggerFactory.getLogger(Sender.class); /* the state of each nodes connection */ - private final NodeStates nodeStates; + private final KafkaClient client; /* the record accumulator that batches records */ private final RecordAccumulator accumulator; - /* the selector used to perform network i/o */ - private final Selectable selector; - - /* the client id used to identify this client in requests to the server */ - private final String clientId; + /* the metadata for the client */ + private final Metadata metadata; /* the maximum request size to attempt to send to the server */ private final int maxRequestSize; @@ -79,67 +72,33 @@ public class Sender implements Runnable { /* the number of times to retry a failed request before giving up */ private final int retries; - /* the socket send buffer size in bytes */ - private final 
int socketSendBuffer; - - /* the socket receive size buffer in bytes */ - private final int socketReceiveBuffer; - - /* the set of currently in-flight requests awaiting a response from the server */ - private final InFlightRequests inFlightRequests; - - /* a reference to the current Cluster instance */ - private final Metadata metadata; - /* the clock instance used for getting the time */ private final Time time; - /* the current node to attempt to use for metadata requests (will round-robin over nodes) */ - private int metadataFetchNodeIndex; - - /* the current correlation id to use when sending requests to servers */ - private int correlation; - - /* true iff there is a metadata request that has been sent and for which we have not yet received a response */ - private boolean metadataFetchInProgress; - /* true while the sender thread is still running */ private volatile boolean running; /* metrics */ private final SenderMetrics sensors; - public Sender(Selectable selector, + public Sender(KafkaClient client, Metadata metadata, RecordAccumulator accumulator, - String clientId, int maxRequestSize, - long reconnectBackoffMs, short acks, int retries, int requestTimeout, - int socketSendBuffer, - int socketReceiveBuffer, - int maxInFlightRequestsPerConnection, Metrics metrics, Time time) { - this.nodeStates = new NodeStates(reconnectBackoffMs); + this.client = client; this.accumulator = accumulator; - this.selector = selector; - this.maxRequestSize = maxRequestSize; this.metadata = metadata; - this.clientId = clientId; + this.maxRequestSize = maxRequestSize; this.running = true; this.requestTimeout = requestTimeout; this.acks = acks; this.retries = retries; - this.socketSendBuffer = socketSendBuffer; - this.socketReceiveBuffer = socketReceiveBuffer; - this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection); - this.correlation = 0; - this.metadataFetchInProgress = false; this.time = time; - this.metadataFetchNodeIndex = new Random().nextInt(); 
this.sensors = new SenderMetrics(metrics); } @@ -169,10 +128,7 @@ public class Sender implements Runnable { } catch (Exception e) { log.error("Uncaught error in kafka producer I/O thread: ", e); } - } while (this.accumulator.hasUnsent() || this.inFlightRequests.totalInFlightRequests() > 0); - - // close all the connections - this.selector.close(); + } while (this.accumulator.hasUnsent() || this.client.inFlightRequests() > 0); log.debug("Shutdown of Kafka producer I/O thread has completed."); } @@ -182,115 +138,34 @@ public class Sender implements Runnable { * * @param nowMs The current POSIX time in milliseconds */ - public void run(long nowMs) { + public void run(long now) { Cluster cluster = metadata.fetch(); // get the list of partitions with data ready to send - Set ready = this.accumulator.ready(cluster, nowMs); - - // should we update our metadata? - List sends = new ArrayList(); - maybeUpdateMetadata(cluster, sends, nowMs); + Set ready = this.accumulator.ready(cluster, now); - // prune the list of ready nodes to eliminate any that we aren't ready to send yet - Set sendable = processReadyNode(ready, nowMs); + // remove any nodes we aren't ready to send to + for (Node node : ready) { + if (!this.client.ready(node, now)) + ready.remove(node); + } // create produce requests - Map> batches = this.accumulator.drain(cluster, sendable, this.maxRequestSize, nowMs); - List requests = generateProduceRequests(batches, nowMs); + Map> batches = this.accumulator.drain(cluster, ready, this.maxRequestSize, now); + List requests = createProduceRequests(batches, now); sensors.updateProduceRequestMetrics(requests); if (ready.size() > 0) { - log.trace("Partitions with complete batches: {}", ready); - log.trace("Partitions ready to initiate a request: {}", sendable); + log.trace("Nodes with data ready to send: {}", ready); log.trace("Created {} produce requests: {}", requests.size(), requests); } - for (int i = 0; i < requests.size(); i++) { - InFlightRequest request = 
requests.get(i); - this.inFlightRequests.add(request); - sends.add(request.request); - } - - // do the I/O - try { - this.selector.poll(100L, sends); - } catch (IOException e) { - log.error("Unexpected error during I/O in producer network thread", e); - } - - // handle responses, connections, and disconnections - handleSends(this.selector.completedSends()); - handleResponses(this.selector.completedReceives(), nowMs); - handleDisconnects(this.selector.disconnected(), nowMs); - handleConnects(this.selector.connected()); - } - - /** - * Add a metadata request to the list of sends if we need to make one - */ - private void maybeUpdateMetadata(Cluster cluster, List sends, long nowMs) { - if (this.metadataFetchInProgress || !metadata.needsUpdate(nowMs)) - return; - - Node node = selectMetadataDestination(cluster); - if (node == null) - return; - - if (nodeStates.isConnected(node.id())) { - Set topics = metadata.topics(); - this.metadataFetchInProgress = true; - InFlightRequest metadataRequest = metadataRequest(nowMs, node.id(), topics); - log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); - sends.add(metadataRequest.request); - this.inFlightRequests.add(metadataRequest); - } else if (nodeStates.canConnect(node.id(), nowMs)) { - // we don't have a connection to this node right now, make one - initiateConnect(node, nowMs); - } - } - - /** - * Find a good node to make a metadata request to. This method will first look for a node that has an existing - * connection and no outstanding requests. If there are no such nodes it will look for a node with no outstanding - * requests. 
- * @return A node with no requests currently being sent or null if no such node exists - */ - private Node selectMetadataDestination(Cluster cluster) { - List nodes = cluster.nodes(); - - // first look for a node to which we are connected and have no outstanding requests - boolean connectionInProgress = false; - for (int i = 0; i < nodes.size(); i++) { - Node node = nodes.get(metadataNodeIndex(i, nodes.size())); - if (nodeStates.isConnected(node.id()) && this.inFlightRequests.canSendMore(node.id())) { - this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size()); - return node; - } else if (nodeStates.isConnecting(node.id())) { - connectionInProgress = true; - } - } - - // if we have a connection that is being established now, just wait for that don't make another - if (connectionInProgress) - return null; - - // okay, no luck, pick a random unused node - for (int i = 0; i < nodes.size(); i++) { - Node node = nodes.get(metadataNodeIndex(i, nodes.size())); - if (this.inFlightRequests.canSendMore(node.id())) { - this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size()); - return node; - } + List responses = this.client.poll(requests, 100L, now); + for (ClientResponse response : responses) { + if (response.wasDisconnected()) + handleDisconnect(response, now); + else + handleResponse(response, now); } - - return null; // we failed to find a good destination - } - - /** - * Get the index in the node list of the node to use for the metadata request - */ - private int metadataNodeIndex(int offset, int size) { - return Utils.abs(offset + this.metadataFetchNodeIndex) % size; } /** @@ -302,161 +177,40 @@ public class Sender implements Runnable { this.wakeup(); } - /** - * Process the set of destination nodes with data ready to send. - * - * 1) If we have an unknown leader node, force refresh the metadata. 
- * 2) If we have a connection to the appropriate node, add - * it to the returned set; - * 3) If we have not a connection yet, initialize one - */ - private Set processReadyNode(Set ready, long nowMs) { - Set sendable = new HashSet(ready.size()); - for (Node node : ready) { - if (node == null) { - // we don't know about this topic/partition or it has no leader, re-fetch metadata - metadata.forceUpdate(); - } else if (nodeStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { - sendable.add(node); - } else if (nodeStates.canConnect(node.id(), nowMs)) { - // we don't have a connection to this node right now, make one - initiateConnect(node, nowMs); - } - } - return sendable; - } - - /** - * Initiate a connection to the given node - */ - private void initiateConnect(Node node, long nowMs) { - try { - log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port()); - selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); - this.nodeStates.connecting(node.id(), nowMs); - } catch (IOException e) { - /* attempt failed, we'll try again after the backoff */ - nodeStates.disconnected(node.id()); - /* maybe the problem is our metadata, update it */ - metadata.forceUpdate(); - log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e); - } - } - - /** - * Handle any closed connections - */ - private void handleDisconnects(List disconnects, long nowMs) { - // clear out the in-flight requests for the disconnected broker - for (int node : disconnects) { - nodeStates.disconnected(node); - log.debug("Node {} disconnected.", node); - for (InFlightRequest request : this.inFlightRequests.clearAll(node)) { - log.trace("Cancelled request {} due to node {} being disconnected", request, node); - ApiKeys requestKey = ApiKeys.forId(request.request.header().apiKey()); - switch (requestKey) { - case PRODUCE: - int correlation = 
request.request.header().correlationId(); - for (RecordBatch batch : request.batches.values()) - completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, nowMs); - break; - case METADATA: - metadataFetchInProgress = false; - break; - default: - throw new IllegalArgumentException("Unexpected api key id: " + requestKey.id); - } - } - } - // we got a disconnect so we should probably refresh our metadata and see if that broker is dead - if (disconnects.size() > 0) - this.metadata.forceUpdate(); - } - - /** - * Record any connections that completed in our node state - */ - private void handleConnects(List connects) { - for (Integer id : connects) { - log.debug("Completed connection to node {}", id); - this.nodeStates.connected(id); - } - } - - /** - * Process completed sends - */ - public void handleSends(List sends) { - /* if acks = 0 then the request is satisfied once sent */ - for (NetworkSend send : sends) { - Deque requests = this.inFlightRequests.requestQueue(send.destination()); - InFlightRequest request = requests.peekFirst(); - log.trace("Completed send of request to node {}: {}", request.request.destination(), request.request); - if (!request.expectResponse) { - requests.pollFirst(); - if (request.request.header().apiKey() == ApiKeys.PRODUCE.id) { - for (RecordBatch batch : request.batches.values()) { - batch.done(-1L, Errors.NONE.exception()); - this.accumulator.deallocate(batch); - } - } - } - } - } - - /** - * Handle responses from the server - */ - private void handleResponses(List receives, long nowMs) { - for (NetworkReceive receive : receives) { - int source = receive.source(); - InFlightRequest req = inFlightRequests.nextCompleted(source); - ResponseHeader header = ResponseHeader.parse(receive.payload()); - short apiKey = req.request.header().apiKey(); - Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload()); - correlate(req.request.header(), header); - if (req.request.header().apiKey() == ApiKeys.PRODUCE.id) { 
- log.trace("Received produce response from node {} with correlation id {}", source, req.request.header().correlationId()); - handleProduceResponse(req, req.request.header(), body, nowMs); - } else if (req.request.header().apiKey() == ApiKeys.METADATA.id) { - log.trace("Received metadata response response from node {} with correlation id {}", source, req.request.header() - .correlationId()); - handleMetadataResponse(req.request.header(), body, nowMs); - } else { - throw new IllegalStateException("Unexpected response type: " + req.request.header().apiKey()); - } - this.sensors.recordLatency(receive.source(), nowMs - req.createdMs); - } - - } - - private void handleMetadataResponse(RequestHeader header, Struct body, long nowMs) { - this.metadataFetchInProgress = false; - MetadataResponse response = new MetadataResponse(body); - Cluster cluster = response.cluster(); - // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being - // created which means we will get errors and no nodes until it exists - if (cluster.nodes().size() > 0) - this.metadata.update(cluster, nowMs); - else - log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId()); + private void handleDisconnect(ClientResponse response, long now) { + log.trace("Cancelled request {} due to node {} being disconnected", response, response.request().request().destination()); + int correlation = response.request().request().header().correlationId(); + @SuppressWarnings("unchecked") + Map responseBatches = (Map) response.request().attachment(); + for (RecordBatch batch : responseBatches.values()) + completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, now); } /** * Handle a produce response */ - private void handleProduceResponse(InFlightRequest request, RequestHeader header, Struct body, long nowMs) { - ProduceResponse pr = new ProduceResponse(body); - for (Map responses : pr.responses().values()) { - for (Map.Entry 
entry : responses.entrySet()) { + private void handleResponse(ClientResponse response, long now) { + int correlationId = response.request().request().header().correlationId(); + log.trace("Received produce response from node {} with correlation id {}", + response.request().request().destination(), + correlationId); + @SuppressWarnings("unchecked") + Map batches = (Map) response.request().attachment(); + // if we have a response, parse it + if (response.hasResponse()) { + ProduceResponse produceResponse = new ProduceResponse(response.responseBody()); + for (Map.Entry entry : produceResponse.responses().entrySet()) { TopicPartition tp = entry.getKey(); - ProduceResponse.PartitionResponse response = entry.getValue(); - Errors error = Errors.forCode(response.errorCode); - if (error.exception() instanceof InvalidMetadataException) - metadata.forceUpdate(); - RecordBatch batch = request.batches.get(tp); - completeBatch(batch, error, response.baseOffset, header.correlationId(), nowMs); + ProduceResponse.PartitionResponse partResp = entry.getValue(); + Errors error = Errors.forCode(partResp.errorCode); + RecordBatch batch = batches.get(tp); + completeBatch(batch, error, partResp.baseOffset, correlationId, now); } + this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs()); + } else { + // this is the acks = 0 case, just complete all requests + for (RecordBatch batch : batches.values()) + completeBatch(batch, Errors.NONE, -1L, correlationId, now); } } @@ -468,7 +222,7 @@ public class Sender implements Runnable { * @param correlationId The correlation id for the request * @param nowMs The current POSIX time stamp in milliseconds */ - private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long correlationId, long nowMs) { + private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long correlationId, long now) { if (error != Errors.NONE && canRetry(batch, error)) { // retry log.warn("Got error 
produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}", @@ -476,7 +230,7 @@ public class Sender implements Runnable { batch.topicPartition, this.retries - batch.attempts - 1, error); - this.accumulator.reenqueue(batch, nowMs); + this.accumulator.reenqueue(batch, now); this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount); } else { // tell the user the result of their request @@ -485,6 +239,8 @@ public class Sender implements Runnable { if (error != Errors.NONE) this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount); } + if (error.exception() instanceof InvalidMetadataException) + metadata.forceUpdate(); } /** @@ -495,30 +251,10 @@ public class Sender implements Runnable { } /** - * Validate that the response corresponds to the request we expect or else explode - */ - private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) { - if (requestHeader.correlationId() != responseHeader.correlationId()) - throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId() + - ") does not match request (" + - requestHeader.correlationId() + - ")"); - } - - /** - * Create a metadata request for the given topics - */ - private InFlightRequest metadataRequest(long nowMs, int node, Set topics) { - MetadataRequest metadata = new MetadataRequest(new ArrayList(topics)); - RequestSend send = new RequestSend(node, header(ApiKeys.METADATA), metadata.toStruct()); - return new InFlightRequest(nowMs, true, send, null); - } - - /** * Transfer the record batches into a list of produce requests on a per-node basis */ - private List generateProduceRequests(Map> collated, long nowMs) { - List requests = new ArrayList(collated.size()); + private List createProduceRequests(Map> collated, long nowMs) { + List requests = new ArrayList(collated.size()); for (Map.Entry> entry : collated.entrySet()) requests.add(produceRequest(nowMs, entry.getKey(), acks, 
requestTimeout, entry.getValue())); return requests; @@ -527,225 +263,23 @@ public class Sender implements Runnable { /** * Create a produce request from the given record batches */ - private InFlightRequest produceRequest(long nowMs, int destination, short acks, int timeout, List batches) { - Map batchesByPartition = new HashMap(); - Map> batchesByTopic = new HashMap>(); + private ClientRequest produceRequest(long nowMs, int destination, short acks, int timeout, List batches) { + ProduceRequest request = new ProduceRequest(acks, timeout); + Map recordsByPartition = new HashMap(batches.size()); for (RecordBatch batch : batches) { - batchesByPartition.put(batch.topicPartition, batch); - List found = batchesByTopic.get(batch.topicPartition.topic()); - if (found == null) { - found = new ArrayList(); - batchesByTopic.put(batch.topicPartition.topic(), found); - } - found.add(batch); - } - Struct produce = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id)); - produce.set("acks", acks); - produce.set("timeout", timeout); - List topicDatas = new ArrayList(batchesByTopic.size()); - for (Map.Entry> entry : batchesByTopic.entrySet()) { - Struct topicData = produce.instance("topic_data"); - topicData.set("topic", entry.getKey()); - List parts = entry.getValue(); - Object[] partitionData = new Object[parts.size()]; - for (int i = 0; i < parts.size(); i++) { - ByteBuffer buffer = parts.get(i).records.buffer(); - buffer.flip(); - Struct part = topicData.instance("data") - .set("partition", parts.get(i).topicPartition.partition()) - .set("record_set", buffer); - partitionData[i] = part; - } - topicData.set("data", partitionData); - topicDatas.add(topicData); + batch.records.buffer().flip(); + request.add(batch.topicPartition, batch.records); + recordsByPartition.put(batch.topicPartition, batch); } - produce.set("topic_data", topicDatas.toArray()); - - RequestSend send = new RequestSend(destination, header(ApiKeys.PRODUCE), produce); - return new InFlightRequest(nowMs, 
acks != 0, send, batchesByPartition); - } - - private RequestHeader header(ApiKeys key) { - return new RequestHeader(key.id, clientId, correlation++); + RequestSend send = new RequestSend(destination, this.client.nextRequestHeader(ApiKeys.PRODUCE), request.toStruct()); + return new ClientRequest(nowMs, acks != 0, send, recordsByPartition); } /** * Wake up the selector associated with this send thread */ public void wakeup() { - this.selector.wakeup(); - } - - /** - * The states of a node connection - */ - private static enum ConnectionState { - DISCONNECTED, CONNECTING, CONNECTED - } - - /** - * The state of a node - */ - private static final class NodeState { - private ConnectionState state; - private long lastConnectAttemptMs; - - public NodeState(ConnectionState state, long lastConnectAttempt) { - this.state = state; - this.lastConnectAttemptMs = lastConnectAttempt; - } - - public String toString() { - return "NodeState(" + state + ", " + lastConnectAttemptMs + ")"; - } - } - - private static class NodeStates { - private final long reconnectBackoffMs; - private final Map nodeState; - - public NodeStates(long reconnectBackoffMs) { - this.reconnectBackoffMs = reconnectBackoffMs; - this.nodeState = new HashMap(); - } - - public boolean canConnect(int node, long nowMs) { - NodeState state = nodeState.get(node); - if (state == null) - return true; - else - return state.state == ConnectionState.DISCONNECTED && nowMs - state.lastConnectAttemptMs > this.reconnectBackoffMs; - } - - public void connecting(int node, long nowMs) { - nodeState.put(node, new NodeState(ConnectionState.CONNECTING, nowMs)); - } - - public boolean isConnected(int node) { - NodeState state = nodeState.get(node); - return state != null && state.state == ConnectionState.CONNECTED; - } - - public boolean isConnecting(int node) { - NodeState state = nodeState.get(node); - return state != null && state.state == ConnectionState.CONNECTING; - } - - public void connected(int node) { - 
nodeState(node).state = ConnectionState.CONNECTED; - } - - public void disconnected(int node) { - nodeState(node).state = ConnectionState.DISCONNECTED; - } - - private NodeState nodeState(int node) { - NodeState state = this.nodeState.get(node); - if (state == null) - throw new IllegalStateException("No entry found for node " + node); - return state; - } - } - - /** - * An request that hasn't been fully processed yet - */ - private static final class InFlightRequest { - public long createdMs; - public boolean expectResponse; - public Map batches; - public RequestSend request; - - /** - * @param createdMs The unix timestamp in milliseonds for the time at which this request was created. - * @param expectResponse Should we expect a response message or is this request complete once it is sent? - * @param request The request - * @param batches The record batches contained in the request if it is a produce request - */ - public InFlightRequest(long createdMs, boolean expectResponse, RequestSend request, Map batches) { - this.createdMs = createdMs; - this.batches = batches; - this.request = request; - this.expectResponse = expectResponse; - } - - @Override - public String toString() { - return "InFlightRequest(expectResponse=" + expectResponse + ", batches=" + batches + ", request=" + request + ")"; - } - } - - /** - * A set of outstanding request queues for each node that have not yet received responses - */ - private static final class InFlightRequests { - private final int maxInFlightRequestsPerConnection; - private final Map> requests; - - public InFlightRequests(int maxInFlightRequestsPerConnection) { - this.requests = new HashMap>(); - this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection; - } - - /** - * Add the given request to the queue for the node it was directed to - */ - public void add(InFlightRequest request) { - Deque reqs = this.requests.get(request.request.destination()); - if (reqs == null) { - reqs = new ArrayDeque(); - 
this.requests.put(request.request.destination(), reqs); - } - reqs.addFirst(request); - } - - public Deque requestQueue(int node) { - Deque reqs = requests.get(node); - if (reqs == null || reqs.isEmpty()) - throw new IllegalStateException("Response from server for which there are no in-flight requests."); - return reqs; - } - - /** - * Get the oldest request (the one that that will be completed next) for the given node - */ - public InFlightRequest nextCompleted(int node) { - return requestQueue(node).pollLast(); - } - - /** - * Can we send more requests to this node? - * - * @param node Node in question - * @return true iff we have no requests still being sent to the given node - */ - public boolean canSendMore(int node) { - Deque queue = requests.get(node); - return queue == null || queue.isEmpty() || - (queue.peekFirst().request.complete() && queue.size() < this.maxInFlightRequestsPerConnection); - } - - /** - * Clear out all the in-flight requests for the given node and return them - * - * @param node The node - * @return All the in-flight requests for that node that have been removed - */ - public Iterable clearAll(int node) { - Deque reqs = requests.get(node); - if (reqs == null) { - return Collections.emptyList(); - } else { - return requests.remove(node); - } - } - - public int totalInFlightRequests() { - int total = 0; - for (Deque deque : this.requests.values()) - total += deque.size(); - return total; - } + this.client.wakeup(); } /** @@ -795,7 +329,7 @@ public class Sender implements Runnable { this.metrics.addMetric("requests-in-flight", "The current number of in-flight requests awaiting a response.", new Measurable() { public double measure(MetricConfig config, long nowMs) { - return inFlightRequests.totalInFlightRequests(); + return client.inFlightRequests(); } }); metrics.addMetric("metadata-age", "The age in seconds of the current producer metadata being used.", new Measurable() { @@ -828,14 +362,15 @@ public class Sender implements Runnable { } } 
- public void updateProduceRequestMetrics(List requests) { + public void updateProduceRequestMetrics(List requests) { long nowMs = time.milliseconds(); for (int i = 0; i < requests.size(); i++) { - InFlightRequest request = requests.get(i); + ClientRequest request = requests.get(i); int records = 0; - if (request.batches != null) { - for (RecordBatch batch : request.batches.values()) { + if (request.attachment() != null) { + Map responseBatches = (Map) request.attachment(); + for (RecordBatch batch : responseBatches.values()) { // register all per-topic metrics at once String topic = batch.topicPartition.topic(); @@ -868,7 +403,7 @@ public class Sender implements Runnable { String topicRetryName = "topic." + topic + ".record-retries"; Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName); if (topicRetrySensor != null) - topicRetrySensor.record(count, nowMs); + topicRetrySensor.record(count, nowMs); } public void recordErrors(String topic, int count) { @@ -876,8 +411,8 @@ public class Sender implements Runnable { this.errorSensor.record(count, nowMs); String topicErrorName = "topic." 
+ topic + ".record-errors"; Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName); - if (topicErrorSensor != null) - topicErrorSensor.record(count, nowMs); + if (topicErrorSensor != null) + topicErrorSensor.record(count, nowMs); } public void recordLatency(int node, long latency) { @@ -887,7 +422,7 @@ public class Sender implements Runnable { String nodeTimeName = "node-" + node + ".latency"; Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName); if (nodeRequestTime != null) - nodeRequestTime.record(latency, nowMs); + nodeRequestTime.record(latency, nowMs); } } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 3e35898..8463298 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -13,6 +13,7 @@ package org.apache.kafka.common.network; import java.io.IOException; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.CancelledKeyException; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java index 68b8827..7164701 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.protocol.types; @@ -51,8 +47,9 @@ public class Schema extends Type { Object value = f.type().validate(r.get(f)); f.type.write(buffer, value); } catch (Exception e) { - throw new SchemaException("Error writing field '" + f.name + "': " + e.getMessage() == null ? e.getMessage() : e.getClass() - .getName()); + throw new SchemaException("Error writing field '" + f.name + + "': " + + (e.getMessage() == null ? e.getClass().getName() : e.getMessage())); } } } @@ -66,8 +63,9 @@ public class Schema extends Type { try { objects[i] = fields[i].type.read(buffer); } catch (Exception e) { - throw new SchemaException("Error reading field '" + fields[i].name + "': " + e.getMessage() == null ? 
e.getMessage() - : e.getClass().getName()); + throw new SchemaException("Error reading field '" + fields[i].name + + "': " + + (e.getMessage() == null ? e.getClass().getName() : e.getMessage())); } } return new Struct(this, objects); diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 428968c..6f6b338 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.record; @@ -88,14 +84,14 @@ public class MemoryRecords implements Records { /** * Check if we have room for a new record containing the given key/value pair - * - * Note that the return value is based on the estimate of the bytes written to the compressor, - * which may not be accurate if compression is really used. When this happens, the following - * append may cause dynamic buffer re-allocation in the underlying byte buffer stream. + * + * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be + * accurate if compression is really used. When this happens, the following append may cause dynamic buffer + * re-allocation in the underlying byte buffer stream. */ public boolean hasRoomFor(byte[] key, byte[] value) { - return this.writable && - this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value); + return this.writable && this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + + Record.recordSize(key, value); } public boolean isFull() { @@ -159,10 +155,10 @@ public class MemoryRecords implements Records { /* * Read the next record from the buffer. - * - * Note that in the compressed message set, each message value size is set as the size - * of the un-compressed version of the message value, so when we do de-compression - * allocating an array of the specified size for reading compressed value data is sufficient. 
+ * + * Note that in the compressed message set, each message value size is set as the size of the un-compressed + * version of the message value, so when we do de-compression allocating an array of the specified size for + * reading compressed value data is sufficient. */ @Override protected LogEntry makeNext() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java new file mode 100644 index 0000000..6036f6a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -0,0 +1,71 @@ +package org.apache.kafka.common.requests; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.record.MemoryRecords; + +public class ProduceRequest { + + private final short acks; + private final int timeout; + private final Map<String, List<PartitionRecords>> records; + + public ProduceRequest(short acks, int timeout) { + this.acks = acks; + this.timeout = timeout; + this.records = new HashMap<String, List<PartitionRecords>>(); + } + + public void add(TopicPartition tp, MemoryRecords recs) { + List<PartitionRecords> found = this.records.get(tp.topic()); + if (found == null) { + found = new ArrayList<PartitionRecords>(); + records.put(tp.topic(), found); + } + found.add(new PartitionRecords(tp, recs)); + } + + public Struct toStruct() { + Struct produce = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id)); + produce.set("acks", acks); + produce.set("timeout", timeout); + List<Struct> topicDatas = new ArrayList<Struct>(records.size()); + for (Map.Entry<String, List<PartitionRecords>> entry : records.entrySet()) { + Struct topicData = produce.instance("topic_data"); + topicData.set("topic", entry.getKey()); + List<PartitionRecords> parts = entry.getValue(); + Object[]
partitionData = new Object[parts.size()]; + for (int i = 0; i < parts.size(); i++) { + ByteBuffer buffer = parts.get(i).records.buffer(); + buffer.flip(); + Struct part = topicData.instance("data") + .set("partition", parts.get(i).topicPartition.partition()) + .set("record_set", buffer); + partitionData[i] = part; + } + topicData.set("data", partitionData); + topicDatas.add(topicData); + } + produce.set("topic_data", topicDatas.toArray()); + return produce; + } + + private static final class PartitionRecords { + public final TopicPartition topicPartition; + public final MemoryRecords records; + + public PartitionRecords(TopicPartition topicPartition, MemoryRecords records) { + this.topicPartition = topicPartition; + this.records = records; + } + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 6fa4a58..6cf4fb7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -3,65 +3,50 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. 
*/ package org.apache.kafka.common.requests; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.types.Struct; - import java.util.HashMap; import java.util.Map; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.types.Struct; + public class ProduceResponse { - public class PartitionResponse { - public int partitionId; - public short errorCode; - public long baseOffset; - public PartitionResponse(int partitionId, short errorCode, long baseOffset) { - this.partitionId = partitionId; - this.errorCode = errorCode; - this.baseOffset = baseOffset; - } - @Override - public String toString() { - StringBuilder b = new StringBuilder(); - b.append('{'); - b.append("pid: " + partitionId); - b.append(",error: " + errorCode); - b.append(",offset: " + baseOffset); - b.append('}'); - return b.toString(); - } - } + private final Map<TopicPartition, PartitionResponse> responses; - private final Map<String, Map<TopicPartition, PartitionResponse>> responses; + public ProduceResponse() { + this.responses = new HashMap<TopicPartition, PartitionResponse>(); + } public ProduceResponse(Struct struct) { - responses = new HashMap<String, Map<TopicPartition, PartitionResponse>>(); + responses = new HashMap<TopicPartition, PartitionResponse>(); for (Object topicResponse : (Object[]) struct.get("responses")) { Struct topicRespStruct = (Struct) topicResponse; String topic = (String) topicRespStruct.get("topic"); - Map<TopicPartition, PartitionResponse> topicResponses = new HashMap<TopicPartition, PartitionResponse>(); for (Object partResponse : (Object[]) topicRespStruct.get("partition_responses")) { Struct partRespStruct = (Struct) partResponse; int partition = (Integer) partRespStruct.get("partition"); short errorCode = (Short) partRespStruct.get("error_code"); long offset = (Long) partRespStruct.get("base_offset"); TopicPartition tp = new TopicPartition(topic, partition); - topicResponses.put(tp, new PartitionResponse(partition, errorCode, offset)); + responses.put(tp, new PartitionResponse(partition, errorCode, offset)); } - responses.put(topic, topicResponses); } } - public Map<String, Map<TopicPartition, PartitionResponse>> responses() { + public void addResponse(TopicPartition tp, int partition, short error, long baseOffset)
{ + this.responses.put(tp, new PartitionResponse(partition, error, baseOffset)); + } + + public Map<TopicPartition, PartitionResponse> responses() { return this.responses; } @@ -70,16 +55,40 @@ public class ProduceResponse { StringBuilder b = new StringBuilder(); b.append('{'); boolean isFirst = true; - for (Map<TopicPartition, PartitionResponse> response : responses.values()) { - for (Map.Entry<TopicPartition, PartitionResponse> entry : response.entrySet()) { - if (isFirst) - isFirst = false; - else - b.append(','); - b.append(entry.getKey() + " : " + entry.getValue()); - } + for (Map.Entry<TopicPartition, PartitionResponse> entry : responses.entrySet()) { + if (isFirst) + isFirst = false; + else + b.append(','); + b.append(entry.getKey() + " : " + entry.getValue()); } b.append('}'); return b.toString(); } + + public static class PartitionResponse { + public int partitionId; + public short errorCode; + public long baseOffset; + + public PartitionResponse(int partitionId, short errorCode, long baseOffset) { + this.partitionId = partitionId; + this.errorCode = errorCode; + this.baseOffset = baseOffset; + } + + @Override + public String toString() { + StringBuilder b = new StringBuilder(); + b.append('{'); + b.append("pid: "); + b.append(partitionId); + b.append(",error: "); + b.append(errorCode); + b.append(",offset: "); + b.append(baseOffset); + b.append('}'); + return b.toString(); + } + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java new file mode 100644 index 0000000..3a6d538 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -0,0 +1,86 @@ +package org.apache.kafka.clients; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.Set; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.requests.RequestHeader; +import
org.apache.kafka.common.utils.Time; + +/** + * A mock network client for use testing code + */ +public class MockClient implements KafkaClient { + + private final Time time; + private int correlation = 0; + private final Set<Integer> ready = new HashSet<Integer>(); + private final Queue<ClientRequest> requests = new ArrayDeque<ClientRequest>(); + private final Queue<ClientResponse> responses = new ArrayDeque<ClientResponse>(); + + public MockClient(Time time) { + this.time = time; + } + + @Override + public boolean ready(Node node, long now) { + boolean found = ready.contains(node.id()); + ready.add(node.id()); + return found; + } + + public void disconnect(Integer node) { + Iterator<ClientRequest> iter = requests.iterator(); + while (iter.hasNext()) { + ClientRequest request = iter.next(); + if (request.request().destination() == node) { + responses.add(new ClientResponse(request, time.milliseconds(), true, null)); + iter.remove(); + } + } + ready.remove(node); + } + + @Override + public List<ClientResponse> poll(List<ClientRequest> requests, long timeoutMs, long now) { + this.requests.addAll(requests); + List<ClientResponse> copy = new ArrayList<ClientResponse>(this.responses); + this.responses.clear(); + return copy; + } + + public Queue<ClientRequest> requests() { + return this.requests; + } + + public void respond(Struct body) { + ClientRequest request = requests.remove(); + responses.add(new ClientResponse(request, time.milliseconds(), false, body)); + } + + @Override + public int inFlightRequests() { + return requests.size(); + } + + @Override + public RequestHeader nextRequestHeader(ApiKeys key) { + return new RequestHeader(key.id, "mock", correlation++); + } + + @Override + public void wakeup() { + } + + @Override + public void close() { + } + +} diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java new file mode 100644 index 0000000..02980e8 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -0,0 +1,99 @@ +package org.apache.kafka.clients; + +import static
org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.kafka.clients.producer.internals.Metadata; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.ProduceRequest; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.requests.RequestSend; +import org.apache.kafka.common.requests.ResponseHeader; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.test.MockSelector; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Test; + +public class NetworkClientTest { + + private MockTime time = new MockTime(); + private MockSelector selector = new MockSelector(time); + private Metadata metadata = new Metadata(0, Long.MAX_VALUE); + private int nodeId = 1; + private Cluster cluster = TestUtils.singletonCluster("test", nodeId); + private Node node = cluster.nodes().get(0); + private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024); + + @Before + public void setup() { + metadata.update(cluster, time.milliseconds()); + } + + @Test + public void testReadyAndDisconnect() { + List<ClientRequest> reqs = new ArrayList<ClientRequest>(); + assertFalse("Client begins unready as it has no connection.", client.ready(node, time.milliseconds())); + assertEquals("The connection is established as a side-effect of the readiness check", 1, selector.connected().size()); + client.poll(reqs, 1, time.milliseconds()); + selector.clear(); + 
assertTrue("Now the client is ready", client.ready(node, time.milliseconds())); + selector.disconnect(node.id()); + client.poll(reqs, 1, time.milliseconds()); + selector.clear(); + assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds())); + assertTrue("Metadata should get updated.", metadata.needsUpdate(time.milliseconds())); + } + + @Test(expected = IllegalStateException.class) + public void testSendToUnreadyNode() { + RequestSend send = new RequestSend(5, + client.nextRequestHeader(ApiKeys.METADATA), + new MetadataRequest(Arrays.asList("test")).toStruct()); + ClientRequest request = new ClientRequest(time.milliseconds(), false, send, null); + client.poll(Arrays.asList(request), 1, time.milliseconds()); + } + + @Test + public void testSimpleRequestResponse() { + ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000); + RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE); + RequestSend send = new RequestSend(node.id(), reqHeader, produceRequest.toStruct()); + ClientRequest request = new ClientRequest(time.milliseconds(), true, send, null); + awaitReady(client, node); + client.poll(Arrays.asList(request), 1, time.milliseconds()); + assertEquals(1, client.inFlightRequests()); + ResponseHeader respHeader = new ResponseHeader(reqHeader.correlationId()); + Struct resp = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id)); + resp.set("responses", new Object[0]); + int size = respHeader.sizeOf() + resp.sizeOf(); + ByteBuffer buffer = ByteBuffer.allocate(size); + respHeader.writeTo(buffer); + resp.writeTo(buffer); + buffer.flip(); + selector.completeReceive(new NetworkReceive(node.id(), buffer)); + List<ClientResponse> responses = client.poll(new ArrayList<ClientRequest>(), 1, time.milliseconds()); + assertEquals(1, responses.size()); + ClientResponse response = responses.get(0); + assertTrue("Should have a response body.", response.hasResponse()); + assertEquals("Should be correlated to the 
original request", request, response.request()); + } + + private void awaitReady(NetworkClient client, Node node) { + while (!client.ready(node, time.milliseconds())) + client.poll(new ArrayList<ClientRequest>(), 1, time.milliseconds()); + } + +} diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java index c4072ae..93b58d0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java @@ -17,7 +17,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Set; import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.RecordBatch; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java index 3ef692c..90d7b30 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java @@ -16,62 +16,48 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import java.nio.ByteBuffer; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.producer.internals.Metadata; import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.Sender; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.TopicPartition; 
import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.common.requests.RequestSend; -import org.apache.kafka.common.requests.ResponseHeader; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.test.MockSelector; import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.Test; public class SenderTest { - private static final String CLIENT_ID = ""; private static final int MAX_REQUEST_SIZE = 1024 * 1024; - private static final long RECONNECT_BACKOFF_MS = 0L; private static final short ACKS_ALL = -1; private static final int MAX_RETRIES = 0; private static final int REQUEST_TIMEOUT_MS = 10000; - private static final int SEND_BUFFER_SIZE = 64 * 1024; - private static final int RECEIVE_BUFFER_SIZE = 64 * 1024; - private static final int MAX_IN_FLIGHT_REQS = Integer.MAX_VALUE; private TopicPartition tp = new TopicPartition("test", 0); private MockTime time = new MockTime(); - private MockSelector selector = new MockSelector(time); + private MockClient client = new MockClient(time); private int batchSize = 16 * 1024; private Metadata metadata = new Metadata(0, Long.MAX_VALUE); private Cluster cluster = TestUtils.singletonCluster("test", 1); private Metrics metrics = new Metrics(time); private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time); - private Sender sender = new Sender(selector, + private Sender sender = new Sender(client, metadata, this.accumulator, - CLIENT_ID, MAX_REQUEST_SIZE, - RECONNECT_BACKOFF_MS, ACKS_ALL, MAX_RETRIES, REQUEST_TIMEOUT_MS, - SEND_BUFFER_SIZE, - RECEIVE_BUFFER_SIZE, - MAX_IN_FLIGHT_REQS, metrics, time); @@ -82,21 
+68,14 @@ public class SenderTest { @Test public void testSimple() throws Exception { + int offset = 0; Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); + sender.run(time.milliseconds()); // connect + sender.run(time.milliseconds()); // send produce request + assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequests()); + client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code())); sender.run(time.milliseconds()); - assertEquals("We should have connected", 1, selector.connected().size()); - selector.clear(); - sender.run(time.milliseconds()); - assertEquals("Single request should be sent", 1, selector.completedSends().size()); - RequestSend request = (RequestSend) selector.completedSends().get(0); - selector.clear(); - long offset = 42; - selector.completeReceive(produceResponse(request.header().correlationId(), - cluster.leaderFor(tp).id(), - tp.topic(), - tp.partition(), - offset, - Errors.NONE.code())); + assertEquals("All requests completed.", offset, client.inFlightRequests()); sender.run(time.milliseconds()); assertTrue("Request should be completed", future.isDone()); assertEquals(offset, future.get().offset()); @@ -106,69 +85,43 @@ public class SenderTest { public void testRetries() throws Exception { // create a sender with retries = 1 int maxRetries = 1; - Sender sender = new Sender(selector, + Sender sender = new Sender(client, metadata, this.accumulator, - CLIENT_ID, MAX_REQUEST_SIZE, - RECONNECT_BACKOFF_MS, ACKS_ALL, maxRetries, REQUEST_TIMEOUT_MS, - SEND_BUFFER_SIZE, - RECEIVE_BUFFER_SIZE, - MAX_IN_FLIGHT_REQS, new Metrics(), time); + // do a successful retry Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); - RequestSend request1 = completeSend(sender); - selector.clear(); - selector.completeReceive(produceResponse(request1.header().correlationId(), - 
cluster.leaderFor(tp).id(), - tp.topic(), - tp.partition(), - -1, - Errors.REQUEST_TIMED_OUT.code())); - sender.run(time.milliseconds()); - selector.clear(); - sender.run(time.milliseconds()); - RequestSend request2 = completeSend(sender); - selector.completeReceive(produceResponse(request2.header().correlationId(), - cluster.leaderFor(tp).id(), - tp.topic(), - tp.partition(), - 42, - Errors.NONE.code())); + sender.run(time.milliseconds()); // connect + sender.run(time.milliseconds()); // send produce request + assertEquals(1, client.inFlightRequests()); + client.disconnect(client.requests().peek().request().destination()); + assertEquals(0, client.inFlightRequests()); + sender.run(time.milliseconds()); // receive error + sender.run(time.milliseconds()); // reconnect + sender.run(time.milliseconds()); // resend + assertEquals(1, client.inFlightRequests()); + int offset = 0; + client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code())); sender.run(time.milliseconds()); - assertTrue("Request should retry and complete", future.isDone()); - assertEquals(42, future.get().offset()); - } - - @Test - public void testMetadataRefreshOnNoLeaderException() throws Exception { - Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); - RequestSend request = completeSend(); - selector.clear(); - selector.completeReceive(produceResponse(request.header().correlationId(), - cluster.leaderFor(tp).id(), - tp.topic(), - tp.partition(), - -1, - Errors.NOT_LEADER_FOR_PARTITION.code())); - sender.run(time.milliseconds()); - completedWithError(future, Errors.NOT_LEADER_FOR_PARTITION); - assertTrue("Error triggers a metadata update.", metadata.needsUpdate(time.milliseconds())); - } + assertTrue("Request should have retried and completed", future.isDone()); + assertEquals(offset, future.get().offset()); - @Test - public void testMetadataRefreshOnDisconnect() throws Exception { - Future future = 
accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); - completeSend(); - selector.clear(); - selector.disconnect(cluster.leaderFor(tp).id()); + // do an unsuccessful retry + future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); + sender.run(time.milliseconds()); // send produce request + for (int i = 0; i < maxRetries + 1; i++) { + client.disconnect(client.requests().peek().request().destination()); + sender.run(time.milliseconds()); // receive error + sender.run(time.milliseconds()); // reconnect + sender.run(time.milliseconds()); // resend + } sender.run(time.milliseconds()); completedWithError(future, Errors.NETWORK_EXCEPTION); - assertTrue("The disconnection triggers a metadata update.", metadata.needsUpdate(time.milliseconds())); } private void completedWithError(Future future, Errors error) throws Exception { @@ -181,17 +134,7 @@ public class SenderTest { } } - private RequestSend completeSend() { - return completeSend(sender); - } - - private RequestSend completeSend(Sender sender) { - while (selector.completedSends().size() == 0) - sender.run(time.milliseconds()); - return (RequestSend) selector.completedSends().get(0); - } - - private NetworkReceive produceResponse(int correlation, int source, String topic, int part, long offset, int error) { + private Struct produceResponse(String topic, int part, long offset, int error) { Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id)); Struct response = struct.instance("responses"); response.set("topic", topic); @@ -201,12 +144,7 @@ public class SenderTest { partResp.set("base_offset", offset); response.set("partition_responses", new Object[] { partResp }); struct.set("responses", new Object[] { response }); - ResponseHeader header = new ResponseHeader(correlation); - ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + struct.sizeOf()); - header.writeTo(buffer); - struct.writeTo(buffer); - 
buffer.rewind(); - return new NetworkReceive(source, buffer); + return struct; } } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java index cda8e64..eb7fcf0 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java @@ -1,25 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.utils; import java.util.concurrent.TimeUnit; -import org.apache.kafka.common.utils.Time; - +/** + * A clock that you can manually advance by calling sleep + */ public class MockTime implements Time { private long nanos = 0; diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index cd4ca2f..990c305 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -127,7 +127,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness * With non-exist-topic the future metadata should return ExecutionException caused by TimeoutException */ @Test - def testNonExistTopic() { + def testNonExistentTopic() { // send a record with non-exist topic val record = new ProducerRecord(topic2, null, "key".getBytes, "value".getBytes) intercept[ExecutionException] {