diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java new file mode 100644 index 0000000..d32c319 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients; + +import org.apache.kafka.common.requests.RequestSend; + +/** + * A request being sent to the server. This holds both the network send as well as the client-level metadata. + */ +public final class ClientRequest { + + private final long createdMs; + private final boolean expectResponse; + private final RequestSend request; + private final Object attachment; + + /** + * @param createdMs The unix timestamp in milliseconds for the time at which this request was created. + * @param expectResponse Should we expect a response message or is this request complete once it is sent? 
+ * @param request The request + * @param attachment Associated data with the request + */ + public ClientRequest(long createdMs, boolean expectResponse, RequestSend request, Object attachment) { + this.createdMs = createdMs; + this.attachment = attachment; + this.request = request; + this.expectResponse = expectResponse; + } + + @Override + public String toString() { + return "ClientRequest(expectResponse=" + expectResponse + ", payload=" + attachment + ", request=" + request + ")"; + } + + public boolean expectResponse() { + return expectResponse; + } + + public RequestSend request() { + return request; + } + + public Object attachment() { + return attachment; + } + + public long createdTime() { + return createdMs; + } + +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java new file mode 100644 index 0000000..14ef69a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients; + +import org.apache.kafka.common.protocol.types.Struct; + +/** + * A response from the server. 
Contains both the body of the response as well as the correlated request that was + * originally sent. + */ +public class ClientResponse { + + private final long received; + private final boolean disconnected; + private final ClientRequest request; + private final Struct responseBody; + + /** + * @param request The original request + * @param received The unix timestamp when this response was received + * @param disconnected Whether the client disconnected before fully reading a response + * @param responseBody The response contents (or null) if we disconnected or no response was expected + */ + public ClientResponse(ClientRequest request, long received, boolean disconnected, Struct responseBody) { + super(); + this.received = received; + this.disconnected = disconnected; + this.request = request; + this.responseBody = responseBody; + } + + public long receivedTime() { + return received; + } + + public boolean wasDisconnected() { + return disconnected; + } + + public ClientRequest request() { + return request; + } + + public Struct responseBody() { + return responseBody; + } + + public boolean hasResponse() { + return responseBody != null; + } + + public long requestLatencyMs() { + return receivedTime() - this.request.createdTime(); + } + + @Override + public String toString() { + return "ClientResponse(received=" + received + + ", disconnected=" + + disconnected + + ", request=" + + request + + ", responseBody=" + + responseBody + + ")"; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java new file mode 100644 index 0000000..d304660 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The state of our connection to each node in the cluster.
+ *
+ */
+final class ClusterConnectionStates {
+
+    private final long reconnectBackoffMs;
+    private final Map<Integer, NodeConnectionState> nodeState;
+
+    public ClusterConnectionStates(long reconnectBackoffMs) {
+        this.reconnectBackoffMs = reconnectBackoffMs;
+        this.nodeState = new HashMap<Integer, NodeConnectionState>();
+    }
+
+    /**
+     * Return true iff we can currently initiate a new connection to the given node. This will be the case if we are not
+     * connected and haven't been connected for at least the minimum reconnection backoff period.
+     * @param node The node id to check
+     * @param now The current time in ms
+     * @return true if we can initiate a new connection
+     */
+    public boolean canConnect(int node, long now) {
+        NodeConnectionState state = nodeState.get(node);
+        if (state == null)
+            return true;
+        else
+            return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= this.reconnectBackoffMs;
+    }
+
+    /**
+     * Return true if we are disconnected from the given node and can't re-establish a connection yet
+     * @param node The node to check
+     * @param now The current time in ms
+     */
+    public boolean isBlackedOut(int node, long now) {
+        NodeConnectionState state = nodeState.get(node);
+        if (state == null)
+            return false;
+        else
+            return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs < this.reconnectBackoffMs;
+    }
+
+    /**
+     * Enter the connecting state for the given node.
+     * @param node The id of the node we are connecting to
+     * @param now The current time.
+     */
+    public void connecting(int node, long now) {
+        nodeState.put(node, new NodeConnectionState(ConnectionState.CONNECTING, now));
+    }
+
+    /**
+     * Return true iff we have a connection to the given node
+     * @param node The id of the node to check
+     */
+    public boolean isConnected(int node) {
+        NodeConnectionState state = nodeState.get(node);
+        return state != null && state.state == ConnectionState.CONNECTED;
+    }
+
+    /**
+     * Return true iff we are in the process of connecting to the given node
+     * @param node The id of the node
+     */
+    public boolean isConnecting(int node) {
+        NodeConnectionState state = nodeState.get(node);
+        return state != null && state.state == ConnectionState.CONNECTING;
+    }
+
+    /**
+     * Enter the connected state for the given node
+     * @param node The node we have connected to
+     */
+    public void connected(int node) {
+        nodeState(node).state = ConnectionState.CONNECTED;
+    }
+
+    /**
+     * Enter the disconnected state for the given node
+     * @param node The node we have
disconnected from
+     */
+    public void disconnected(int node) {
+        nodeState(node).state = ConnectionState.DISCONNECTED;
+    }
+
+    /**
+     * Get the state of our connection to the given node
+     * @param node The id of the node
+     * @return The state of our connection
+     */
+    private NodeConnectionState nodeState(int node) {
+        NodeConnectionState state = this.nodeState.get(node);
+        if (state == null)
+            throw new IllegalStateException("No entry found for node " + node);
+        return state;
+    }
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
new file mode 100644
index 0000000..ab7e322
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */ +package org.apache.kafka.clients; + +/** + * The states of a node connection + */ +enum ConnectionState { + DISCONNECTED, CONNECTING, CONNECTED +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java new file mode 100644 index 0000000..936487b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */
+package org.apache.kafka.clients;
+
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The set of requests which have been sent or are being sent but haven't yet received a response
+ */
+final class InFlightRequests {
+
+    private final int maxInFlightRequestsPerConnection;
+    private final Map<Integer, Deque<ClientRequest>> requests = new HashMap<Integer, Deque<ClientRequest>>();
+
+    public InFlightRequests(int maxInFlightRequestsPerConnection) {
+        this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
+    }
+
+    /**
+     * Add the given request to the queue for the node it was directed to
+     */
+    public void add(ClientRequest request) {
+        Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
+        if (reqs == null) {
+            reqs = new ArrayDeque<ClientRequest>();
+            this.requests.put(request.request().destination(), reqs);
+        }
+        reqs.addFirst(request);
+    }
+
+    /**
+     * Get the request queue for the given node
+     */
+    private Deque<ClientRequest> requestQueue(int node) {
+        Deque<ClientRequest> reqs = requests.get(node);
+        if (reqs == null || reqs.isEmpty())
+            throw new IllegalStateException("Response from server for which there are no in-flight requests.");
+        return reqs;
+    }
+
+    /**
+     * Get the oldest request (the one that will be completed next) for the given node
+     */
+    public ClientRequest completeNext(int node) {
+        return requestQueue(node).pollLast();
+    }
+
+    /**
+     * Get the last request we sent to the given node (but don't remove it from the queue)
+     * @param node The node id
+     */
+    public ClientRequest lastSent(int node) {
+        return requestQueue(node).peekFirst();
+    }
+
+    /**
+     * Complete the last request that was sent to a particular node.
+     * @param node The node the request was sent to
+     * @return The request
+     */
+    public ClientRequest completeLastSent(int node) {
+        return requestQueue(node).pollFirst();
+    }
+
+    /**
+     * Can we send more requests to this node?
+ * + * @param node Node in question + * @return true iff we have no requests still being sent to the given node + */ + public boolean canSendMore(int node) { + Deque queue = requests.get(node); + return queue == null || queue.isEmpty() || + (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection); + } + + /** + * Return the number of inflight requests directed at the given node + * @param node The node + * @return The request count. + */ + public int inFlightRequestCount(int node) { + Deque queue = requests.get(node); + return queue == null ? 0 : queue.size(); + } + + /** + * Count all in-flight requests for all nodes + */ + public int inFlightRequestCount() { + int total = 0; + for (Deque deque : this.requests.values()) + total += deque.size(); + return total; + } + + /** + * Clear out all the in-flight requests for the given node and return them + * + * @param node The node + * @return All the in-flight requests for that node that have been removed + */ + public Iterable clearAll(int node) { + Deque reqs = requests.get(node); + if (reqs == null) { + return Collections.emptyList(); + } else { + return requests.remove(node); + } + } + +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java new file mode 100644 index 0000000..29658d4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients; + +import java.util.List; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.RequestHeader; + +/** + * The interface for {@link NetworkClient} + */ +public interface KafkaClient { + + /** + * Check if we are currently ready to send another request to the given node but don't attempt to connect if we + * aren't. + * @param node The node to check + * @param now The current timestamp + */ + public boolean isReady(Node node, long now); + + /** + * Initiate a connection to the given node (if necessary), and return true if already connected. The readiness of a + * node will change only when poll is invoked. + * @param node The node to connect to. + * @param now The current time + * @return true iff we are ready to immediately initiate the sending of another request to the given node. + */ + public boolean ready(Node node, long now); + + /** + * Initiate the sending of the given requests and return any completed responses. Requests can only be sent on ready + * connections. + * @param requests The requests to send + * @param timeout The maximum amount of time to wait for responses in ms + * @param now The current time in ms + * @throws IllegalStateException If a request is sent to an unready node + */ + public List poll(List requests, long timeout, long now); + + /** + * Choose the node with the fewest outstanding requests. 
This method will prefer a node with an existing connection, + * but will potentially choose a node for which we don't yet have a connection if all existing connections are in + * use. + * @param now The current time in ms + * @return The node with the fewest in-flight requests. + */ + public Node leastLoadedNode(long now); + + /** + * The number of currently in-flight requests for which we have not yet returned a response + */ + public int inFlightRequestCount(); + + /** + * Generate a request header for the next request + * @param key The API key of the request + */ + public RequestHeader nextRequestHeader(ApiKeys key); + + /** + * Wake up the client if it is currently blocked waiting for I/O + */ + public void wakeup(); + + /** + * Close the client and disconnect from all nodes + */ + public void close(); + +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java new file mode 100644 index 0000000..522881c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -0,0 +1,383 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.clients; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.Set; + +import org.apache.kafka.clients.producer.internals.Metadata; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.network.NetworkSend; +import org.apache.kafka.common.network.Selectable; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.requests.RequestSend; +import org.apache.kafka.common.requests.ResponseHeader; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A network client for asynchronous request/response network i/o. This is an internal class used to implement the + * user-facing producer and consumer clients. + *

+ * This class is not thread-safe! + */ +public class NetworkClient implements KafkaClient { + + private static final Logger log = LoggerFactory.getLogger(NetworkClient.class); + + /* the selector used to perform network i/o */ + private final Selectable selector; + + /* the current cluster metadata */ + private final Metadata metadata; + + /* the state of each node's connection */ + private final ClusterConnectionStates connectionStates; + + /* the set of requests currently being sent or awaiting a response */ + private final InFlightRequests inFlightRequests; + + /* the socket send buffer size in bytes */ + private final int socketSendBuffer; + + /* the socket receive size buffer in bytes */ + private final int socketReceiveBuffer; + + /* the client id used to identify this client in requests to the server */ + private final String clientId; + + /* a random offset to use when choosing nodes to avoid having all nodes choose the same node */ + private final int nodeIndexOffset; + + /* the current correlation id to use when sending requests to servers */ + private int correlation; + + /* true iff there is a metadata request that has been sent and for which we have not yet received a response */ + private boolean metadataFetchInProgress; + + public NetworkClient(Selectable selector, + Metadata metadata, + String clientId, + int maxInFlightRequestsPerConnection, + long reconnectBackoffMs, + int socketSendBuffer, + int socketReceiveBuffer) { + this.selector = selector; + this.metadata = metadata; + this.clientId = clientId; + this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection); + this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs); + this.socketSendBuffer = socketSendBuffer; + this.socketReceiveBuffer = socketReceiveBuffer; + this.correlation = 0; + this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE); + this.metadataFetchInProgress = false; + } + + /** + * Begin connecting to the given node, return true if we 
are already connected and ready to send to that node. + * @param node The node to check + * @param now The current timestamp + * @return True if we are ready to send to the given node + */ + @Override + public boolean ready(Node node, long now) { + if (isReady(node, now)) + return true; + + if (connectionStates.canConnect(node.id(), now)) + // if we are interested in sending to a node and we don't have a connection to it, initiate one + initiateConnect(node, now); + + return false; + } + + /** + * Check if the node with the given id is ready to send more requests. + * @param nodeId The node id + * @param now The current time in ms + * @return true if the node is ready + */ + @Override + public boolean isReady(Node node, long now) { + return isReady(node.id(), now); + } + + private boolean isReady(int node, long now) { + if (this.metadata.needsUpdate(now)) + // if we need to update our metadata declare all requests unready to metadata requests first priority + return false; + else + // otherwise we are ready if we are connected and can send more requests + return connectionStates.isConnected(node) && inFlightRequests.canSendMore(node); + } + + /** + * Initiate the given requests and check for any new responses, waiting up to the specified time. Requests can only + * be sent for ready nodes. + * @param requests The requests to initiate + * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately + * @param now The current time in milliseconds + * @return The list of responses received + */ + @Override + public List poll(List requests, long timeout, long now) { + // should we update our metadata? 
+ List sends = new ArrayList(); + maybeUpdateMetadata(sends, now); + + for (int i = 0; i < requests.size(); i++) { + ClientRequest request = requests.get(i); + int nodeId = request.request().destination(); + if (!isReady(nodeId, now)) + throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready."); + + this.inFlightRequests.add(request); + sends.add(request.request()); + } + + // do the I/O + try { + this.selector.poll(timeout, sends); + } catch (IOException e) { + log.error("Unexpected error during I/O in producer network thread", e); + } + + List responses = new ArrayList(); + handleCompletedSends(responses, now); + handleCompletedReceives(responses, now); + handleDisconnections(responses, now); + handleConnections(); + + return responses; + } + + /** + * Get the number of in-flight requests + */ + @Override + public int inFlightRequestCount() { + return this.inFlightRequests.inFlightRequestCount(); + } + + /** + * Generate a request header for the given API key + * @param key The api key + * @return A request header with the appropriate client id and correlation id + */ + @Override + public RequestHeader nextRequestHeader(ApiKeys key) { + return new RequestHeader(key.id, clientId, correlation++); + } + + /** + * Interrupt the client if it is blocked waiting on I/O. + */ + @Override + public void wakeup() { + this.selector.wakeup(); + } + + /** + * Close the network client + */ + @Override + public void close() { + this.selector.close(); + } + + /** + * Choose the node with the fewest outstanding requests which is at least eligible for connection. This method will + * prefer a node with an existing connection, but will potentially choose a node for which we don't yet have a + * connection if all existing connections are in use. This method will never choose a node for which there is no + * existing connection and from which we have disconnected within the reconnect backoff period. 
+ * @return The node with the fewest in-flight requests. + */ + public Node leastLoadedNode(long now) { + List nodes = this.metadata.fetch().nodes(); + int inflight = Integer.MAX_VALUE; + Node found = null; + for (int i = 0; i < nodes.size(); i++) { + int idx = Utils.abs((this.nodeIndexOffset + i) % nodes.size()); + Node node = nodes.get(idx); + int currInflight = this.inFlightRequests.inFlightRequestCount(node.id()); + if (currInflight == 0 && this.connectionStates.isConnected(node.id())) { + // if we find an established connection with no in-flight requests we can stop right away + return node; + } else if (!this.connectionStates.isBlackedOut(node.id(), now) && currInflight < inflight) { + // otherwise if this is the best we have found so far, record that + inflight = currInflight; + found = node; + } + } + + return found; + } + + /** + * Handle any completed request send. In particular if no response is expected consider the request complete. + * @param responses The list of responses to update + * @param now The current time + */ + private void handleCompletedSends(List responses, long now) { + // if no response is expected then when the send is completed, return it + for (NetworkSend send : this.selector.completedSends()) { + ClientRequest request = this.inFlightRequests.lastSent(send.destination()); + if (!request.expectResponse()) { + this.inFlightRequests.completeLastSent(send.destination()); + responses.add(new ClientResponse(request, now, false, null)); + } + } + } + + /** + * Handle any completed receives and update the response list with the responses received. 
+ * @param responses The list of responses to update + * @param now The current time + */ + private void handleCompletedReceives(List responses, long now) { + for (NetworkReceive receive : this.selector.completedReceives()) { + int source = receive.source(); + ClientRequest req = inFlightRequests.completeNext(source); + ResponseHeader header = ResponseHeader.parse(receive.payload()); + short apiKey = req.request().header().apiKey(); + Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload()); + correlate(req.request().header(), header); + if (apiKey == ApiKeys.METADATA.id) { + handleMetadataResponse(req.request().header(), body, now); + } else { + // need to add body/header to response here + responses.add(new ClientResponse(req, now, false, body)); + } + } + } + + private void handleMetadataResponse(RequestHeader header, Struct body, long now) { + this.metadataFetchInProgress = false; + MetadataResponse response = new MetadataResponse(body); + Cluster cluster = response.cluster(); + // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being + // created which means we will get errors and no nodes until it exists + if (cluster.nodes().size() > 0) + this.metadata.update(cluster, now); + else + log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId()); + } + + /** + * Handle any disconnected connections + * @param responses The list of responses that completed with the disconnection + * @param now The current time + */ + private void handleDisconnections(List responses, long now) { + for (int node : this.selector.disconnected()) { + connectionStates.disconnected(node); + log.debug("Node {} disconnected.", node); + for (ClientRequest request : this.inFlightRequests.clearAll(node)) { + log.trace("Cancelled request {} due to node {} being disconnected", request, node); + ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey()); + if 
(requestKey == ApiKeys.METADATA) + metadataFetchInProgress = false; + else + responses.add(new ClientResponse(request, now, true, null)); + } + } + // we got a disconnect so we should probably refresh our metadata and see if that broker is dead + if (this.selector.disconnected().size() > 0) + this.metadata.forceUpdate(); + } + + /** + * Record any newly completed connections + */ + private void handleConnections() { + for (Integer id : this.selector.connected()) { + log.debug("Completed connection to node {}", id); + this.connectionStates.connected(id); + } + } + + /** + * Validate that the response corresponds to the request we expect or else explode + */ + private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) { + if (requestHeader.correlationId() != responseHeader.correlationId()) + throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId() + + ") does not match request (" + + requestHeader.correlationId() + + ")"); + } + + /** + * Create a metadata request for the given topics + */ + private ClientRequest metadataRequest(long now, int node, Set topics) { + MetadataRequest metadata = new MetadataRequest(new ArrayList(topics)); + RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct()); + return new ClientRequest(now, true, send, null); + } + + /** + * Add a metadata request to the list of sends if we need to make one + */ + private void maybeUpdateMetadata(List sends, long now) { + if (this.metadataFetchInProgress || !metadata.needsUpdate(now)) + return; + + Node node = this.leastLoadedNode(now); + if (node == null) + return; + + if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { + Set topics = metadata.topics(); + this.metadataFetchInProgress = true; + ClientRequest metadataRequest = metadataRequest(now, node.id(), topics); + log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); + 
sends.add(metadataRequest.request()); + this.inFlightRequests.add(metadataRequest); + } else if (connectionStates.canConnect(node.id(), now)) { + // we don't have a connection to this node right now, make one + initiateConnect(node, now); + } + } + + /** + * Initiate a connection to the given node + */ + private void initiateConnect(Node node, long now) { + try { + log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port()); + selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); + this.connectionStates.connecting(node.id(), now); + } catch (IOException e) { + /* attempt failed, we'll try again after the backoff */ + connectionStates.disconnected(node.id()); + /* maybe the problem is our metadata, update it */ + metadata.forceUpdate(); + log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e); + } + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java b/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java new file mode 100644 index 0000000..752a979 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients; + +/** + * The state of our connection to a node + */ +final class NodeConnectionState { + + ConnectionState state; + long lastConnectAttemptMs; + + public NodeConnectionState(ConnectionState state, long lastConnectAttempt) { + this.state = state; + this.lastConnectAttemptMs = lastConnectAttempt; + } + + public String toString() { + return "NodeState(" + state + ", " + lastConnectAttemptMs + ")"; + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index d15562a..00775ab 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -21,6 +21,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.producer.internals.FutureRecordMetadata; import org.apache.kafka.clients.producer.internals.Metadata; import org.apache.kafka.clients.producer.internals.Partitioner; @@ -119,19 +120,22 @@ public class KafkaProducer implements Producer { metrics, time); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); - this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); - this.sender = new Sender(new Selector(this.metrics, time), + this.metadata.update(Cluster.bootstrap(addresses), 0); + + NetworkClient client = new NetworkClient(new Selector(this.metrics, time), + this.metadata, + clientId, + config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), + config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), + 
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), + config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG)); + this.sender = new Sender(client, this.metadata, this.accumulator, - clientId, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), - config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), config.getInt(ProducerConfig.RETRIES_CONFIG), config.getInt(ProducerConfig.TIMEOUT_CONFIG), - config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), - config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), - config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), this.metrics, new SystemTime()); this.ioThread = new KafkaThread("kafka-producer-network-thread", this.sender, true); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index f47a461..57bc285 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -105,8 +105,8 @@ public final class Metadata { * since our last update and either (1) an update has been requested or (2) the current metadata has expired (more * than metadataExpireMs has passed since the last refresh) */ - public synchronized boolean needsUpdate(long nowMs) { - long msSinceLastUpdate = nowMs - this.lastRefreshMs; + public synchronized boolean needsUpdate(long now) { + long msSinceLastUpdate = now - this.lastRefreshMs; boolean updateAllowed = msSinceLastUpdate >= this.refreshBackoffMs; boolean updateNeeded = this.forceUpdate || msSinceLastUpdate >= this.metadataExpireMs; return updateAllowed && updateNeeded; @@ -129,9 +129,9 @@ public final class Metadata { /** * Update the cluster metadata */ - public synchronized void update(Cluster cluster, long nowMs) { + public synchronized void update(Cluster cluster, long now) { 
this.forceUpdate = false; - this.lastRefreshMs = nowMs; + this.lastRefreshMs = now; this.cluster = cluster; notifyAll(); log.debug("Updated cluster metadata to {}", cluster); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 4010d42..1ed3c28 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -13,7 +13,15 @@ package org.apache.kafka.clients.producer.internals; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import org.apache.kafka.clients.producer.Callback; @@ -91,21 +99,21 @@ public final class RecordAccumulator { metrics.addMetric("waiting-threads", "The number of user threads blocked waiting for buffer memory to enqueue their records", new Measurable() { - public double measure(MetricConfig config, long nowMs) { + public double measure(MetricConfig config, long now) { return free.queued(); } }); metrics.addMetric("buffer-total-bytes", "The maximum amount of buffer memory the client can use (whether or not it is currently used).", new Measurable() { - public double measure(MetricConfig config, long nowMs) { + public double measure(MetricConfig config, long now) { return free.totalMemory(); } }); metrics.addMetric("buffer-available-bytes", "The total amount of buffer memory that is not being used (either unallocated or in the free list).", new Measurable() { - public double measure(MetricConfig config, long nowMs) { + public double measure(MetricConfig config, long now) { return 
free.availableMemory(); } }); @@ -163,9 +171,9 @@ public final class RecordAccumulator { /** * Re-enqueue the given record batch in the accumulator to retry */ - public void reenqueue(RecordBatch batch, long nowMs) { + public void reenqueue(RecordBatch batch, long now) { batch.attempts++; - batch.lastAttemptMs = nowMs; + batch.lastAttemptMs = now; Deque deque = dequeFor(batch.topicPartition); synchronized (deque) { deque.addFirst(batch); @@ -175,8 +183,8 @@ public final class RecordAccumulator { /** * Get a list of nodes whose partitions are ready to be sent. *

- * A destination node is ready to send data if ANY one of its partition is not backing off the send - * and ANY of the following are true : + * A destination node is ready to send data if ANY one of its partition is not backing off the send and ANY of the + * following are true : *

    *
  1. The record set is full *
  2. The record set has sat in the accumulator for at least lingerMs milliseconds @@ -185,7 +193,7 @@ public final class RecordAccumulator { *
  3. The accumulator has been closed *
*/ - public Set ready(Cluster cluster, long nowMs) { + public Set ready(Cluster cluster, long now) { Set readyNodes = new HashSet(); boolean exhausted = this.free.queued() > 0; @@ -198,9 +206,9 @@ public final class RecordAccumulator { synchronized (deque) { RecordBatch batch = deque.peekFirst(); if (batch != null) { - boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; + boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > now; boolean full = deque.size() > 1 || batch.records.isFull(); - boolean expired = nowMs - batch.createdMs >= lingerMs; + boolean expired = now - batch.createdMs >= lingerMs; boolean sendable = full || expired || exhausted || closed; if (sendable && !backingOff) readyNodes.add(leader); @@ -227,18 +235,17 @@ public final class RecordAccumulator { } /** - * Drain all the data for the given nodes and collate them into a list of - * batches that will fit within the specified size on a per-node basis. - * This method attempts to avoid choosing the same topic-node over and over. + * Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified + * size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over. * * @param cluster The current cluster metadata * @param nodes The list of node to drain * @param maxSize The maximum number of bytes to drain - * @param nowMs The current unix time in milliseconds + * @param now The current unix time in milliseconds * @return A list of {@link RecordBatch} for each node specified with total size less than the requested maxSize. 
* TODO: There may be a starvation issue due to iteration order */ - public Map> drain(Cluster cluster, Set nodes, int maxSize, long nowMs) { + public Map> drain(Cluster cluster, Set nodes, int maxSize, long now) { if (nodes.isEmpty()) return Collections.emptyMap(); @@ -266,7 +273,7 @@ public final class RecordAccumulator { batch.records.close(); size += batch.records.sizeInBytes(); ready.add(batch); - batch.drainedMs = nowMs; + batch.drainedMs = now; } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index 5ee5455..dd0af8a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -42,9 +42,9 @@ public final class RecordBatch { private final ProduceRequestResult produceFuture; private final List thunks; - public RecordBatch(TopicPartition tp, MemoryRecords records, long nowMs) { - this.createdMs = nowMs; - this.lastAttemptMs = nowMs; + public RecordBatch(TopicPartition tp, MemoryRecords records, long now) { + this.createdMs = now; + this.lastAttemptMs = now; this.records = records; this.topicPartition = tp; this.produceFuture = new ProduceRequestResult(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 4352466..c67b947 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -3,20 +3,24 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ package org.apache.kafka.clients.producer.internals; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -29,19 +33,11 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; -import org.apache.kafka.common.network.NetworkReceive; -import org.apache.kafka.common.network.NetworkSend; -import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.requests.MetadataRequest; -import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.ProduceResponse; -import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.RequestSend; -import org.apache.kafka.common.requests.ResponseHeader; import 
org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; @@ -56,16 +52,13 @@ public class Sender implements Runnable { private static final Logger log = LoggerFactory.getLogger(Sender.class); /* the state of each nodes connection */ - private final NodeStates nodeStates; + private final KafkaClient client; /* the record accumulator that batches records */ private final RecordAccumulator accumulator; - /* the selector used to perform network i/o */ - private final Selectable selector; - - /* the client id used to identify this client in requests to the server */ - private final String clientId; + /* the metadata for the client */ + private final Metadata metadata; /* the maximum request size to attempt to send to the server */ private final int maxRequestSize; @@ -79,67 +72,33 @@ public class Sender implements Runnable { /* the number of times to retry a failed request before giving up */ private final int retries; - /* the socket send buffer size in bytes */ - private final int socketSendBuffer; - - /* the socket receive size buffer in bytes */ - private final int socketReceiveBuffer; - - /* the set of currently in-flight requests awaiting a response from the server */ - private final InFlightRequests inFlightRequests; - - /* a reference to the current Cluster instance */ - private final Metadata metadata; - /* the clock instance used for getting the time */ private final Time time; - /* the current node to attempt to use for metadata requests (will round-robin over nodes) */ - private int metadataFetchNodeIndex; - - /* the current correlation id to use when sending requests to servers */ - private int correlation; - - /* true iff there is a metadata request that has been sent and for which we have not yet received a response */ - private boolean metadataFetchInProgress; - /* true while the sender thread is still running */ private volatile boolean running; /* metrics */ private final SenderMetrics sensors; - public 
Sender(Selectable selector, + public Sender(KafkaClient client, Metadata metadata, RecordAccumulator accumulator, - String clientId, int maxRequestSize, - long reconnectBackoffMs, short acks, int retries, int requestTimeout, - int socketSendBuffer, - int socketReceiveBuffer, - int maxInFlightRequestsPerConnection, Metrics metrics, Time time) { - this.nodeStates = new NodeStates(reconnectBackoffMs); + this.client = client; this.accumulator = accumulator; - this.selector = selector; - this.maxRequestSize = maxRequestSize; this.metadata = metadata; - this.clientId = clientId; + this.maxRequestSize = maxRequestSize; this.running = true; this.requestTimeout = requestTimeout; this.acks = acks; this.retries = retries; - this.socketSendBuffer = socketSendBuffer; - this.socketReceiveBuffer = socketReceiveBuffer; - this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection); - this.correlation = 0; - this.metadataFetchInProgress = false; this.time = time; - this.metadataFetchNodeIndex = new Random().nextInt(); this.sensors = new SenderMetrics(metrics); } @@ -169,128 +128,46 @@ public class Sender implements Runnable { } catch (Exception e) { log.error("Uncaught error in kafka producer I/O thread: ", e); } - } while (this.accumulator.hasUnsent() || this.inFlightRequests.totalInFlightRequests() > 0); + } while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0); - // close all the connections - this.selector.close(); + this.client.close(); log.debug("Shutdown of Kafka producer I/O thread has completed."); } /** * Run a single iteration of sending - * - * @param nowMs The current POSIX time in milliseconds + * + * @param now The current POSIX time in milliseconds */ - public void run(long nowMs) { + public void run(long now) { Cluster cluster = metadata.fetch(); // get the list of partitions with data ready to send - Set ready = this.accumulator.ready(cluster, nowMs); - - // should we update our metadata? 
- List sends = new ArrayList(); - maybeUpdateMetadata(cluster, sends, nowMs); + Set ready = this.accumulator.ready(cluster, now); - // prune the list of ready nodes to eliminate any that we aren't ready to send yet - Set sendable = processReadyNode(ready, nowMs); + // remove any nodes we aren't ready to send to + for (Node node : ready) { + if (!this.client.ready(node, now)) + ready.remove(node); + } // create produce requests - Map> batches = this.accumulator.drain(cluster, sendable, this.maxRequestSize, nowMs); - List requests = generateProduceRequests(batches, nowMs); + Map> batches = this.accumulator.drain(cluster, ready, this.maxRequestSize, now); + List requests = createProduceRequests(batches, now); sensors.updateProduceRequestMetrics(requests); if (ready.size() > 0) { - log.trace("Partitions with complete batches: {}", ready); - log.trace("Partitions ready to initiate a request: {}", sendable); + log.trace("Nodes with data ready to send: {}", ready); log.trace("Created {} produce requests: {}", requests.size(), requests); } - for (int i = 0; i < requests.size(); i++) { - InFlightRequest request = requests.get(i); - this.inFlightRequests.add(request); - sends.add(request.request); - } - - // do the I/O - try { - this.selector.poll(100L, sends); - } catch (IOException e) { - log.error("Unexpected error during I/O in producer network thread", e); - } - - // handle responses, connections, and disconnections - handleSends(this.selector.completedSends()); - handleResponses(this.selector.completedReceives(), nowMs); - handleDisconnects(this.selector.disconnected(), nowMs); - handleConnects(this.selector.connected()); - } - - /** - * Add a metadata request to the list of sends if we need to make one - */ - private void maybeUpdateMetadata(Cluster cluster, List sends, long nowMs) { - if (this.metadataFetchInProgress || !metadata.needsUpdate(nowMs)) - return; - - Node node = selectMetadataDestination(cluster); - if (node == null) - return; - - if 
(nodeStates.isConnected(node.id())) { - Set topics = metadata.topics(); - this.metadataFetchInProgress = true; - InFlightRequest metadataRequest = metadataRequest(nowMs, node.id(), topics); - log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); - sends.add(metadataRequest.request); - this.inFlightRequests.add(metadataRequest); - } else if (nodeStates.canConnect(node.id(), nowMs)) { - // we don't have a connection to this node right now, make one - initiateConnect(node, nowMs); - } - } - - /** - * Find a good node to make a metadata request to. This method will first look for a node that has an existing - * connection and no outstanding requests. If there are no such nodes it will look for a node with no outstanding - * requests. - * @return A node with no requests currently being sent or null if no such node exists - */ - private Node selectMetadataDestination(Cluster cluster) { - List nodes = cluster.nodes(); - - // first look for a node to which we are connected and have no outstanding requests - boolean connectionInProgress = false; - for (int i = 0; i < nodes.size(); i++) { - Node node = nodes.get(metadataNodeIndex(i, nodes.size())); - if (nodeStates.isConnected(node.id()) && this.inFlightRequests.canSendMore(node.id())) { - this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size()); - return node; - } else if (nodeStates.isConnecting(node.id())) { - connectionInProgress = true; - } - } - - // if we have a connection that is being established now, just wait for that don't make another - if (connectionInProgress) - return null; - - // okay, no luck, pick a random unused node - for (int i = 0; i < nodes.size(); i++) { - Node node = nodes.get(metadataNodeIndex(i, nodes.size())); - if (this.inFlightRequests.canSendMore(node.id())) { - this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size()); - return node; - } + List responses = this.client.poll(requests, 100L, now); + for (ClientResponse response : responses) { + 
if (response.wasDisconnected()) + handleDisconnect(response, now); + else + handleResponse(response, now); } - - return null; // we failed to find a good destination - } - - /** - * Get the index in the node list of the node to use for the metadata request - */ - private int metadataNodeIndex(int offset, int size) { - return Utils.abs(offset + this.metadataFetchNodeIndex) % size; } /** @@ -302,161 +179,40 @@ public class Sender implements Runnable { this.wakeup(); } - /** - * Process the set of destination nodes with data ready to send. - * - * 1) If we have an unknown leader node, force refresh the metadata. - * 2) If we have a connection to the appropriate node, add - * it to the returned set; - * 3) If we have not a connection yet, initialize one - */ - private Set processReadyNode(Set ready, long nowMs) { - Set sendable = new HashSet(ready.size()); - for (Node node : ready) { - if (node == null) { - // we don't know about this topic/partition or it has no leader, re-fetch metadata - metadata.forceUpdate(); - } else if (nodeStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { - sendable.add(node); - } else if (nodeStates.canConnect(node.id(), nowMs)) { - // we don't have a connection to this node right now, make one - initiateConnect(node, nowMs); - } - } - return sendable; - } - - /** - * Initiate a connection to the given node - */ - private void initiateConnect(Node node, long nowMs) { - try { - log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port()); - selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); - this.nodeStates.connecting(node.id(), nowMs); - } catch (IOException e) { - /* attempt failed, we'll try again after the backoff */ - nodeStates.disconnected(node.id()); - /* maybe the problem is our metadata, update it */ - metadata.forceUpdate(); - log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), 
node.port(), e); - } - } - - /** - * Handle any closed connections - */ - private void handleDisconnects(List disconnects, long nowMs) { - // clear out the in-flight requests for the disconnected broker - for (int node : disconnects) { - nodeStates.disconnected(node); - log.debug("Node {} disconnected.", node); - for (InFlightRequest request : this.inFlightRequests.clearAll(node)) { - log.trace("Cancelled request {} due to node {} being disconnected", request, node); - ApiKeys requestKey = ApiKeys.forId(request.request.header().apiKey()); - switch (requestKey) { - case PRODUCE: - int correlation = request.request.header().correlationId(); - for (RecordBatch batch : request.batches.values()) - completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, nowMs); - break; - case METADATA: - metadataFetchInProgress = false; - break; - default: - throw new IllegalArgumentException("Unexpected api key id: " + requestKey.id); - } - } - } - // we got a disconnect so we should probably refresh our metadata and see if that broker is dead - if (disconnects.size() > 0) - this.metadata.forceUpdate(); - } - - /** - * Record any connections that completed in our node state - */ - private void handleConnects(List connects) { - for (Integer id : connects) { - log.debug("Completed connection to node {}", id); - this.nodeStates.connected(id); - } - } - - /** - * Process completed sends - */ - public void handleSends(List sends) { - /* if acks = 0 then the request is satisfied once sent */ - for (NetworkSend send : sends) { - Deque requests = this.inFlightRequests.requestQueue(send.destination()); - InFlightRequest request = requests.peekFirst(); - log.trace("Completed send of request to node {}: {}", request.request.destination(), request.request); - if (!request.expectResponse) { - requests.pollFirst(); - if (request.request.header().apiKey() == ApiKeys.PRODUCE.id) { - for (RecordBatch batch : request.batches.values()) { - batch.done(-1L, Errors.NONE.exception()); - 
this.accumulator.deallocate(batch); - } - } - } - } - } - - /** - * Handle responses from the server - */ - private void handleResponses(List receives, long nowMs) { - for (NetworkReceive receive : receives) { - int source = receive.source(); - InFlightRequest req = inFlightRequests.nextCompleted(source); - ResponseHeader header = ResponseHeader.parse(receive.payload()); - short apiKey = req.request.header().apiKey(); - Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload()); - correlate(req.request.header(), header); - if (req.request.header().apiKey() == ApiKeys.PRODUCE.id) { - log.trace("Received produce response from node {} with correlation id {}", source, req.request.header().correlationId()); - handleProduceResponse(req, req.request.header(), body, nowMs); - } else if (req.request.header().apiKey() == ApiKeys.METADATA.id) { - log.trace("Received metadata response response from node {} with correlation id {}", source, req.request.header() - .correlationId()); - handleMetadataResponse(req.request.header(), body, nowMs); - } else { - throw new IllegalStateException("Unexpected response type: " + req.request.header().apiKey()); - } - this.sensors.recordLatency(receive.source(), nowMs - req.createdMs); - } - - } - - private void handleMetadataResponse(RequestHeader header, Struct body, long nowMs) { - this.metadataFetchInProgress = false; - MetadataResponse response = new MetadataResponse(body); - Cluster cluster = response.cluster(); - // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being - // created which means we will get errors and no nodes until it exists - if (cluster.nodes().size() > 0) - this.metadata.update(cluster, nowMs); - else - log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId()); + private void handleDisconnect(ClientResponse response, long now) { + log.trace("Cancelled request {} due to node {} being disconnected", 
response, response.request().request().destination()); + int correlation = response.request().request().header().correlationId(); + @SuppressWarnings("unchecked") + Map responseBatches = (Map) response.request().attachment(); + for (RecordBatch batch : responseBatches.values()) + completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, now); } /** * Handle a produce response */ - private void handleProduceResponse(InFlightRequest request, RequestHeader header, Struct body, long nowMs) { - ProduceResponse pr = new ProduceResponse(body); - for (Map responses : pr.responses().values()) { - for (Map.Entry entry : responses.entrySet()) { + private void handleResponse(ClientResponse response, long now) { + int correlationId = response.request().request().header().correlationId(); + log.trace("Received produce response from node {} with correlation id {}", + response.request().request().destination(), + correlationId); + @SuppressWarnings("unchecked") + Map batches = (Map) response.request().attachment(); + // if we have a response, parse it + if (response.hasResponse()) { + ProduceResponse produceResponse = new ProduceResponse(response.responseBody()); + for (Map.Entry entry : produceResponse.responses().entrySet()) { TopicPartition tp = entry.getKey(); - ProduceResponse.PartitionResponse response = entry.getValue(); - Errors error = Errors.forCode(response.errorCode); - if (error.exception() instanceof InvalidMetadataException) - metadata.forceUpdate(); - RecordBatch batch = request.batches.get(tp); - completeBatch(batch, error, response.baseOffset, header.correlationId(), nowMs); + ProduceResponse.PartitionResponse partResp = entry.getValue(); + Errors error = Errors.forCode(partResp.errorCode); + RecordBatch batch = batches.get(tp); + completeBatch(batch, error, partResp.baseOffset, correlationId, now); } + this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs()); + } else { + // this is the acks = 0 case, just 
complete all requests + for (RecordBatch batch : batches.values()) + completeBatch(batch, Errors.NONE, -1L, correlationId, now); } } @@ -466,9 +222,9 @@ public class Sender implements Runnable { * @param error The error (or null if none) * @param baseOffset The base offset assigned to the records if successful * @param correlationId The correlation id for the request - * @param nowMs The current POSIX time stamp in milliseconds + * @param now The current POSIX time stamp in milliseconds */ - private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long correlationId, long nowMs) { + private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long correlationId, long now) { if (error != Errors.NONE && canRetry(batch, error)) { // retry log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}", @@ -476,7 +232,7 @@ public class Sender implements Runnable { batch.topicPartition, this.retries - batch.attempts - 1, error); - this.accumulator.reenqueue(batch, nowMs); + this.accumulator.reenqueue(batch, now); this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount); } else { // tell the user the result of their request @@ -485,6 +241,8 @@ public class Sender implements Runnable { if (error != Errors.NONE) this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount); } + if (error.exception() instanceof InvalidMetadataException) + metadata.forceUpdate(); } /** @@ -495,257 +253,35 @@ public class Sender implements Runnable { } /** - * Validate that the response corresponds to the request we expect or else explode - */ - private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) { - if (requestHeader.correlationId() != responseHeader.correlationId()) - throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId() + - ") does not match request (" + - requestHeader.correlationId() + - 
")"); - } - - /** - * Create a metadata request for the given topics - */ - private InFlightRequest metadataRequest(long nowMs, int node, Set topics) { - MetadataRequest metadata = new MetadataRequest(new ArrayList(topics)); - RequestSend send = new RequestSend(node, header(ApiKeys.METADATA), metadata.toStruct()); - return new InFlightRequest(nowMs, true, send, null); - } - - /** * Transfer the record batches into a list of produce requests on a per-node basis */ - private List generateProduceRequests(Map> collated, long nowMs) { - List requests = new ArrayList(collated.size()); + private List createProduceRequests(Map> collated, long now) { + List requests = new ArrayList(collated.size()); for (Map.Entry> entry : collated.entrySet()) - requests.add(produceRequest(nowMs, entry.getKey(), acks, requestTimeout, entry.getValue())); + requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue())); return requests; } /** * Create a produce request from the given record batches */ - private InFlightRequest produceRequest(long nowMs, int destination, short acks, int timeout, List batches) { - Map batchesByPartition = new HashMap(); - Map> batchesByTopic = new HashMap>(); + private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List batches) { + ProduceRequest request = new ProduceRequest(acks, timeout); + Map recordsByPartition = new HashMap(batches.size()); for (RecordBatch batch : batches) { - batchesByPartition.put(batch.topicPartition, batch); - List found = batchesByTopic.get(batch.topicPartition.topic()); - if (found == null) { - found = new ArrayList(); - batchesByTopic.put(batch.topicPartition.topic(), found); - } - found.add(batch); - } - Struct produce = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id)); - produce.set("acks", acks); - produce.set("timeout", timeout); - List topicDatas = new ArrayList(batchesByTopic.size()); - for (Map.Entry> entry : batchesByTopic.entrySet()) { - Struct 
topicData = produce.instance("topic_data"); - topicData.set("topic", entry.getKey()); - List parts = entry.getValue(); - Object[] partitionData = new Object[parts.size()]; - for (int i = 0; i < parts.size(); i++) { - ByteBuffer buffer = parts.get(i).records.buffer(); - buffer.flip(); - Struct part = topicData.instance("data") - .set("partition", parts.get(i).topicPartition.partition()) - .set("record_set", buffer); - partitionData[i] = part; - } - topicData.set("data", partitionData); - topicDatas.add(topicData); + batch.records.buffer().flip(); + request.add(batch.topicPartition, batch.records); + recordsByPartition.put(batch.topicPartition, batch); } - produce.set("topic_data", topicDatas.toArray()); - - RequestSend send = new RequestSend(destination, header(ApiKeys.PRODUCE), produce); - return new InFlightRequest(nowMs, acks != 0, send, batchesByPartition); - } - - private RequestHeader header(ApiKeys key) { - return new RequestHeader(key.id, clientId, correlation++); + RequestSend send = new RequestSend(destination, this.client.nextRequestHeader(ApiKeys.PRODUCE), request.toStruct()); + return new ClientRequest(now, acks != 0, send, recordsByPartition); } /** * Wake up the selector associated with this send thread */ public void wakeup() { - this.selector.wakeup(); - } - - /** - * The states of a node connection - */ - private static enum ConnectionState { - DISCONNECTED, CONNECTING, CONNECTED - } - - /** - * The state of a node - */ - private static final class NodeState { - private ConnectionState state; - private long lastConnectAttemptMs; - - public NodeState(ConnectionState state, long lastConnectAttempt) { - this.state = state; - this.lastConnectAttemptMs = lastConnectAttempt; - } - - public String toString() { - return "NodeState(" + state + ", " + lastConnectAttemptMs + ")"; - } - } - - private static class NodeStates { - private final long reconnectBackoffMs; - private final Map nodeState; - - public NodeStates(long reconnectBackoffMs) { - 
this.reconnectBackoffMs = reconnectBackoffMs; - this.nodeState = new HashMap(); - } - - public boolean canConnect(int node, long nowMs) { - NodeState state = nodeState.get(node); - if (state == null) - return true; - else - return state.state == ConnectionState.DISCONNECTED && nowMs - state.lastConnectAttemptMs > this.reconnectBackoffMs; - } - - public void connecting(int node, long nowMs) { - nodeState.put(node, new NodeState(ConnectionState.CONNECTING, nowMs)); - } - - public boolean isConnected(int node) { - NodeState state = nodeState.get(node); - return state != null && state.state == ConnectionState.CONNECTED; - } - - public boolean isConnecting(int node) { - NodeState state = nodeState.get(node); - return state != null && state.state == ConnectionState.CONNECTING; - } - - public void connected(int node) { - nodeState(node).state = ConnectionState.CONNECTED; - } - - public void disconnected(int node) { - nodeState(node).state = ConnectionState.DISCONNECTED; - } - - private NodeState nodeState(int node) { - NodeState state = this.nodeState.get(node); - if (state == null) - throw new IllegalStateException("No entry found for node " + node); - return state; - } - } - - /** - * An request that hasn't been fully processed yet - */ - private static final class InFlightRequest { - public long createdMs; - public boolean expectResponse; - public Map batches; - public RequestSend request; - - /** - * @param createdMs The unix timestamp in milliseonds for the time at which this request was created. - * @param expectResponse Should we expect a response message or is this request complete once it is sent? 
- * @param request The request - * @param batches The record batches contained in the request if it is a produce request - */ - public InFlightRequest(long createdMs, boolean expectResponse, RequestSend request, Map batches) { - this.createdMs = createdMs; - this.batches = batches; - this.request = request; - this.expectResponse = expectResponse; - } - - @Override - public String toString() { - return "InFlightRequest(expectResponse=" + expectResponse + ", batches=" + batches + ", request=" + request + ")"; - } - } - - /** - * A set of outstanding request queues for each node that have not yet received responses - */ - private static final class InFlightRequests { - private final int maxInFlightRequestsPerConnection; - private final Map> requests; - - public InFlightRequests(int maxInFlightRequestsPerConnection) { - this.requests = new HashMap>(); - this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection; - } - - /** - * Add the given request to the queue for the node it was directed to - */ - public void add(InFlightRequest request) { - Deque reqs = this.requests.get(request.request.destination()); - if (reqs == null) { - reqs = new ArrayDeque(); - this.requests.put(request.request.destination(), reqs); - } - reqs.addFirst(request); - } - - public Deque requestQueue(int node) { - Deque reqs = requests.get(node); - if (reqs == null || reqs.isEmpty()) - throw new IllegalStateException("Response from server for which there are no in-flight requests."); - return reqs; - } - - /** - * Get the oldest request (the one that that will be completed next) for the given node - */ - public InFlightRequest nextCompleted(int node) { - return requestQueue(node).pollLast(); - } - - /** - * Can we send more requests to this node? 
- * - * @param node Node in question - * @return true iff we have no requests still being sent to the given node - */ - public boolean canSendMore(int node) { - Deque queue = requests.get(node); - return queue == null || queue.isEmpty() || - (queue.peekFirst().request.complete() && queue.size() < this.maxInFlightRequestsPerConnection); - } - - /** - * Clear out all the in-flight requests for the given node and return them - * - * @param node The node - * @return All the in-flight requests for that node that have been removed - */ - public Iterable clearAll(int node) { - Deque reqs = requests.get(node); - if (reqs == null) { - return Collections.emptyList(); - } else { - return requests.remove(node); - } - } - - public int totalInFlightRequests() { - int total = 0; - for (Deque deque : this.requests.values()) - total += deque.size(); - return total; - } + this.client.wakeup(); } /** @@ -770,9 +306,7 @@ public class Sender implements Runnable { this.batchSizeSensor.add("batch-size-avg", "The average number of bytes sent per partition per-request.", new Avg()); this.compressionRateSensor = metrics.sensor("compression-rate"); - this.compressionRateSensor.add("compression-rate-avg", - "The average compression rate of record batches.", - new Avg()); + this.compressionRateSensor.add("compression-rate-avg", "The average compression rate of record batches.", new Avg()); this.queueTimeSensor = metrics.sensor("queue-time"); this.queueTimeSensor.add("record-queue-time-avg", @@ -800,13 +334,13 @@ public class Sender implements Runnable { this.maxRecordSizeSensor.add("record-size-max", "The maximum record size", new Max()); this.metrics.addMetric("requests-in-flight", "The current number of in-flight requests awaiting a response.", new Measurable() { - public double measure(MetricConfig config, long nowMs) { - return inFlightRequests.totalInFlightRequests(); + public double measure(MetricConfig config, long now) { + return client.inFlightRequestCount(); } }); 
metrics.addMetric("metadata-age", "The age in seconds of the current producer metadata being used.", new Measurable() { - public double measure(MetricConfig config, long nowMs) { - return (nowMs - metadata.lastUpdate()) / 1000.0; + public double measure(MetricConfig config, long now) { + return (now - metadata.lastUpdate()) / 1000.0; } }); } @@ -838,14 +372,15 @@ public class Sender implements Runnable { } } - public void updateProduceRequestMetrics(List requests) { - long nowMs = time.milliseconds(); + public void updateProduceRequestMetrics(List requests) { + long now = time.milliseconds(); for (int i = 0; i < requests.size(); i++) { - InFlightRequest request = requests.get(i); + ClientRequest request = requests.get(i); int records = 0; - if (request.batches != null) { - for (RecordBatch batch : request.batches.values()) { + if (request.attachment() != null) { + Map responseBatches = (Map) request.attachment(); + for (RecordBatch batch : responseBatches.values()) { // register all per-topic metrics at once String topic = batch.topicPartition.topic(); @@ -867,43 +402,43 @@ public class Sender implements Runnable { topicCompressionRate.record(batch.records.compressionRate()); // global metrics - this.batchSizeSensor.record(batch.records.sizeInBytes(), nowMs); - this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, nowMs); + this.batchSizeSensor.record(batch.records.sizeInBytes(), now); + this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, now); this.compressionRateSensor.record(batch.records.compressionRate()); - this.maxRecordSizeSensor.record(batch.maxRecordSize, nowMs); + this.maxRecordSizeSensor.record(batch.maxRecordSize, now); records += batch.recordCount; } - this.recordsPerRequestSensor.record(records, nowMs); + this.recordsPerRequestSensor.record(records, now); } } } public void recordRetries(String topic, int count) { - long nowMs = time.milliseconds(); - this.retrySensor.record(count, nowMs); + long now = time.milliseconds(); + 
this.retrySensor.record(count, now); String topicRetryName = "topic." + topic + ".record-retries"; Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName); if (topicRetrySensor != null) - topicRetrySensor.record(count, nowMs); + topicRetrySensor.record(count, now); } public void recordErrors(String topic, int count) { - long nowMs = time.milliseconds(); - this.errorSensor.record(count, nowMs); + long now = time.milliseconds(); + this.errorSensor.record(count, now); String topicErrorName = "topic." + topic + ".record-errors"; Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName); if (topicErrorSensor != null) - topicErrorSensor.record(count, nowMs); + topicErrorSensor.record(count, now); } public void recordLatency(int node, long latency) { - long nowMs = time.milliseconds(); - this.requestTimeSensor.record(latency, nowMs); + long now = time.milliseconds(); + this.requestTimeSensor.record(latency, now); if (node >= 0) { String nodeTimeName = "node-" + node + ".latency"; Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName); if (nodeRequestTime != null) - nodeRequestTime.record(latency, nowMs); + nodeRequestTime.record(latency, now); } } } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java b/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java index 7c2e33c..79f61bc 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
*/ package org.apache.kafka.common.metrics; @@ -24,9 +20,9 @@ public interface Measurable { /** * Measure this quantity and return the result as a double * @param config The configuration for this metric - * @param nowMs The POSIX time in milliseconds the measurement is being taken + * @param now The POSIX time in milliseconds the measurement is being taken * @return The measured value */ - public double measure(MetricConfig config, long nowMs); + public double measure(MetricConfig config, long now); } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java index c9963cb..ed6767f 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.metrics.stats; @@ -20,7 +16,6 @@ import java.util.List; import org.apache.kafka.common.metrics.MetricConfig; - /** * A {@link SampledStat} that maintains a simple average over its samples. */ @@ -31,12 +26,12 @@ public class Avg extends SampledStat { } @Override - protected void update(Sample sample, MetricConfig config, double value, long timeMs) { + protected void update(Sample sample, MetricConfig config, double value, long now) { sample.value += value; } @Override - public double combine(List samples, MetricConfig config, long nowMs) { + public double combine(List samples, MetricConfig config, long now) { double total = 0.0; long count = 0; for (int i = 0; i < samples.size(); i++) { diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java index efcd61b..90c0bf5 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.metrics.stats; @@ -20,7 +16,6 @@ import java.util.List; import org.apache.kafka.common.metrics.MetricConfig; - /** * A {@link SampledStat} that maintains a simple count of what it has seen. 
*/ @@ -31,12 +26,12 @@ public class Count extends SampledStat { } @Override - protected void update(Sample sample, MetricConfig config, double value, long timeMs) { + protected void update(Sample sample, MetricConfig config, double value, long now) { sample.value += 1.0; } @Override - public double combine(List samples, MetricConfig config, long nowMs) { + public double combine(List samples, MetricConfig config, long now) { double total = 0.0; for (int i = 0; i < samples.size(); i++) total += samples.get(i).value; diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java index c492c38..6bbb0a3 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.metrics.stats; @@ -20,7 +16,6 @@ import java.util.List; import org.apache.kafka.common.metrics.MetricConfig; - /** * A {@link SampledStat} that gives the max over its samples. */ @@ -31,12 +26,12 @@ public final class Max extends SampledStat { } @Override - protected void update(Sample sample, MetricConfig config, double value, long timeMs) { + protected void update(Sample sample, MetricConfig config, double value, long now) { sample.value = Math.max(sample.value, value); } @Override - public double combine(List samples, MetricConfig config, long nowMs) { + public double combine(List samples, MetricConfig config, long now) { double max = Double.NEGATIVE_INFINITY; for (int i = 0; i < samples.size(); i++) max = Math.max(max, samples.get(i).value); diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java index bd0919c..9f74417 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.metrics.stats; @@ -20,7 +16,6 @@ import java.util.List; import org.apache.kafka.common.metrics.MetricConfig; - /** * A {@link SampledStat} that gives the min over its samples. 
*/ @@ -31,12 +26,12 @@ public class Min extends SampledStat { } @Override - protected void update(Sample sample, MetricConfig config, double value, long timeMs) { + protected void update(Sample sample, MetricConfig config, double value, long now) { sample.value = Math.min(sample.value, value); } @Override - public double combine(List samples, MetricConfig config, long nowMs) { + public double combine(List samples, MetricConfig config, long now) { double max = Double.MAX_VALUE; for (int i = 0; i < samples.size(); i++) max = Math.min(max, samples.get(i).value); diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java index 8300978..c70d577 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.metrics.stats; @@ -26,7 +22,6 @@ import org.apache.kafka.common.metrics.stats.Histogram.BinScheme; import org.apache.kafka.common.metrics.stats.Histogram.ConstantBinScheme; import org.apache.kafka.common.metrics.stats.Histogram.LinearBinScheme; - /** * A compound stat that reports one or more percentiles */ @@ -65,16 +60,16 @@ public class Percentiles extends SampledStat implements CompoundStat { for (Percentile percentile : this.percentiles) { final double pct = percentile.percentile(); ms.add(new NamedMeasurable(percentile.name(), percentile.description(), new Measurable() { - public double measure(MetricConfig config, long nowMs) { - return value(config, nowMs, pct / 100.0); + public double measure(MetricConfig config, long now) { + return value(config, now, pct / 100.0); } })); } return ms; } - public double value(MetricConfig config, long nowMs, double quantile) { - purgeObsoleteSamples(config, nowMs); + public double value(MetricConfig config, long now, double quantile) { + purgeObsoleteSamples(config, now); float count = 0.0f; for (Sample sample : this.samples) count += sample.eventCount; @@ -94,8 +89,8 @@ public class Percentiles extends SampledStat implements CompoundStat { return Double.POSITIVE_INFINITY; } - public double combine(List samples, MetricConfig config, long nowMs) { - return value(config, nowMs, 0.5); + public double combine(List samples, 
MetricConfig config, long now) { + return value(config, now, 0.5); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java index 4b481a5..a5838b3 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java @@ -56,9 +56,9 @@ public class Rate implements MeasurableStat { } @Override - public double measure(MetricConfig config, long nowMs) { - double value = stat.measure(config, nowMs); - double elapsed = convert(nowMs - stat.oldest(nowMs).lastWindowMs); + public double measure(MetricConfig config, long now) { + double value = stat.measure(config, now); + double elapsed = convert(now - stat.oldest(now).lastWindowMs); return value / elapsed; } @@ -95,7 +95,7 @@ public class Rate implements MeasurableStat { } @Override - public double combine(List samples, MetricConfig config, long nowMs) { + public double combine(List samples, MetricConfig config, long now) { double total = 0.0; for (int i = 0; i < samples.size(); i++) total += samples.get(i).value; diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java index 0d4056f..b341b7d 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java @@ -66,9 +66,9 @@ public abstract class SampledStat implements MeasurableStat { } @Override - public double measure(MetricConfig config, long nowMs) { - purgeObsoleteSamples(config, nowMs); - return combine(this.samples, config, nowMs); + public double measure(MetricConfig config, long now) { + purgeObsoleteSamples(config, now); + return combine(this.samples, config, now); } public Sample current(long timeMs) { @@ -77,9 +77,9 @@ public abstract class 
SampledStat implements MeasurableStat { return this.samples.get(this.current); } - public Sample oldest(long nowMs) { + public Sample oldest(long now) { if (samples.size() == 0) - this.samples.add(newSample(nowMs)); + this.samples.add(newSample(now)); Sample oldest = this.samples.get(0); for (int i = 1; i < this.samples.size(); i++) { Sample curr = this.samples.get(i); @@ -91,15 +91,15 @@ public abstract class SampledStat implements MeasurableStat { protected abstract void update(Sample sample, MetricConfig config, double value, long timeMs); - public abstract double combine(List samples, MetricConfig config, long nowMs); + public abstract double combine(List samples, MetricConfig config, long now); /* Timeout any windows that have expired in the absence of any events */ - protected void purgeObsoleteSamples(MetricConfig config, long nowMs) { + protected void purgeObsoleteSamples(MetricConfig config, long now) { long expireAge = config.samples() * config.timeWindowMs(); for (int i = 0; i < samples.size(); i++) { Sample sample = this.samples.get(i); - if (nowMs - sample.lastWindowMs >= expireAge) - sample.reset(nowMs); + if (now - sample.lastWindowMs >= expireAge) + sample.reset(now); } } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java index 53dd3d5..67999a9 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
*/ package org.apache.kafka.common.metrics.stats; @@ -35,12 +31,12 @@ public class Total implements MeasurableStat { } @Override - public void record(MetricConfig config, double value, long timeMs) { + public void record(MetricConfig config, double value, long now) { this.total += value; } @Override - public double measure(MetricConfig config, long nowMs) { + public double measure(MetricConfig config, long now) { return this.total; } diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java index 6350424..c8213e1 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java @@ -42,7 +42,7 @@ public class ByteBufferSend implements Send { } @Override - public boolean complete() { + public boolean completed() { return remaining <= 0; } diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 3e35898..93f2f1c 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -414,8 +414,8 @@ public class Selector implements Selectable { this.bytesTransferred = this.metrics.sensor("bytes-sent-received"); bytesTransferred.add("network-io-rate", - "The average number of network operations (reads or writes) on all connections per second.", - new Rate(new Count())); + "The average number of network operations (reads or writes) on all connections per second.", + new Rate(new Count())); this.bytesSent = this.metrics.sensor("bytes-sent", bytesTransferred); this.bytesSent.add("outgoing-byte-rate", "The average number of outgoing bytes sent per second to all servers.", new Rate()); @@ -429,11 +429,11 @@ public class Selector implements Selectable { this.selectTime = 
this.metrics.sensor("select-time"); this.selectTime.add("select-rate", - "Number of times the I/O layer checked for new I/O to perform per second", - new Rate(new Count())); + "Number of times the I/O layer checked for new I/O to perform per second", + new Rate(new Count())); this.selectTime.add("io-wait-time-ns-avg", - "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", - new Avg()); + "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", + new Avg()); this.selectTime.add("io-wait-ratio", "The fraction of time the I/O thread spent waiting.", new Rate(TimeUnit.NANOSECONDS)); this.ioTime = this.metrics.sensor("io-time"); @@ -441,7 +441,7 @@ public class Selector implements Selectable { this.ioTime.add("io-ratio", "The fraction of time the I/O thread spent doing I/O", new Rate(TimeUnit.NANOSECONDS)); this.metrics.addMetric("connection-count", "The current number of active connections.", new Measurable() { - public double measure(MetricConfig config, long nowMs) { + public double measure(MetricConfig config, long now) { return keys.size(); } }); @@ -456,7 +456,9 @@ public class Selector implements Selectable { if (nodeRequest == null) { nodeRequest = this.metrics.sensor(nodeRequestName); nodeRequest.add("node-" + node + ".outgoing-byte-rate", new Rate()); - nodeRequest.add("node-" + node + ".request-rate", "The average number of requests sent per second.", new Rate(new Count())); + nodeRequest.add("node-" + node + ".request-rate", + "The average number of requests sent per second.", + new Rate(new Count())); nodeRequest.add("node-" + node + ".request-size-avg", "The average size of all requests in the window..", new Avg()); nodeRequest.add("node-" + node + ".request-size-max", "The maximum size of any request sent in the window.", new Max()); @@ -464,8 +466,8 @@ public class Selector implements Selectable { Sensor nodeResponse = 
this.metrics.sensor(nodeResponseName); nodeResponse.add("node-" + node + ".incoming-byte-rate", new Rate()); nodeResponse.add("node-" + node + ".response-rate", - "The average number of responses received per second.", - new Rate(new Count())); + "The average number of responses received per second.", + new Rate(new Count())); String nodeTimeName = "node-" + node + ".latency"; Sensor nodeRequestTime = this.metrics.sensor(nodeTimeName); @@ -476,22 +478,24 @@ public class Selector implements Selectable { } public void recordBytesSent(int node, int bytes) { - long nowMs = time.milliseconds(); - this.bytesSent.record(bytes, nowMs); + long now = time.milliseconds(); + this.bytesSent.record(bytes, now); if (node >= 0) { String nodeRequestName = "node-" + node + ".bytes-sent"; Sensor nodeRequest = this.metrics.getSensor(nodeRequestName); - if (nodeRequest != null) nodeRequest.record(bytes, nowMs); + if (nodeRequest != null) + nodeRequest.record(bytes, now); } } public void recordBytesReceived(int node, int bytes) { - long nowMs = time.milliseconds(); - this.bytesReceived.record(bytes, nowMs); + long now = time.milliseconds(); + this.bytesReceived.record(bytes, now); if (node >= 0) { String nodeRequestName = "node-" + node + ".bytes-received"; Sensor nodeRequest = this.metrics.getSensor(nodeRequestName); - if (nodeRequest != null) nodeRequest.record(bytes, nowMs); + if (nodeRequest != null) + nodeRequest.record(bytes, now); } } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/Send.java b/clients/src/main/java/org/apache/kafka/common/network/Send.java index d62dff9..5d321a0 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Send.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Send.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.network; @@ -38,7 +34,7 @@ public interface Send { /** * Is this send complete? 
*/ - public boolean complete(); + public boolean completed(); /** * An optional method to turn this send into an array of ByteBuffers if possible (otherwise returns null) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java index 68b8827..7164701 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.protocol.types; @@ -51,8 +47,9 @@ public class Schema extends Type { Object value = f.type().validate(r.get(f)); f.type.write(buffer, value); } catch (Exception e) { - throw new SchemaException("Error writing field '" + f.name + "': " + e.getMessage() == null ? e.getMessage() : e.getClass() - .getName()); + throw new SchemaException("Error writing field '" + f.name + + "': " + + (e.getMessage() == null ? e.getClass().getName() : e.getMessage())); } } } @@ -66,8 +63,9 @@ public class Schema extends Type { try { objects[i] = fields[i].type.read(buffer); } catch (Exception e) { - throw new SchemaException("Error reading field '" + fields[i].name + "': " + e.getMessage() == null ? e.getMessage() - : e.getClass().getName()); + throw new SchemaException("Error reading field '" + fields[i].name + + "': " + + (e.getMessage() == null ? e.getClass().getName() : e.getMessage())); } } return new Struct(this, objects); diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 15c9577..759f577 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.record; @@ -88,14 +84,14 @@ public class MemoryRecords implements Records { /** * Check if we have room for a new record containing the given key/value pair - * - * Note that the return value is based on the estimate of the bytes written to the compressor, - * which may not be accurate if compression is really used. When this happens, the following - * append may cause dynamic buffer re-allocation in the underlying byte buffer stream. 
+ * + * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be + * accurate if compression is really used. When this happens, the following append may cause dynamic buffer + * re-allocation in the underlying byte buffer stream. */ public boolean hasRoomFor(byte[] key, byte[] value) { - return this.writable && - this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value); + return this.writable && this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + + Record.recordSize(key, value); } public boolean isFull() { @@ -169,10 +165,10 @@ public class MemoryRecords implements Records { /* * Read the next record from the buffer. - * - * Note that in the compressed message set, each message value size is set as the size - * of the un-compressed version of the message value, so when we do de-compression - * allocating an array of the specified size for reading compressed value data is sufficient. + * + * Note that in the compressed message set, each message value size is set as the size of the un-compressed + * version of the message value, so when we do de-compression allocating an array of the specified size for + * reading compressed value data is sufficient. 
*/ @Override protected LogEntry makeNext() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java new file mode 100644 index 0000000..6036f6a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -0,0 +1,71 @@ +package org.apache.kafka.common.requests; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.record.MemoryRecords; + +public class ProduceRequest { + + private final short acks; + private final int timeout; + private final Map> records; + + public ProduceRequest(short acks, int timeout) { + this.acks = acks; + this.timeout = timeout; + this.records = new HashMap>(); + } + + public void add(TopicPartition tp, MemoryRecords recs) { + List found = this.records.get(tp.topic()); + if (found == null) { + found = new ArrayList(); + records.put(tp.topic(), found); + } + found.add(new PartitionRecords(tp, recs)); + } + + public Struct toStruct() { + Struct produce = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id)); + produce.set("acks", acks); + produce.set("timeout", timeout); + List topicDatas = new ArrayList(records.size()); + for (Map.Entry> entry : records.entrySet()) { + Struct topicData = produce.instance("topic_data"); + topicData.set("topic", entry.getKey()); + List parts = entry.getValue(); + Object[] partitionData = new Object[parts.size()]; + for (int i = 0; i < parts.size(); i++) { + ByteBuffer buffer = parts.get(i).records.buffer(); + buffer.flip(); + Struct part = topicData.instance("data") + .set("partition", parts.get(i).topicPartition.partition()) + 
.set("record_set", buffer); + partitionData[i] = part; + } + topicData.set("data", partitionData); + topicDatas.add(topicData); + } + produce.set("topic_data", topicDatas.toArray()); + return produce; + } + + private static final class PartitionRecords { + public final TopicPartition topicPartition; + public final MemoryRecords records; + + public PartitionRecords(TopicPartition topicPartition, MemoryRecords records) { + this.topicPartition = topicPartition; + this.records = records; + } + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 6fa4a58..6cf4fb7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -3,65 +3,50 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. 
*/ package org.apache.kafka.common.requests; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.types.Struct; - import java.util.HashMap; import java.util.Map; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.types.Struct; + public class ProduceResponse { - public class PartitionResponse { - public int partitionId; - public short errorCode; - public long baseOffset; - public PartitionResponse(int partitionId, short errorCode, long baseOffset) { - this.partitionId = partitionId; - this.errorCode = errorCode; - this.baseOffset = baseOffset; - } - @Override - public String toString() { - StringBuilder b = new StringBuilder(); - b.append('{'); - b.append("pid: " + partitionId); - b.append(",error: " + errorCode); - b.append(",offset: " + baseOffset); - b.append('}'); - return b.toString(); - } - } + private final Map responses; - private final Map> responses; + public ProduceResponse() { + this.responses = new HashMap(); + } public ProduceResponse(Struct struct) { - responses = new HashMap>(); + responses = new HashMap(); for (Object topicResponse : (Object[]) struct.get("responses")) { Struct topicRespStruct = (Struct) topicResponse; String topic = (String) topicRespStruct.get("topic"); - Map topicResponses = new HashMap(); for (Object partResponse : (Object[]) topicRespStruct.get("partition_responses")) { Struct partRespStruct = (Struct) partResponse; int partition = (Integer) partRespStruct.get("partition"); short errorCode = (Short) partRespStruct.get("error_code"); long offset = (Long) partRespStruct.get("base_offset"); TopicPartition tp = new TopicPartition(topic, partition); - topicResponses.put(tp, new PartitionResponse(partition, errorCode, offset)); + responses.put(tp, new PartitionResponse(partition, errorCode, offset)); } - responses.put(topic, topicResponses); } } - public Map> responses() { + public void addResponse(TopicPartition tp, int partition, short error, long baseOffset) 
{ + this.responses.put(tp, new PartitionResponse(partition, error, baseOffset)); + } + + public Map responses() { return this.responses; } @@ -70,16 +55,40 @@ public class ProduceResponse { StringBuilder b = new StringBuilder(); b.append('{'); boolean isFirst = true; - for (Map response : responses.values()) { - for (Map.Entry entry : response.entrySet()) { - if (isFirst) - isFirst = false; - else - b.append(','); - b.append(entry.getKey() + " : " + entry.getValue()); - } + for (Map.Entry entry : responses.entrySet()) { + if (isFirst) + isFirst = false; + else + b.append(','); + b.append(entry.getKey() + " : " + entry.getValue()); } b.append('}'); return b.toString(); } + + public static class PartitionResponse { + public int partitionId; + public short errorCode; + public long baseOffset; + + public PartitionResponse(int partitionId, short errorCode, long baseOffset) { + this.partitionId = partitionId; + this.errorCode = errorCode; + this.baseOffset = baseOffset; + } + + @Override + public String toString() { + StringBuilder b = new StringBuilder(); + b.append('{'); + b.append("pid: "); + b.append(partitionId); + b.append(",error: "); + b.append(errorCode); + b.append(",offset: "); + b.append(baseOffset); + b.append('}'); + return b.toString(); + } + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java new file mode 100644 index 0000000..aae8d4a --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -0,0 +1,96 @@ +package org.apache.kafka.clients; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.Set; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.requests.RequestHeader; +import 
org.apache.kafka.common.utils.Time; + +/** + * A mock network client for use testing code + */ +public class MockClient implements KafkaClient { + + private final Time time; + private int correlation = 0; + private final Set ready = new HashSet(); + private final Queue requests = new ArrayDeque(); + private final Queue responses = new ArrayDeque(); + + public MockClient(Time time) { + this.time = time; + } + + @Override + public boolean isReady(Node node, long now) { + return ready.contains(node.id()); + } + + @Override + public boolean ready(Node node, long now) { + boolean found = isReady(node, now); + ready.add(node.id()); + return found; + } + + public void disconnect(Integer node) { + Iterator iter = requests.iterator(); + while (iter.hasNext()) { + ClientRequest request = iter.next(); + if (request.request().destination() == node) { + responses.add(new ClientResponse(request, time.milliseconds(), true, null)); + iter.remove(); + } + } + ready.remove(node); + } + + @Override + public List poll(List requests, long timeoutMs, long now) { + this.requests.addAll(requests); + List copy = new ArrayList(this.responses); + this.responses.clear(); + return copy; + } + + public Queue requests() { + return this.requests; + } + + public void respond(Struct body) { + ClientRequest request = requests.remove(); + responses.add(new ClientResponse(request, time.milliseconds(), false, body)); + } + + @Override + public int inFlightRequestCount() { + return requests.size(); + } + + @Override + public RequestHeader nextRequestHeader(ApiKeys key) { + return new RequestHeader(key.id, "mock", correlation++); + } + + @Override + public void wakeup() { + } + + @Override + public void close() { + } + + @Override + public Node leastLoadedNode(long now) { + return null; + } + +} diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java new file mode 100644 index 0000000..6a3cdcc --- /dev/null 
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -0,0 +1,99 @@ +package org.apache.kafka.clients; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.kafka.clients.producer.internals.Metadata; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.ProduceRequest; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.requests.RequestSend; +import org.apache.kafka.common.requests.ResponseHeader; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.test.MockSelector; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Test; + +public class NetworkClientTest { + + private MockTime time = new MockTime(); + private MockSelector selector = new MockSelector(time); + private Metadata metadata = new Metadata(0, Long.MAX_VALUE); + private int nodeId = 1; + private Cluster cluster = TestUtils.singletonCluster("test", nodeId); + private Node node = cluster.nodes().get(0); + private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024); + + @Before + public void setup() { + metadata.update(cluster, time.milliseconds()); + } + + @Test + public void testReadyAndDisconnect() { + List reqs = new ArrayList(); + assertFalse("Client begins unready as it has no connection.", client.ready(node, time.milliseconds())); + assertEquals("The connection is established as a 
side-effect of the readiness check", 1, selector.connected().size()); + client.poll(reqs, 1, time.milliseconds()); + selector.clear(); + assertTrue("Now the client is ready", client.ready(node, time.milliseconds())); + selector.disconnect(node.id()); + client.poll(reqs, 1, time.milliseconds()); + selector.clear(); + assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds())); + assertTrue("Metadata should get updated.", metadata.needsUpdate(time.milliseconds())); + } + + @Test(expected = IllegalStateException.class) + public void testSendToUnreadyNode() { + RequestSend send = new RequestSend(5, + client.nextRequestHeader(ApiKeys.METADATA), + new MetadataRequest(Arrays.asList("test")).toStruct()); + ClientRequest request = new ClientRequest(time.milliseconds(), false, send, null); + client.poll(Arrays.asList(request), 1, time.milliseconds()); + } + + @Test + public void testSimpleRequestResponse() { + ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000); + RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE); + RequestSend send = new RequestSend(node.id(), reqHeader, produceRequest.toStruct()); + ClientRequest request = new ClientRequest(time.milliseconds(), true, send, null); + awaitReady(client, node); + client.poll(Arrays.asList(request), 1, time.milliseconds()); + assertEquals(1, client.inFlightRequestCount()); + ResponseHeader respHeader = new ResponseHeader(reqHeader.correlationId()); + Struct resp = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id)); + resp.set("responses", new Object[0]); + int size = respHeader.sizeOf() + resp.sizeOf(); + ByteBuffer buffer = ByteBuffer.allocate(size); + respHeader.writeTo(buffer); + resp.writeTo(buffer); + buffer.flip(); + selector.completeReceive(new NetworkReceive(node.id(), buffer)); + List responses = client.poll(new ArrayList(), 1, time.milliseconds()); + assertEquals(1, responses.size()); + ClientResponse response 
= responses.get(0); + assertTrue("Should have a response body.", response.hasResponse()); + assertEquals("Should be correlated to the original request", request, response.request()); + } + + private void awaitReady(NetworkClient client, Node node) { + while (!client.ready(node, time.milliseconds())) + client.poll(new ArrayList(), 1, time.milliseconds()); + } + +} diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java index c4072ae..93b58d0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java @@ -17,7 +17,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Set; import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.RecordBatch; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java index 3ef692c..5489aca 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java @@ -16,62 +16,48 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import java.nio.ByteBuffer; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.producer.internals.Metadata; import org.apache.kafka.clients.producer.internals.RecordAccumulator; import 
org.apache.kafka.clients.producer.internals.Sender; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.common.requests.RequestSend; -import org.apache.kafka.common.requests.ResponseHeader; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.test.MockSelector; import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.Test; public class SenderTest { - private static final String CLIENT_ID = ""; private static final int MAX_REQUEST_SIZE = 1024 * 1024; - private static final long RECONNECT_BACKOFF_MS = 0L; private static final short ACKS_ALL = -1; private static final int MAX_RETRIES = 0; private static final int REQUEST_TIMEOUT_MS = 10000; - private static final int SEND_BUFFER_SIZE = 64 * 1024; - private static final int RECEIVE_BUFFER_SIZE = 64 * 1024; - private static final int MAX_IN_FLIGHT_REQS = Integer.MAX_VALUE; private TopicPartition tp = new TopicPartition("test", 0); private MockTime time = new MockTime(); - private MockSelector selector = new MockSelector(time); + private MockClient client = new MockClient(time); private int batchSize = 16 * 1024; private Metadata metadata = new Metadata(0, Long.MAX_VALUE); private Cluster cluster = TestUtils.singletonCluster("test", 1); private Metrics metrics = new Metrics(time); private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time); - private Sender sender = new Sender(selector, + private Sender sender = new Sender(client, metadata, this.accumulator, - CLIENT_ID, MAX_REQUEST_SIZE, - 
RECONNECT_BACKOFF_MS, ACKS_ALL, MAX_RETRIES, REQUEST_TIMEOUT_MS, - SEND_BUFFER_SIZE, - RECEIVE_BUFFER_SIZE, - MAX_IN_FLIGHT_REQS, metrics, time); @@ -82,21 +68,14 @@ public class SenderTest { @Test public void testSimple() throws Exception { + int offset = 0; Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); + sender.run(time.milliseconds()); // connect + sender.run(time.milliseconds()); // send produce request + assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount()); + client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code())); sender.run(time.milliseconds()); - assertEquals("We should have connected", 1, selector.connected().size()); - selector.clear(); - sender.run(time.milliseconds()); - assertEquals("Single request should be sent", 1, selector.completedSends().size()); - RequestSend request = (RequestSend) selector.completedSends().get(0); - selector.clear(); - long offset = 42; - selector.completeReceive(produceResponse(request.header().correlationId(), - cluster.leaderFor(tp).id(), - tp.topic(), - tp.partition(), - offset, - Errors.NONE.code())); + assertEquals("All requests completed.", offset, client.inFlightRequestCount()); sender.run(time.milliseconds()); assertTrue("Request should be completed", future.isDone()); assertEquals(offset, future.get().offset()); @@ -106,69 +85,43 @@ public class SenderTest { public void testRetries() throws Exception { // create a sender with retries = 1 int maxRetries = 1; - Sender sender = new Sender(selector, + Sender sender = new Sender(client, metadata, this.accumulator, - CLIENT_ID, MAX_REQUEST_SIZE, - RECONNECT_BACKOFF_MS, ACKS_ALL, maxRetries, REQUEST_TIMEOUT_MS, - SEND_BUFFER_SIZE, - RECEIVE_BUFFER_SIZE, - MAX_IN_FLIGHT_REQS, new Metrics(), time); + // do a successful retry Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); - 
RequestSend request1 = completeSend(sender); - selector.clear(); - selector.completeReceive(produceResponse(request1.header().correlationId(), - cluster.leaderFor(tp).id(), - tp.topic(), - tp.partition(), - -1, - Errors.REQUEST_TIMED_OUT.code())); - sender.run(time.milliseconds()); - selector.clear(); - sender.run(time.milliseconds()); - RequestSend request2 = completeSend(sender); - selector.completeReceive(produceResponse(request2.header().correlationId(), - cluster.leaderFor(tp).id(), - tp.topic(), - tp.partition(), - 42, - Errors.NONE.code())); + sender.run(time.milliseconds()); // connect + sender.run(time.milliseconds()); // send produce request + assertEquals(1, client.inFlightRequestCount()); + client.disconnect(client.requests().peek().request().destination()); + assertEquals(0, client.inFlightRequestCount()); + sender.run(time.milliseconds()); // receive error + sender.run(time.milliseconds()); // reconnect + sender.run(time.milliseconds()); // resend + assertEquals(1, client.inFlightRequestCount()); + int offset = 0; + client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code())); sender.run(time.milliseconds()); - assertTrue("Request should retry and complete", future.isDone()); - assertEquals(42, future.get().offset()); - } - - @Test - public void testMetadataRefreshOnNoLeaderException() throws Exception { - Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); - RequestSend request = completeSend(); - selector.clear(); - selector.completeReceive(produceResponse(request.header().correlationId(), - cluster.leaderFor(tp).id(), - tp.topic(), - tp.partition(), - -1, - Errors.NOT_LEADER_FOR_PARTITION.code())); - sender.run(time.milliseconds()); - completedWithError(future, Errors.NOT_LEADER_FOR_PARTITION); - assertTrue("Error triggers a metadata update.", metadata.needsUpdate(time.milliseconds())); - } + assertTrue("Request should have retried and completed", future.isDone()); + 
assertEquals(offset, future.get().offset()); - @Test - public void testMetadataRefreshOnDisconnect() throws Exception { - Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); - completeSend(); - selector.clear(); - selector.disconnect(cluster.leaderFor(tp).id()); + // do an unsuccessful retry + future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); + sender.run(time.milliseconds()); // send produce request + for (int i = 0; i < maxRetries + 1; i++) { + client.disconnect(client.requests().peek().request().destination()); + sender.run(time.milliseconds()); // receive error + sender.run(time.milliseconds()); // reconnect + sender.run(time.milliseconds()); // resend + } sender.run(time.milliseconds()); completedWithError(future, Errors.NETWORK_EXCEPTION); - assertTrue("The disconnection triggers a metadata update.", metadata.needsUpdate(time.milliseconds())); } private void completedWithError(Future future, Errors error) throws Exception { @@ -181,17 +134,7 @@ public class SenderTest { } } - private RequestSend completeSend() { - return completeSend(sender); - } - - private RequestSend completeSend(Sender sender) { - while (selector.completedSends().size() == 0) - sender.run(time.milliseconds()); - return (RequestSend) selector.completedSends().get(0); - } - - private NetworkReceive produceResponse(int correlation, int source, String topic, int part, long offset, int error) { + private Struct produceResponse(String topic, int part, long offset, int error) { Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id)); Struct response = struct.instance("responses"); response.set("topic", topic); @@ -201,12 +144,7 @@ public class SenderTest { partResp.set("base_offset", offset); response.set("partition_responses", new Object[] { partResp }); struct.set("responses", new Object[] { response }); - ResponseHeader header = new ResponseHeader(correlation); - 
ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + struct.sizeOf()); - header.writeTo(buffer); - struct.writeTo(buffer); - buffer.rewind(); - return new NetworkReceive(source, buffer); + return struct; } } diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index e4e0a04..19bea0f 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -213,7 +213,7 @@ public class MetricsTest { public double value = 0.0; @Override - public double measure(MetricConfig config, long nowMs) { + public double measure(MetricConfig config, long now) { return value; } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java index cda8e64..eb7fcf0 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java @@ -1,25 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.utils; import java.util.concurrent.TimeUnit; -import org.apache.kafka.common.utils.Time; - +/** + * A clock that you can manually advance by calling sleep + */ public class MockTime implements Time { private long nanos = 0; diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index b9405cf..d146444 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -127,7 +127,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness * With non-exist-topic the future metadata should return ExecutionException caused by TimeoutException */ @Test - def testNonExistTopic() { + def testNonExistentTopic() { // send a record with non-exist topic val record = new ProducerRecord(topic2, null, "key".getBytes, "value".getBytes) intercept[ExecutionException] { diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 12f8045..57b2bd5 100644 --- 
a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -379,6 +379,7 @@ object TestUtils extends Logging { producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000") + producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200") return new KafkaProducer(producerProps) }