diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java index da76cc2..9ebda5e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -21,22 +21,22 @@ import java.util.Map; */ final class ClusterConnectionStates { private final long reconnectBackoffMs; - private final Map nodeState; + private final Map nodeState; public ClusterConnectionStates(long reconnectBackoffMs) { this.reconnectBackoffMs = reconnectBackoffMs; - this.nodeState = new HashMap(); + this.nodeState = new HashMap(); } /** - * Return true iff we can currently initiate a new connection to the given node. This will be the case if we are not + * Return true iff we can currently initiate a new connection. This will be the case if we are not * connected and haven't been connected for at least the minimum reconnection backoff period. - * @param node The node id to check + * @param id The connection id to check * @param now The current time in MS * @return true if we can initiate a new connection */ - public boolean canConnect(int node, long now) { - NodeConnectionState state = nodeState.get(node); + public boolean canConnect(String id, long now) { + NodeConnectionState state = nodeState.get(id); if (state == null) return true; else @@ -45,11 +45,11 @@ final class ClusterConnectionStates { /** * Return true if we are disconnected from the given node and can't re-establish a connection yet - * @param node The node to check + * @param id The connection to check * @param now The current time in ms */ - public boolean isBlackedOut(int node, long now) { - NodeConnectionState state = nodeState.get(node); + public boolean isBlackedOut(String id, long now) { + NodeConnectionState state = nodeState.get(id); if (state == null) return false; else @@ -60,11 +60,11 @@ final class ClusterConnectionStates { * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled * connections. - * @param node The node to check + * @param id The connection to check * @param now The current time in ms */ - public long connectionDelay(int node, long now) { - NodeConnectionState state = nodeState.get(node); + public long connectionDelay(String id, long now) { + NodeConnectionState state = nodeState.get(id); if (state == null) return 0; long timeWaited = now - state.lastConnectAttemptMs; if (state.state == ConnectionState.DISCONNECTED) { @@ -77,67 +77,67 @@ final class ClusterConnectionStates { } /** - * Enter the connecting state for the given node. - * @param node The id of the node we are connecting to + * Enter the connecting state for the given connection. + * @param id The id of the connection * @param now The current time. */ - public void connecting(int node, long now) { - nodeState.put(node, new NodeConnectionState(ConnectionState.CONNECTING, now)); + public void connecting(String id, long now) { + nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now)); } /** - * Return true iff we have a connection to the give node - * @param node The id of the node to check + * Return true iff a specific connection is connected + * @param id The id of the connection to check */ - public boolean isConnected(int node) { - NodeConnectionState state = nodeState.get(node); + public boolean isConnected(String id) { + NodeConnectionState state = nodeState.get(id); return state != null && state.state == ConnectionState.CONNECTED; } /** - * Return true iff we are in the process of connecting to the given node - * @param node The id of the node + * Return true iff we are in the process of connecting + * @param id The id of the connection */ - public boolean isConnecting(int node) { - NodeConnectionState state = nodeState.get(node); + public boolean isConnecting(String id) { + NodeConnectionState state = nodeState.get(id); return state != null && state.state == ConnectionState.CONNECTING; } /** - * Enter the connected state for the given node - * @param node The node we have connected to + * Enter the connected state for the given connection + * @param id The connection identifier */ - public void connected(int node) { - NodeConnectionState nodeState = nodeState(node); + public void connected(String id) { + NodeConnectionState nodeState = nodeState(id); nodeState.state = ConnectionState.CONNECTED; } /** * Enter the disconnected state for the given node - * @param node The node we have disconnected from + * @param id The connection we have disconnected */ - public void disconnected(int node) { - NodeConnectionState nodeState = nodeState(node); + public void disconnected(String id) { + NodeConnectionState nodeState = nodeState(id); nodeState.state = ConnectionState.DISCONNECTED; } /** - * Get the state of our connection to the given node - * @param node The id of the node + * Get the state of a given connection + * @param id The id of the connection * @return The state of our connection */ - public ConnectionState connectionState(int node) { - return nodeState(node).state; + public ConnectionState connectionState(String id) { + return nodeState(id).state; } /** * Get the state of a given node - * @param node The node to fetch the state for + * @param id The connection to fetch the state for */ - private NodeConnectionState nodeState(int node) { - NodeConnectionState state = this.nodeState.get(node); + private NodeConnectionState nodeState(String id) { + NodeConnectionState state = this.nodeState.get(id); if (state == null) - throw new IllegalStateException("No entry found for node " + node); + throw new IllegalStateException("No entry found for connection " + id); return state; } diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java index 936487b..15d00d4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -24,14 +24,14 @@ import java.util.Map; final class InFlightRequests { private final int maxInFlightRequestsPerConnection; - private final Map> requests = new HashMap>(); + private final Map> requests = new HashMap>(); public InFlightRequests(int maxInFlightRequestsPerConnection) { this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection; } /** - * Add the given request to the queue for the node it was directed to + * Add the given request to the queue for the connection it was directed to */ public void add(ClientRequest request) { Deque reqs = this.requests.get(request.request().destination()); @@ -45,7 +45,7 @@ final class InFlightRequests { /** * Get the request queue for the given node */ - private Deque requestQueue(int node) { + private Deque requestQueue(String node) { Deque reqs = requests.get(node); if (reqs == null || reqs.isEmpty()) throw new IllegalStateException("Response from server for which there are no in-flight requests."); @@ -55,7 +55,7 @@ final class InFlightRequests { /** * Get the oldest request (the one that that will be completed next) for the given node */ - public ClientRequest completeNext(int node) { + public ClientRequest completeNext(String node) { return requestQueue(node).pollLast(); } @@ -63,7 +63,7 @@ final class InFlightRequests { * Get the last request we sent to the given node (but don't remove it from the queue) * @param node The node id */ - public ClientRequest lastSent(int node) { + public ClientRequest lastSent(String node) { return requestQueue(node).peekFirst(); } @@ -72,7 +72,7 @@ final class InFlightRequests { * @param node The node the request was sent to * @return The request */ - public ClientRequest completeLastSent(int node) { + public ClientRequest completeLastSent(String node) { return requestQueue(node).pollFirst(); } @@ -82,7 +82,7 @@ final class InFlightRequests { * @param node Node in question * @return true iff we have no requests still being sent to the given node */ - public boolean canSendMore(int node) { + public boolean canSendMore(String node) { Deque queue = requests.get(node); return queue == null || queue.isEmpty() || (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection); @@ -93,7 +93,7 @@ final class InFlightRequests { * @param node The node * @return The request count. */ - public int inFlightRequestCount(int node) { + public int inFlightRequestCount(String node) { Deque queue = requests.get(node); return queue == null ? 0 : queue.size(); } @@ -114,7 +114,7 @@ final class InFlightRequests { * @param node The node * @return All the in-flight requests for that node that have been removed */ - public Iterable clearAll(int node) { + public Iterable clearAll(String node) { Deque reqs = requests.get(node); if (reqs == null) { return Collections.emptyList(); diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java index 1311f85..7ab2503 100644 --- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java @@ -81,13 +81,13 @@ public interface KafkaClient extends Closeable { public List poll(long timeout, long now); /** - * Complete all in-flight requests for a given node + * Complete all in-flight requests for a given connection * - * @param node The node to complete requests for + * @param id The connection to complete requests for * @param now The current time in ms * @return All requests that complete during this time period. */ - public List completeAll(int node, long now); + public List completeAll(String id, long now); /** * Complete all in-flight requests @@ -117,7 +117,7 @@ public interface KafkaClient extends Closeable { * * @param nodeId The id of the node */ - public int inFlightRequestCount(int nodeId); + public int inFlightRequestCount(String nodeId); /** * Generate a request header for the next request diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 435fbb5..48fe796 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -22,8 +22,8 @@ import java.util.Set; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.network.NetworkReceive; -import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.Selectable; +import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Struct; @@ -111,7 +111,7 @@ public class NetworkClient implements KafkaClient { if (isReady(node, now)) return true; - if (connectionStates.canConnect(node.id(), now)) + if (connectionStates.canConnect(node.idString(), now)) // if we are interested in sending to a node and we don't have a connection to it, initiate one initiateConnect(node, now); @@ -129,7 +129,7 @@ public class NetworkClient implements KafkaClient { */ @Override public long connectionDelay(Node node, long now) { - return connectionStates.connectionDelay(node.id(), now); + return connectionStates.connectionDelay(node.idString(), now); } /** @@ -142,7 +142,7 @@ public class NetworkClient implements KafkaClient { */ @Override public boolean connectionFailed(Node node) { - return connectionStates.connectionState(node.id()).equals(ConnectionState.DISCONNECTED); + return connectionStates.connectionState(node.idString()).equals(ConnectionState.DISCONNECTED); } /** @@ -154,7 +154,7 @@ public class NetworkClient implements KafkaClient { */ @Override public boolean isReady(Node node, long now) { - int nodeId = node.id(); + String nodeId = node.idString(); if (!this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0) // if we need to update our metadata now declare all requests unready to make metadata requests first // priority @@ -165,11 +165,11 @@ public class NetworkClient implements KafkaClient { } /** - * Are we connected and ready and able to send more requests to the given node? + * Are we connected and ready and able to send more requests to the given connection? * * @param node The node */ - private boolean isSendable(int node) { + private boolean isSendable(String node) { return connectionStates.isConnected(node) && inFlightRequests.canSendMore(node); } @@ -179,7 +179,7 @@ public class NetworkClient implements KafkaClient { * @param node The node to check * @return The connection state */ - public ConnectionState connectionState(int node) { + public ConnectionState connectionState(String node) { return connectionStates.connectionState(node); } @@ -190,7 +190,7 @@ public class NetworkClient implements KafkaClient { */ @Override public void send(ClientRequest request) { - int nodeId = request.request().destination(); + String nodeId = request.request().destination(); if (!isSendable(nodeId)) throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready."); @@ -252,7 +252,7 @@ public class NetworkClient implements KafkaClient { * @return All the collected responses */ @Override - public List completeAll(int node, long now) { + public List completeAll(String node, long now) { try { this.selector.muteAll(); this.selector.unmute(node); @@ -288,8 +288,8 @@ public class NetworkClient implements KafkaClient { * Get the number of in-flight requests for a given node */ @Override - public int inFlightRequestCount(int nodeId) { - return this.inFlightRequests.inFlightRequestCount(nodeId); + public int inFlightRequestCount(String node) { + return this.inFlightRequests.inFlightRequestCount(node); } /** @@ -334,11 +334,11 @@ public class NetworkClient implements KafkaClient { for (int i = 0; i < nodes.size(); i++) { int idx = Utils.abs((this.nodeIndexOffset + i) % nodes.size()); Node node = nodes.get(idx); - int currInflight = this.inFlightRequests.inFlightRequestCount(node.id()); - if (currInflight == 0 && this.connectionStates.isConnected(node.id())) { + int currInflight = this.inFlightRequests.inFlightRequestCount(node.idString()); + if (currInflight == 0 && this.connectionStates.isConnected(node.idString())) { // if we find an established connection with no in-flight requests we can stop right away return node; - } else if (!this.connectionStates.isBlackedOut(node.id(), now) && currInflight < inflight) { + } else if (!this.connectionStates.isBlackedOut(node.idString(), now) && currInflight < inflight) { // otherwise if this is the best we have found so far, record that inflight = currInflight; found = node; @@ -355,7 +355,7 @@ public class NetworkClient implements KafkaClient { */ private void handleCompletedSends(List responses, long now) { // if no response is expected then when the send is completed, return it - for (NetworkSend send : this.selector.completedSends()) { + for (Send send : this.selector.completedSends()) { ClientRequest request = this.inFlightRequests.lastSent(send.destination()); if (!request.expectResponse()) { this.inFlightRequests.completeLastSent(send.destination()); @@ -372,7 +372,7 @@ public class NetworkClient implements KafkaClient { */ private void handleCompletedReceives(List responses, long now) { for (NetworkReceive receive : this.selector.completedReceives()) { - int source = receive.source(); + String source = receive.source(); ClientRequest req = inFlightRequests.completeNext(source); ResponseHeader header = ResponseHeader.parse(receive.payload()); short apiKey = req.request().header().apiKey(); @@ -412,7 +412,7 @@ public class NetworkClient implements KafkaClient { * @param now The current time */ private void handleDisconnections(List responses, long now) { - for (int node : this.selector.disconnected()) { + for (String node : this.selector.disconnected()) { connectionStates.disconnected(node); log.debug("Node {} disconnected.", node); for (ClientRequest request : this.inFlightRequests.clearAll(node)) { @@ -433,9 +433,9 @@ public class NetworkClient implements KafkaClient { * Record any newly completed connections */ private void handleConnections() { - for (Integer id : this.selector.connected()) { - log.debug("Completed connection to node {}", id); - this.connectionStates.connected(id); + for (String node : this.selector.connected()) { + log.debug("Completed connection to node {}", node); + this.connectionStates.connected(node); } } @@ -451,7 +451,7 @@ public class NetworkClient implements KafkaClient { /** * Create a metadata request for the given topics */ - private ClientRequest metadataRequest(long now, int node, Set topics) { + private ClientRequest metadataRequest(long now, String node, Set topics) { MetadataRequest metadata = new MetadataRequest(new ArrayList(topics)); RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct()); return new ClientRequest(now, true, send, null); @@ -470,15 +470,17 @@ public class NetworkClient implements KafkaClient { this.lastNoNodeAvailableMs = now; return; } + String nodeConnectionId = node.idString(); - if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { + + if (connectionStates.isConnected(nodeConnectionId) && inFlightRequests.canSendMore(nodeConnectionId)) { Set topics = metadata.topics(); this.metadataFetchInProgress = true; - ClientRequest metadataRequest = metadataRequest(now, node.id(), topics); + ClientRequest metadataRequest = metadataRequest(now, nodeConnectionId, topics); log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); this.selector.send(metadataRequest.request()); this.inFlightRequests.add(metadataRequest); - } else if (connectionStates.canConnect(node.id(), now)) { + } else if (connectionStates.canConnect(nodeConnectionId, now)) { // we don't have a connection to this node right now, make one log.debug("Initialize connection to node {} for sending metadata request", node.id()); initiateConnect(node, now); @@ -497,16 +499,17 @@ public class NetworkClient implements KafkaClient { * Initiate a connection to the given node */ private void initiateConnect(Node node, long now) { + String nodeConnectionId = node.idString(); try { log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port()); - this.connectionStates.connecting(node.id(), now); - selector.connect(node.id(), + this.connectionStates.connecting(nodeConnectionId, now); + selector.connect(nodeConnectionId, new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); } catch (IOException e) { /* attempt failed, we'll try again after the backoff */ - connectionStates.disconnected(node.id()); + connectionStates.disconnected(nodeConnectionId); /* maybe the problem is our metadata, update it */ metadata.requestUpdate(); log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index bdff518..55837cc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -156,6 +156,10 @@ public class ConsumerConfig extends AbstractConfig { public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer"; private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the Deserializer interface."; + /** connections.max.idle.ms */ + public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms"; + public static final String CONNECTIONS_MAX_IDLE_MS_DOC = "Close idle connections after the number of milliseconds specified by this config."; + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, @@ -277,7 +281,12 @@ public class ConsumerConfig extends AbstractConfig { .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, - VALUE_DESERIALIZER_CLASS_DOC); + VALUE_DESERIALIZER_CLASS_DOC) + .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, + Type.LONG, + 10 * 60 * 1000, + Importance.MEDIUM, + CONNECTIONS_MAX_IDLE_MS_DOC); } public static Map addDeserializerToConfig(Map configs, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index d301be4..d1d1ec1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -472,7 +472,8 @@ public class KafkaConsumer implements Consumer { String metricGrpPrefix = "consumer"; Map metricsTags = new LinkedHashMap(); metricsTags.put("client-id", clientId); - this.client = new NetworkClient(new Selector(metrics, time, metricGrpPrefix, metricsTags), + this.client = new NetworkClient( + new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags), this.metadata, clientId, 100, // a fixed large enough value will suffice diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java index e55ab11..27ff351 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java @@ -416,7 +416,7 @@ public final class Coordinator { log.debug("Issuing consumer metadata request to broker {}", node.id()); ConsumerMetadataRequest request = new ConsumerMetadataRequest(this.groupId); - RequestSend send = new RequestSend(node.id(), + RequestSend send = new RequestSend(node.idString(), this.client.nextRequestHeader(ApiKeys.CONSUMER_METADATA), request.toStruct()); long now = time.milliseconds(); @@ -435,7 +435,7 @@ public final class Coordinator { log.debug("Issuing request ({}: {}) to coordinator {}", api, request, this.consumerCoordinator.id()); RequestHeader header = this.client.nextRequestHeader(api); - RequestSend send = new RequestSend(this.consumerCoordinator.id(), header, request); + RequestSend send = new RequestSend(this.consumerCoordinator.idString(), header, request); return new ClientRequest(now, true, send, handler); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index ef9dd52..94f5a66 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -124,7 +124,7 @@ public class Fetcher { */ public void initFetches(Cluster cluster, long now) { for (ClientRequest request : createFetchRequests(cluster)) { - Node node = cluster.nodeById(request.request().destination()); + Node node = cluster.nodeById(Integer.parseInt(request.request().destination())); if (client.ready(node, now)) { log.trace("Initiating fetch to node {}: {}", node.id(), request); client.send(request); @@ -209,12 +209,12 @@ public class Fetcher { } else if (this.client.ready(info.leader(), now)) { Node node = info.leader(); ListOffsetRequest request = new ListOffsetRequest(-1, partitions); - RequestSend send = new RequestSend(node.id(), + RequestSend send = new RequestSend(node.idString(), this.client.nextRequestHeader(ApiKeys.LIST_OFFSETS), request.toStruct()); ClientRequest clientRequest = new ClientRequest(now, true, send, null); this.client.send(clientRequest); - List responses = this.client.completeAll(node.id(), now); + List responses = this.client.completeAll(node.idString(), now); if (responses.isEmpty()) throw new IllegalStateException("This should not happen."); ClientResponse response = responses.get(responses.size() - 1); @@ -257,7 +257,7 @@ public class Fetcher { for (TopicPartition partition : subscriptions.assignedPartitions()) { Node node = cluster.leaderFor(partition); // if there is a leader and no in-flight requests, issue a new fetch - if (node != null && this.client.inFlightRequestCount(node.id()) == 0) { + if (node != null && this.client.inFlightRequestCount(node.idString()) == 0) { Map fetch = fetchable.get(node.id()); if (fetch == null) { fetch = new HashMap(); @@ -273,7 +273,7 @@ public class Fetcher { for (Map.Entry> entry : fetchable.entrySet()) { int nodeId = entry.getKey(); final FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue()); - RequestSend send = new RequestSend(nodeId, this.client.nextRequestHeader(ApiKeys.FETCH), fetch.toStruct()); + RequestSend send = new RequestSend(Integer.toString(nodeId), this.client.nextRequestHeader(ApiKeys.FETCH), fetch.toStruct()); RequestCompletionHandler handler = new RequestCompletionHandler() { public void onComplete(ClientResponse response) { handleFetchResponse(response, fetch); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 8e336a3..7891943 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -228,7 +228,8 @@ public class KafkaProducer implements Producer { List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); - NetworkClient client = new NetworkClient(new Selector(this.metrics, time, "producer", metricTags), + NetworkClient client = new NetworkClient( + new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags), this.metadata, clientId, config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 187d000..7145fb0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -169,6 +169,10 @@ public class ProducerConfig extends AbstractConfig { public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer"; private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the Serializer interface."; + /** connections.max.idle.ms */ + public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms"; + public static final String CONNECTIONS_MAX_IDLE_MS_DOC = "Close idle connections after the number of milliseconds specified by this config."; + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC) .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) @@ -217,7 +221,8 @@ public class ProducerConfig extends AbstractConfig { Importance.LOW, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC) .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC) - .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC); + .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC) + .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, 10 * 60 * 1000, Importance.MEDIUM, CONNECTIONS_MAX_IDLE_MS_DOC); } public static Map addSerializerToConfig(Map configs, diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 1e943d6..07e65d4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -320,7 +320,7 @@ public class Sender implements Runnable { recordsByPartition.put(tp, batch); } ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition); - RequestSend send = new RequestSend(destination, + RequestSend send = new RequestSend(Integer.toString(destination), this.client.nextRequestHeader(ApiKeys.PRODUCE), request.toStruct()); RequestCompletionHandler callback = new RequestCompletionHandler() { @@ -505,10 +505,10 @@ public class Sender implements Runnable { topicErrorSensor.record(count, now); } - public void recordLatency(int node, long latency) { + public void recordLatency(String node, long latency) { long now = time.milliseconds(); this.requestTimeSensor.record(latency, now); - if (node >= 0) { + if (!node.isEmpty()) { String nodeTimeName = "node-" + node + ".latency"; Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName); if (nodeRequestTime != null) diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java index f4e4186..644cd71 100644 --- a/clients/src/main/java/org/apache/kafka/common/Node.java +++ b/clients/src/main/java/org/apache/kafka/common/Node.java @@ -18,12 +18,14 @@ package org.apache.kafka.common; public class Node { private final int id; + private final String idString; private final String host; private final int port; public Node(int id, String host, int port) { super(); this.id = id; + this.idString = Integer.toString(id); this.host = host; this.port = port; } @@ -40,6 +42,14 @@ public class Node { } /** + * String representation of the node id. + * Typically the integer id is used to serialize over the wire, the string representation is used as an identifier with NetworkClient code + */ + public String idString() { + return idString; + } + + /** * The host name for this node */ public String host() { diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java index 129ae82..159c301 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java @@ -25,11 +25,11 @@ import java.nio.channels.ScatteringByteChannel; */ public class ByteBufferReceive implements Receive { - private final int source; + private final String source; private final ByteBuffer[] buffers; private int remaining; - public ByteBufferReceive(int source, ByteBuffer... buffers) { + public ByteBufferReceive(String source, ByteBuffer... buffers) { super(); this.source = source; this.buffers = buffers; @@ -38,7 +38,7 @@ public class ByteBufferReceive implements Receive { } @Override - public int source() { + public String source() { return source; } @@ -54,8 +54,4 @@ public class ByteBufferReceive implements Receive { return read; } - public ByteBuffer[] reify() { - return buffers; - } - } diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java index c8213e1..ca5210b 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java @@ -22,12 +22,12 @@ import java.nio.channels.GatheringByteChannel; */ public class ByteBufferSend implements Send { - private final int destination; + private final String destination; protected final ByteBuffer[] buffers; private int remaining; private int size; - public ByteBufferSend(int destination, ByteBuffer... buffers) { + public ByteBufferSend(String destination, ByteBuffer... buffers) { super(); this.destination = destination; this.buffers = buffers; @@ -37,7 +37,7 @@ public class ByteBufferSend implements Send { } @Override - public int destination() { + public String destination() { return destination; } @@ -47,26 +47,18 @@ public class ByteBufferSend implements Send { } @Override - public ByteBuffer[] reify() { - return this.buffers; - } - - @Override - public int remaining() { - return this.remaining; - } - public int size() { return this.size; } @Override - public long writeTo(GatheringByteChannel channel) throws IOException { + public int writeTo(GatheringByteChannel channel) throws IOException { long written = channel.write(buffers); if (written < 0) throw new EOFException("This shouldn't happen."); + if (written > Integer.MAX_VALUE) + throw new IOException("Wrote more bytes than " + size + ". This shouldn't happen."); remaining -= written; - return written; + return (int) written; } - } diff --git a/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java b/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java new file mode 100644 index 0000000..f7cfcfe --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.network; + +import org.apache.kafka.common.KafkaException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.channels.GatheringByteChannel; +import java.util.Iterator; +import java.util.List; + +/** + * A set of composite sends, sent one after another + */ + +public class MultiSend implements Send { + + private static final Logger log = LoggerFactory.getLogger(MultiSend.class); + private String dest; + private long totalWritten = 0; + private List sends; + private Iterator sendsIterator; + private Send current; + private boolean doneSends = false; + private int size = 0; + + public MultiSend(String dest, List sends) { + this.dest = dest; + this.sends = sends; + this.sendsIterator = sends.iterator(); + nextSendOrDone(); + for (Send send: sends) + this.size += send.size(); + } + + @Override + public int size() { + return size; + } + + @Override + public String destination() { + return dest; + } + + @Override + public boolean completed() { + if (doneSends) { + if (totalWritten != size) + log.error("mismatch in sending bytes over socket; expected: " + size + " actual: " + totalWritten); + return true; + } else { + return false; + } + } + + @Override + public int writeTo(GatheringByteChannel channel) throws IOException { + if (completed()) + throw new KafkaException("This operation cannot be completed on a complete request."); + + int totalWrittenPerCall = 0; + boolean sendComplete = false; + do { + long written = current.writeTo(channel); + totalWritten += written; + totalWrittenPerCall += written; + sendComplete = current.completed(); + if (sendComplete) + nextSendOrDone(); + } while (!completed() && sendComplete); + if (log.isTraceEnabled()) { + log.trace("Bytes written as part of multisend call : " + totalWrittenPerCall + "Total bytes written so far : " + totalWritten + "Expected bytes to write : " + size); + } + return totalWrittenPerCall; + } + + // update current if there's a next Send, mark sends as done if there isn't + private void nextSendOrDone() { + if (sendsIterator.hasNext()) + current = sendsIterator.next(); + else + doneSends = true; + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java index fc0d168..5e3a4a0 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java +++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java @@ -15,6 +15,7 @@ package org.apache.kafka.common.network; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; import java.nio.channels.ScatteringByteChannel; /** @@ -22,24 +23,42 @@ import java.nio.channels.ScatteringByteChannel; */ public class NetworkReceive implements Receive { - private final int source; + public final static String UNKNOWN_SOURCE = ""; + public final static int UNLIMITED = -1; + + private final String source; private final ByteBuffer size; + private final int maxSize; private ByteBuffer buffer; - public NetworkReceive(int source, ByteBuffer buffer) { + + public NetworkReceive(String source, ByteBuffer buffer) { this.source = source; this.buffer = buffer; this.size = null; + this.maxSize = UNLIMITED; + } + + public NetworkReceive(String source) { + this.source = source; + this.size = ByteBuffer.allocate(4); + this.buffer = null; + this.maxSize = UNLIMITED; } - public NetworkReceive(int source) { + public NetworkReceive(int maxSize, String source) { this.source = source; this.size = ByteBuffer.allocate(4); this.buffer = null; + this.maxSize = maxSize; + } + + public NetworkReceive() { + this(UNKNOWN_SOURCE); } @Override - public int source() { + public String source() { return source; } @@ -48,13 +67,15 @@ public class NetworkReceive implements Receive { return !size.hasRemaining() && !buffer.hasRemaining(); } - @Override - public ByteBuffer[] reify() { - return new ByteBuffer[] {this.buffer}; + public long readFrom(ScatteringByteChannel channel) throws IOException { + return readFromReadableChannel(channel); } - @Override - public long readFrom(ScatteringByteChannel channel) throws IOException { + // Need a method to read from ReadableByteChannel because BlockingChannel requires read with timeout + // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work + // This can go away after we get rid of BlockingChannel + @Deprecated + public long readFromReadableChannel(ReadableByteChannel channel) throws IOException { int read = 0; if (size.hasRemaining()) { int bytesRead = channel.read(size); @@ -63,10 +84,12 @@ public class NetworkReceive implements Receive { read += bytesRead; if (!size.hasRemaining()) { size.rewind(); - int requestSize = size.getInt(); - if (requestSize < 0) - throw new IllegalStateException("Invalid request (size = " + requestSize + ")"); - this.buffer = ByteBuffer.allocate(requestSize); + int receiveSize = size.getInt(); + if (receiveSize < 0) + throw new IllegalStateException("Invalid request (size = " + receiveSize + ")"); + if (maxSize != UNLIMITED && receiveSize > maxSize) + throw new IllegalStateException("Invalid request (size = " + receiveSize + " larger than " + maxSize + ")"); + this.buffer = ByteBuffer.allocate(receiveSize); } } if (buffer != null) { @@ -83,4 +106,14 @@ public class NetworkReceive implements Receive { return this.buffer; } + // Used only by BlockingChannel, so we may be able to get rid of this when/if we get rid of BlockingChannel + @Deprecated + public long readCompletely(ReadableByteChannel channel) throws IOException { + int totalRead = 0; + while (!complete()) { + totalRead += readFromReadableChannel(channel); + } + return totalRead; + } + } diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java index 68327cd..49964b0 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java +++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java @@ -23,7 +23,7 @@ import java.nio.ByteBuffer; */ public class NetworkSend extends ByteBufferSend { - public NetworkSend(int destination, ByteBuffer... buffers) { + public NetworkSend(String destination, ByteBuffer... buffers) { super(destination, sizeDelimit(buffers)); } diff --git a/clients/src/main/java/org/apache/kafka/common/network/Receive.java b/clients/src/main/java/org/apache/kafka/common/network/Receive.java index 4e33078..4b14431 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Receive.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Receive.java @@ -17,7 +17,6 @@ package org.apache.kafka.common.network; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.channels.ScatteringByteChannel; /** @@ -28,7 +27,7 @@ public interface Receive { /** * The numeric id of the source from which we are receiving data. */ - public int source(); + public String source(); /** * Are we done receiving data? @@ -36,11 +35,6 @@ public interface Receive { public boolean complete(); /** - * Turn this receive into ByteBuffer instances, if possible (otherwise returns null). - */ - public ByteBuffer[] reify(); - - /** * Read bytes into this receive from the given channel * @param channel The channel to read from * @return The number of bytes read diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java index b5f8d83..618a0fa 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java @@ -29,12 +29,12 @@ public interface Selectable { * @param receiveBufferSize The receive buffer for the socket * @throws IOException If we cannot begin connecting */ - public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException; + public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException; /** * Begin disconnecting the connection identified by the given id */ - public void disconnect(int id); + public void disconnect(String id); /** * Wakeup this selector if it is blocked on I/O @@ -50,7 +50,7 @@ public interface Selectable { * Queue the given request for sending in the subsequent {@poll(long)} calls * @param send The request to send */ - public void send(NetworkSend send); + public void send(Send send); /** * Do I/O. Reads, writes, connection establishment, etc. @@ -62,7 +62,7 @@ public interface Selectable { /** * The list of sends that completed on the last {@link #poll(long, List) poll()} call. */ - public List completedSends(); + public List completedSends(); /** * The list of receives that completed on the last {@link #poll(long, List) poll()} call. @@ -73,25 +73,25 @@ public interface Selectable { * The list of connections that finished disconnecting on the last {@link #poll(long, List) poll()} * call. */ - public List disconnected(); + public List disconnected(); /** * The list of connections that completed their connection on the last {@link #poll(long, List) poll()} * call. */ - public List connected(); + public List connected(); /** * Disable reads from the given connection * @param id The id for the connection */ - public void mute(int id); + public void mute(String id); /** * Re-enable reads from the given connection * @param id The id for the connection */ - public void unmute(int id); + public void unmute(String id); /** * Disable reads from all connections diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 57de058..6744188 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -17,17 +17,8 @@ import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.Socket; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.nio.channels.UnresolvedAddressException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.nio.channels.*; +import java.util.*; import java.util.concurrent.TimeUnit; import org.apache.kafka.common.KafkaException; @@ -40,20 +31,21 @@ import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Count; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A selector interface for doing non-blocking multi-connection network I/O. + * A nioSelector interface for doing non-blocking multi-connection network I/O. *

* This class works with {@link NetworkSend} and {@link NetworkReceive} to transmit size-delimited network requests and * responses. *

- * A connection can be added to the selector associated with an integer id by doing + * A connection can be added to the nioSelector associated with an integer id by doing * *

- * selector.connect(42, new InetSocketAddress("google.com", server.port), 64000, 64000);
+ * nioSelector.connect(42, new InetSocketAddress("google.com", server.port), 64000, 64000);
  * 
* * The connect call does not block on the creation of the TCP connection, so the connect method only begins initiating @@ -64,10 +56,10 @@ import org.slf4j.LoggerFactory; * *
  * List<NetworkRequest> requestsToSend = Arrays.asList(new NetworkRequest(0, myBytes), new NetworkRequest(1, myOtherBytes));
- * selector.poll(TIMEOUT_MS, requestsToSend);
+ * nioSelector.poll(TIMEOUT_MS, requestsToSend);
  * 
* - * The selector maintains several lists that are reset by each call to poll() which are available via + * The nioSelector maintains several lists that are reset by each call to poll() which are available via * various getters. These are reset by each call to poll(). * * This class is not thread safe! @@ -76,41 +68,59 @@ public class Selector implements Selectable { private static final Logger log = LoggerFactory.getLogger(Selector.class); - private final java.nio.channels.Selector selector; - private final Map keys; - private final List completedSends; + private final java.nio.channels.Selector nioSelector; + private final Map keys; + private final List completedSends; private final List completedReceives; - private final List disconnected; - private final List connected; - private final List failedSends; + private final List disconnected; + private final List connected; + private final List failedSends; private final Time time; private final SelectorMetrics sensors; private final String metricGrpPrefix; private final Map metricTags; + private final Map lruConnections; + private final long connectionsMaxIdleNanos; + private final int maxReceiveSize; + private final boolean metricsPerConnection; + private long currentTimeNanos; + private long nextIdleCloseCheckTime; + /** - * Create a new selector + * Create a new nioSelector */ - public Selector(Metrics metrics, Time time, String metricGrpPrefix, Map metricTags) { + public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map metricTags, boolean metricsPerConnection) { try { - this.selector = java.nio.channels.Selector.open(); + this.nioSelector = java.nio.channels.Selector.open(); } catch (IOException e) { throw new KafkaException(e); } + this.maxReceiveSize = maxReceiveSize; + this.connectionsMaxIdleNanos = connectionMaxIdleMs * 1000 * 1000; this.time = time; this.metricGrpPrefix = metricGrpPrefix; this.metricTags = metricTags; - this.keys = new HashMap(); - this.completedSends = new ArrayList(); + this.keys = new HashMap(); + this.completedSends = new ArrayList(); this.completedReceives = new ArrayList(); - this.connected = new ArrayList(); - this.disconnected = new ArrayList(); - this.failedSends = new ArrayList(); + this.connected = new ArrayList(); + this.disconnected = new ArrayList(); + this.failedSends = new ArrayList(); this.sensors = new SelectorMetrics(metrics); + // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true + this.lruConnections = new LinkedHashMap(16, .75F, true); + currentTimeNanos = new SystemTime().nanoseconds(); + nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos; + this.metricsPerConnection = metricsPerConnection; + } + + public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map metricTags) { + this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true); } /** - * Begin connecting to the given address and add the connection to this selector associated with the given id + * Begin connecting to the given address and add the connection to this nioSelector associated with the given id * number. *

* Note that this call only initiates the connection, which will be completed on a future {@link #poll(long, List)} @@ -123,7 +133,7 @@ public class Selector implements Selectable { * @throws IOException if DNS resolution fails on the hostname or if the broker is down */ @Override - public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { + public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { if (this.keys.containsKey(id)) throw new IllegalStateException("There is already a connection for id " + id); @@ -143,7 +153,18 @@ public class Selector implements Selectable { channel.close(); throw e; } - SelectionKey key = channel.register(this.selector, SelectionKey.OP_CONNECT); + SelectionKey key = channel.register(this.nioSelector, SelectionKey.OP_CONNECT); + key.attach(new Transmissions(id)); + this.keys.put(id, key); + } + + /** + * Register the nioSelector with an existing channel + * Use this on server-side, when a connection is accepted by a different thread but processed by the Selector + * Note that we are not checking if the connection id is valid - since the connection already exists + */ + public void register(String id, SocketChannel channel) throws ClosedChannelException { + SelectionKey key = channel.register(nioSelector, SelectionKey.OP_READ); key.attach(new Transmissions(id)); this.keys.put(id, key); } @@ -153,18 +174,18 @@ public class Selector implements Selectable { * processed until the next {@link #poll(long, List) poll()} call. */ @Override - public void disconnect(int id) { + public void disconnect(String id) { SelectionKey key = this.keys.get(id); if (key != null) key.cancel(); } /** - * Interrupt the selector if it is blocked waiting to do I/O. + * Interrupt the nioSelector if it is blocked waiting to do I/O. */ @Override public void wakeup() { - this.selector.wakeup(); + this.nioSelector.wakeup(); } /** @@ -172,12 +193,14 @@ public class Selector implements Selectable { */ @Override public void close() { - for (SelectionKey key : this.selector.keys()) - close(key); + List connections = new LinkedList(keys.keySet()); + for (String id: connections) + close(id); + try { - this.selector.close(); + this.nioSelector.close(); } catch (IOException e) { - log.error("Exception closing selector:", e); + log.error("Exception closing nioSelector:", e); } } @@ -185,7 +208,7 @@ public class Selector implements Selectable { * Queue the given request for sending in the subsequent {@poll(long)} calls * @param send The request to send */ - public void send(NetworkSend send) { + public void send(Send send) { SelectionKey key = keyForId(send.destination()); Transmissions transmissions = transmissions(key); if (transmissions.hasSend()) @@ -194,7 +217,7 @@ public class Selector implements Selectable { try { key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); } catch (CancelledKeyException e) { - close(key); + close(transmissions.id); this.failedSends.add(send.destination()); } } @@ -220,10 +243,11 @@ public class Selector implements Selectable { long startSelect = time.nanoseconds(); int readyKeys = select(timeout); long endSelect = time.nanoseconds(); + currentTimeNanos = endSelect; this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); if (readyKeys > 0) { - Set keys = this.selector.selectedKeys(); + Set keys = this.nioSelector.selectedKeys(); Iterator iter = keys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); @@ -234,7 +258,7 @@ public class Selector implements Selectable { // register all per-broker metrics at once sensors.maybeRegisterNodeMetrics(transmissions.id); - + lruConnections.put(transmissions.id, currentTimeNanos); try { /* complete any connections that have finished their handshake */ if (key.isConnectable()) { @@ -247,7 +271,7 @@ public class Selector implements Selectable { /* read from any connections that have readable data */ if (key.isReadable()) { if (!transmissions.hasReceive()) - transmissions.receive = new NetworkReceive(transmissions.id); + transmissions.receive = new NetworkReceive(maxReceiveSize, transmissions.id); transmissions.receive.readFrom(channel); if (transmissions.receive.complete()) { transmissions.receive.payload().rewind(); @@ -260,7 +284,7 @@ public class Selector implements Selectable { /* write to any sockets that have space in their buffer and for which we have data */ if (key.isWritable()) { transmissions.send.writeTo(channel); - if (transmissions.send.remaining() <= 0) { + if (transmissions.send.completed()) { this.completedSends.add(transmissions.send); this.sensors.recordBytesSent(transmissions.id, transmissions.send.size()); transmissions.clearSend(); @@ -270,7 +294,7 @@ public class Selector implements Selectable { /* cancel any defunct sockets */ if (!key.isValid()) { - close(key); + close(transmissions.id); this.disconnected.add(transmissions.id); } } catch (IOException e) { @@ -279,13 +303,14 @@ public class Selector implements Selectable { log.info("Connection {} disconnected", desc); else log.warn("Error in I/O with connection to {}", desc, e); - close(key); + close(transmissions.id); this.disconnected.add(transmissions.id); } } } long endIo = time.nanoseconds(); this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); + maybeCloseOldestConnection(); } private String socketDescription(SocketChannel channel) { @@ -299,7 +324,7 @@ public class Selector implements Selectable { } @Override - public List completedSends() { + public List completedSends() { return this.completedSends; } @@ -309,17 +334,17 @@ public class Selector implements Selectable { } @Override - public List disconnected() { + public List disconnected() { return this.disconnected; } @Override - public List connected() { + public List connected() { return this.connected; } @Override - public void mute(int id) { + public void mute(String id) { mute(this.keyForId(id)); } @@ -328,7 +353,7 @@ public class Selector implements Selectable { } @Override - public void unmute(int id) { + public void unmute(String id) { unmute(this.keyForId(id)); } @@ -348,6 +373,25 @@ public class Selector implements Selectable { unmute(key); } + private void maybeCloseOldestConnection() { + if (currentTimeNanos > nextIdleCloseCheckTime) { + if (lruConnections.isEmpty()) { + nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos; + } else { + Map.Entry oldestConnectionEntry = lruConnections.entrySet().iterator().next(); + Long connectionLastActiveTime = oldestConnectionEntry.getValue(); + nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos; + if (currentTimeNanos > nextIdleCloseCheckTime) { + String connectionId = oldestConnectionEntry.getKey(); + if (log.isTraceEnabled()) + log.trace("About to close the idle connection from " + connectionId + + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis"); + close(connectionId); + } + } + } + } + /** * Clear the results from the prior poll */ @@ -369,17 +413,19 @@ public class Selector implements Selectable { */ private int select(long ms) throws IOException { if (ms == 0L) - return this.selector.selectNow(); + return this.nioSelector.selectNow(); else if (ms < 0L) - return this.selector.select(); + return this.nioSelector.select(); else - return this.selector.select(ms); + return this.nioSelector.select(ms); } /** * Begin closing this connection */ - private void close(SelectionKey key) { + public void close(String id) { + SelectionKey key = keyForId(id); + lruConnections.remove(id); SocketChannel channel = channel(key); Transmissions trans = transmissions(key); if (trans != null) { @@ -401,10 +447,10 @@ public class Selector implements Selectable { /** * Get the selection key associated with this numeric id */ - private SelectionKey keyForId(int id) { + private SelectionKey keyForId(String id) { SelectionKey key = this.keys.get(id); if (key == null) - throw new IllegalStateException("Attempt to write to socket for which there is no open connection."); + throw new IllegalStateException("Attempt to write to socket for which there is no open connection. Connection id " + id + " existing connections " + keys.keySet().toString()); return key; } @@ -426,11 +472,11 @@ public class Selector implements Selectable { * The id and in-progress send and receive associated with a connection */ private static class Transmissions { - public int id; - public NetworkSend send; + public String id; + public Send send; public NetworkReceive receive; - public Transmissions(int id) { + public Transmissions(String id) { this.id = id; } @@ -515,8 +561,8 @@ public class Selector implements Selectable { }); } - public void maybeRegisterNodeMetrics(int node) { - if (node >= 0) { + public void maybeRegisterNodeMetrics(String node) { + if (!node.isEmpty() && metricsPerConnection) { // if one sensor of the metrics has been registered for the node, // then all other sensors should have been registered; and vice versa String nodeRequestName = "node-" + node + ".bytes-sent"; @@ -554,10 +600,10 @@ public class Selector implements Selectable { } } - public void recordBytesSent(int node, int bytes) { + public void recordBytesSent(String node, int bytes) { long now = time.milliseconds(); this.bytesSent.record(bytes, now); - if (node >= 0) { + if (!node.isEmpty()) { String nodeRequestName = "node-" + node + ".bytes-sent"; Sensor nodeRequest = this.metrics.getSensor(nodeRequestName); if (nodeRequest != null) @@ -565,10 +611,10 @@ public class Selector implements Selectable { } } - public void recordBytesReceived(int node, int bytes) { + public void recordBytesReceived(String node, int bytes) { long now = time.milliseconds(); this.bytesReceived.record(bytes, now); - if (node >= 0) { + if (!node.isEmpty()) { String nodeRequestName = "node-" + node + ".bytes-received"; Sensor nodeRequest = this.metrics.getSensor(nodeRequestName); if (nodeRequest != null) diff --git a/clients/src/main/java/org/apache/kafka/common/network/Send.java b/clients/src/main/java/org/apache/kafka/common/network/Send.java index 5d321a0..00e0a94 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Send.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Send.java @@ -13,7 +13,6 @@ package org.apache.kafka.common.network; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.channels.GatheringByteChannel; /** @@ -24,12 +23,7 @@ public interface Send { /** * The numeric id for the destination of this send */ - public int destination(); - - /** - * The number of bytes remaining to send - */ - public int remaining(); + public String destination(); /** * Is this send complete? @@ -37,17 +31,17 @@ public interface Send { public boolean completed(); /** - * An optional method to turn this send into an array of ByteBuffers if possible (otherwise returns null) - */ - public ByteBuffer[] reify(); - - /** * Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send * to be completely written * @param channel The channel to write to * @return The number of bytes written * @throws IOException If the write fails */ - public long writeTo(GatheringByteChannel channel) throws IOException; + public int writeTo(GatheringByteChannel channel) throws IOException; + + /** + * Size of the send + */ + public int size(); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java index 27cbf39..3fec60b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java @@ -25,7 +25,7 @@ public class RequestSend extends NetworkSend { private final RequestHeader header; private final Struct body; - public RequestSend(int destination, RequestHeader header, Struct body) { + public RequestSend(String destination, RequestHeader header, Struct body) { super(destination, serialize(header, body)); this.header = header; this.body = body; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java new file mode 100644 index 0000000..12b06d1 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.network.NetworkSend; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; + +public class ResponseSend extends NetworkSend { + + public ResponseSend(String destination, ResponseHeader header, Struct body) { + super(destination, serialize(header, body)); + } + + public ResponseSend(String destination, ResponseHeader header, AbstractRequestResponse response) { + this(destination, header, response.toStruct()); + } + + private static ByteBuffer serialize(ResponseHeader header, Struct body) { + ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf()); + header.writeTo(buffer); + body.writeTo(buffer); + buffer.rewind(); + return buffer; + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 5e3fab1..d9c97e9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -78,7 +78,7 @@ public class MockClient implements KafkaClient { return false; } - public void disconnect(Integer node) { + public void disconnect(String node) { Iterator iter = requests.iterator(); while (iter.hasNext()) { ClientRequest request = iter.next(); @@ -115,7 +115,7 @@ public class MockClient implements KafkaClient { } @Override - public List completeAll(int node, long now) { + public List completeAll(String node, long now) { return completeAll(now); } @@ -158,7 +158,7 @@ public class MockClient implements KafkaClient { } @Override - public int inFlightRequestCount(int nodeId) { + public int inFlightRequestCount(String nodeId) { return requests.size(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 8b27889..43238ce 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -65,7 +65,7 @@ public class NetworkClientTest { client.poll(1, time.milliseconds()); selector.clear(); assertTrue("Now the client is ready", client.ready(node, time.milliseconds())); - selector.disconnect(node.id()); + selector.disconnect(node.idString()); client.poll(1, time.milliseconds()); selector.clear(); assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds())); @@ -74,7 +74,7 @@ public class NetworkClientTest { @Test(expected = IllegalStateException.class) public void testSendToUnreadyNode() { - RequestSend send = new RequestSend(5, + RequestSend send = new RequestSend("5", client.nextRequestHeader(ApiKeys.METADATA), new MetadataRequest(Arrays.asList("test")).toStruct()); ClientRequest request = new ClientRequest(time.milliseconds(), false, send, null); @@ -86,7 +86,7 @@ public class NetworkClientTest { public void testSimpleRequestResponse() { ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.emptyMap()); RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE); - RequestSend send = new RequestSend(node.id(), reqHeader, produceRequest.toStruct()); + RequestSend send = new RequestSend(node.idString(), reqHeader, produceRequest.toStruct()); TestCallbackHandler handler = new TestCallbackHandler(); ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler); awaitReady(client, node); @@ -101,7 +101,7 @@ public class NetworkClientTest { respHeader.writeTo(buffer); resp.writeTo(buffer); buffer.flip(); - selector.completeReceive(new NetworkReceive(node.id(), buffer)); + selector.completeReceive(new NetworkReceive(node.idString(), buffer)); List responses = client.poll(1, time.milliseconds()); assertEquals(1, responses.size()); assertTrue("The handler should have executed.", handler.executed); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index d5b306b..d23b4b6 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -22,10 +22,7 @@ import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; +import java.util.*; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; @@ -49,7 +46,7 @@ public class SelectorTest { public void setup() throws Exception { this.server = new EchoServer(); this.server.start(); - this.selector = new Selector(new Metrics(), new MockTime() , "MetricGroup", new LinkedHashMap()); + this.selector = new Selector(5000, new Metrics(), new MockTime() , "MetricGroup", new LinkedHashMap()); } @After @@ -63,7 +60,7 @@ public class SelectorTest { */ @Test public void testServerDisconnect() throws Exception { - int node = 0; + String node = "0"; // connect and do a simple request blockingConnect(node); @@ -84,7 +81,7 @@ public class SelectorTest { */ @Test public void testClientDisconnect() throws Exception { - int node = 0; + String node = "0"; blockingConnect(node); selector.disconnect(node); selector.send(createSend(node, "hello1")); @@ -101,7 +98,7 @@ public class SelectorTest { */ @Test(expected = IllegalStateException.class) public void testCantSendWithInProgress() throws Exception { - int node = 0; + String node = "0"; blockingConnect(node); selector.send(createSend(node, "test1")); selector.send(createSend(node, "test2")); @@ -113,7 +110,7 @@ public class SelectorTest { */ @Test(expected = IllegalStateException.class) public void testCantSendWithoutConnecting() throws Exception { - selector.send(createSend(0, "test")); + selector.send(createSend("0", "test")); selector.poll(1000L); } @@ -122,7 +119,7 @@ public class SelectorTest { */ @Test(expected = IOException.class) public void testNoRouteToHost() throws Exception { - selector.connect(0, new InetSocketAddress("asdf.asdf.dsc", server.port), BUFFER_SIZE, BUFFER_SIZE); + selector.connect("0", new InetSocketAddress("asdf.asdf.dsc", server.port), BUFFER_SIZE, BUFFER_SIZE); } /** @@ -130,7 +127,7 @@ public class SelectorTest { */ @Test public void testConnectionRefused() throws Exception { - int node = 0; + String node = "0"; ServerSocket nonListeningSocket = new ServerSocket(0); int nonListeningPort = nonListeningSocket.getLocalPort(); selector.connect(node, new InetSocketAddress("localhost", nonListeningPort), BUFFER_SIZE, BUFFER_SIZE); @@ -151,14 +148,15 @@ public class SelectorTest { // create connections InetSocketAddress addr = new InetSocketAddress("localhost", server.port); for (int i = 0; i < conns; i++) - selector.connect(i, addr, BUFFER_SIZE, BUFFER_SIZE); - + selector.connect(Integer.toString(i), addr, BUFFER_SIZE, BUFFER_SIZE); // send echo requests and receive responses - int[] requests = new int[conns]; - int[] responses = new int[conns]; + Map requests = new HashMap(); + Map responses = new HashMap(); int responseCount = 0; - for (int i = 0; i < conns; i++) - selector.send(createSend(i, i + "-" + 0)); + for (int i = 0; i < conns; i++) { + String node = Integer.toString(i); + selector.send(createSend(node, node + "-0")); + } // loop until we complete all requests while (responseCount < conns * reqs) { @@ -171,19 +169,27 @@ public class SelectorTest { for (NetworkReceive receive : selector.completedReceives()) { String[] pieces = asString(receive).split("-"); assertEquals("Should be in the form 'conn-counter'", 2, pieces.length); - assertEquals("Check the source", receive.source(), Integer.parseInt(pieces[0])); + assertEquals("Check the source", receive.source(), pieces[0]); assertEquals("Check that the receive has kindly been rewound", 0, receive.payload().position()); - assertEquals("Check the request counter", responses[receive.source()], Integer.parseInt(pieces[1])); - responses[receive.source()]++; // increment the expected counter + if (responses.containsKey(receive.source())) { + assertEquals("Check the request counter", (int) responses.get(receive.source()), Integer.parseInt(pieces[1])); + responses.put(receive.source(), responses.get(receive.source()) + 1); + } else { + assertEquals("Check the request counter", 0, Integer.parseInt(pieces[1])); + responses.put(receive.source(), 1); + } responseCount++; } // prepare new sends for the next round - for (NetworkSend send : selector.completedSends()) { - int dest = send.destination(); - requests[dest]++; - if (requests[dest] < reqs) - selector.send(createSend(dest, dest + "-" + requests[dest])); + for (Send send : selector.completedSends()) { + String dest = send.destination(); + if (requests.containsKey(dest)) + requests.put(dest, requests.get(dest) + 1); + else + requests.put(dest, 1); + if (requests.get(dest) < reqs) + selector.send(createSend(dest, dest + "-" + requests.get(dest))); } } } @@ -193,7 +199,7 @@ public class SelectorTest { */ @Test public void testSendLargeRequest() throws Exception { - int node = 0; + String node = "0"; blockingConnect(node); String big = TestUtils.randomString(10 * BUFFER_SIZE); assertEquals(big, blockingRequest(node, big)); @@ -204,41 +210,41 @@ public class SelectorTest { */ @Test public void testEmptyRequest() throws Exception { - int node = 0; + String node = "0"; blockingConnect(node); assertEquals("", blockingRequest(node, "")); } @Test(expected = IllegalStateException.class) public void testExistingConnectionId() throws IOException { - blockingConnect(0); - blockingConnect(0); + blockingConnect("0"); + blockingConnect("0"); } @Test public void testMute() throws Exception { - blockingConnect(0); - blockingConnect(1); + blockingConnect("0"); + blockingConnect("1"); - selector.send(createSend(0, "hello")); - selector.send(createSend(1, "hi")); + selector.send(createSend("0", "hello")); + selector.send(createSend("1", "hi")); - selector.mute(1); + selector.mute("1"); while (selector.completedReceives().isEmpty()) selector.poll(5); assertEquals("We should have only one response", 1, selector.completedReceives().size()); - assertEquals("The response should not be from the muted node", 0, selector.completedReceives().get(0).source()); + assertEquals("The response should not be from the muted node", "0", selector.completedReceives().get(0).source()); - selector.unmute(1); + selector.unmute("1"); do { selector.poll(5); } while (selector.completedReceives().isEmpty()); assertEquals("We should have only one response", 1, selector.completedReceives().size()); - assertEquals("The response should be from the previously muted node", 1, selector.completedReceives().get(0).source()); + assertEquals("The response should be from the previously muted node", "1", selector.completedReceives().get(0).source()); } - private String blockingRequest(int node, String s) throws IOException { + private String blockingRequest(String node, String s) throws IOException { selector.send(createSend(node, s)); selector.poll(1000L); while (true) { @@ -250,13 +256,13 @@ public class SelectorTest { } /* connect and wait for the connection to complete */ - private void blockingConnect(int node) throws IOException { + private void blockingConnect(String node) throws IOException { selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE); while (!selector.connected().contains(node)) selector.poll(10000L); } - private NetworkSend createSend(int node, String s) { + private NetworkSend createSend(String node, String s) { return new NetworkSend(node, ByteBuffer.wrap(s.getBytes())); } diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java index ea89b06..51eb9d1 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java +++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java @@ -20,6 +20,7 @@ import java.util.List; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.Selectable; +import org.apache.kafka.common.network.Send; import org.apache.kafka.common.utils.Time; /** @@ -28,23 +29,23 @@ import org.apache.kafka.common.utils.Time; public class MockSelector implements Selectable { private final Time time; - private final List initiatedSends = new ArrayList(); - private final List completedSends = new ArrayList(); + private final List initiatedSends = new ArrayList(); + private final List completedSends = new ArrayList(); private final List completedReceives = new ArrayList(); - private final List disconnected = new ArrayList(); - private final List connected = new ArrayList(); + private final List disconnected = new ArrayList(); + private final List connected = new ArrayList(); public MockSelector(Time time) { this.time = time; } @Override - public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { + public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { this.connected.add(id); } @Override - public void disconnect(int id) { + public void disconnect(String id) { this.disconnected.add(id); } @@ -64,7 +65,7 @@ public class MockSelector implements Selectable { } @Override - public void send(NetworkSend send) { + public void send(Send send) { this.initiatedSends.add(send); } @@ -76,7 +77,7 @@ public class MockSelector implements Selectable { } @Override - public List completedSends() { + public List completedSends() { return completedSends; } @@ -94,21 +95,21 @@ public class MockSelector implements Selectable { } @Override - public List disconnected() { + public List disconnected() { return disconnected; } @Override - public List connected() { + public List connected() { return connected; } @Override - public void mute(int id) { + public void mute(String id) { } @Override - public void unmute(int id) { + public void unmute(String id) { } @Override diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 1c3b380..05e7db1 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -174,7 +174,7 @@ object ConsumerGroupCommand { val offsetMap = mutable.Map[TopicAndPartition, Long]() val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs) channel.send(OffsetFetchRequest(group, topicPartitions)) - val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer) + val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload()) offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) => if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) { diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala index a3b1b78..258d5fe 100644 --- a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala @@ -18,9 +18,10 @@ package kafka.api import java.nio.ByteBuffer -import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.network.RequestChannel.Response + import kafka.common.ErrorMapping +import kafka.network.{RequestOrResponseSend, RequestChannel} +import kafka.network.RequestChannel.Response object ConsumerMetadataRequest { val CurrentVersion = 0.shortValue @@ -64,7 +65,7 @@ case class ConsumerMetadataRequest(group: String, override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { // return ConsumerCoordinatorNotAvailable for all uncaught errors val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } def describe(details: Boolean) = { diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala index fe81635..8092007 100644 --- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala +++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala @@ -18,10 +18,9 @@ package kafka.api import java.nio.ByteBuffer -import kafka.api.ApiUtils._ -import collection.mutable.ListBuffer -import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.common.{TopicAndPartition, ErrorMapping} + +import kafka.common.{ErrorMapping, TopicAndPartition} +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response import kafka.utils.Logging @@ -63,7 +62,7 @@ case class ControlledShutdownRequest(versionId: Short, override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { val errorResponse = ControlledShutdownResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]), Set.empty[TopicAndPartition]) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } override def describe(details: Boolean = false): String = { diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index b038c15..5b38f85 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -149,7 +149,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, MessageSet.Empty)) } val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData) - requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse))) + requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, errorResponse))) } override def describe(details: Boolean): String = { diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala index 75aaf57..9e54ff0 100644 --- a/core/src/main/scala/kafka/api/FetchResponse.scala +++ b/core/src/main/scala/kafka/api/FetchResponse.scala @@ -22,8 +22,10 @@ import java.nio.channels.GatheringByteChannel import kafka.common.{TopicAndPartition, ErrorMapping} import kafka.message.{MessageSet, ByteBufferMessageSet} -import kafka.network.{MultiSend, Send} import kafka.api.ApiUtils._ +import org.apache.kafka.common.KafkaException +import org.apache.kafka.common.network.Send +import org.apache.kafka.common.network.MultiSend import scala.collection._ @@ -62,7 +64,9 @@ class PartitionDataSend(val partitionId: Int, buffer.putInt(partitionData.messages.sizeInBytes) buffer.rewind() - override def complete = !buffer.hasRemaining && messagesSentSize >= messageSize + override def completed = !buffer.hasRemaining && messagesSentSize >= messageSize + + override def destination: String = "" override def writeTo(channel: GatheringByteChannel): Int = { var written = 0 @@ -75,6 +79,8 @@ class PartitionDataSend(val partitionId: Int, } written } + + override def size = buffer.capacity() + messageSize } object TopicData { @@ -101,29 +107,31 @@ case class TopicData(topic: String, partitionData: Map[Int, FetchResponsePartiti val headerSize = TopicData.headerSize(topic) } -class TopicDataSend(val topicData: TopicData) extends Send { - private val size = topicData.sizeInBytes +class TopicDataSend(val dest: String, val topicData: TopicData) extends Send { + override val size = topicData.sizeInBytes private var sent = 0 - override def complete = sent >= size + override def completed: Boolean = sent >= size + + override def destination: String = dest private val buffer = ByteBuffer.allocate(topicData.headerSize) writeShortString(buffer, topicData.topic) buffer.putInt(topicData.partitionData.size) buffer.rewind() - val sends = new MultiSend(topicData.partitionData.toList - .map(d => new PartitionDataSend(d._1, d._2))) { - val expectedBytesToWrite = topicData.sizeInBytes - topicData.headerSize - } + val sends = new MultiSend(dest, + JavaConversions.seqAsJavaList(topicData.partitionData.toList.map(d => new PartitionDataSend(d._1, d._2)))) + + override def writeTo(channel: GatheringByteChannel): Int = { + if (completed) + throw new KafkaException("This operation cannot be completed on a complete request.") - def writeTo(channel: GatheringByteChannel): Int = { - expectIncomplete() var written = 0 if(buffer.hasRemaining) written += channel.write(buffer) - if(!buffer.hasRemaining && !sends.complete) { + if(!buffer.hasRemaining && !sends.completed) { written += sends.writeTo(channel) } sent += written @@ -200,14 +208,16 @@ case class FetchResponse(correlationId: Int, data: Map[TopicAndPartition, FetchR } -class FetchResponseSend(val fetchResponse: FetchResponse) extends Send { - private val size = fetchResponse.sizeInBytes +class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) extends Send { + override val size = fetchResponse.sizeInBytes private var sent = 0 private val sendSize = 4 /* for size */ + size - override def complete = sent >= sendSize + override def completed = sent >= sendSize + + override def destination = dest private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize) buffer.putInt(size) @@ -215,19 +225,19 @@ class FetchResponseSend(val fetchResponse: FetchResponse) extends Send { buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count buffer.rewind() - val sends = new MultiSend(fetchResponse.dataGroupedByTopic.toList.map { - case(topic, data) => new TopicDataSend(TopicData(topic, + val sends = new MultiSend(dest, JavaConversions.seqAsJavaList(fetchResponse.dataGroupedByTopic.toList.map { + case(topic, data) => new TopicDataSend(dest, TopicData(topic, data.map{case(topicAndPartition, message) => (topicAndPartition.partition, message)})) - }) { - val expectedBytesToWrite = fetchResponse.sizeInBytes - FetchResponse.headerSize - } + })) + + override def writeTo(channel: GatheringByteChannel): Int = { + if (completed) + throw new KafkaException("This operation cannot be completed on a complete request.") - def writeTo(channel: GatheringByteChannel):Int = { - expectIncomplete() var written = 0 if(buffer.hasRemaining) written += channel.write(buffer) - if(!buffer.hasRemaining && !sends.complete) { + if(!buffer.hasRemaining && !sends.completed) { written += sends.writeTo(channel) } sent += written diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala index 431190a..c2584e0 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -19,14 +19,16 @@ package kafka.api import java.nio._ -import kafka.utils._ + import kafka.api.ApiUtils._ import kafka.cluster.BrokerEndPoint -import kafka.controller.LeaderIsrAndControllerEpoch -import kafka.network.{BoundedByteBufferSend, RequestChannel} import kafka.common.ErrorMapping +import kafka.controller.LeaderIsrAndControllerEpoch +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response -import collection.Set +import kafka.utils._ + +import scala.collection.Set object LeaderAndIsr { @@ -184,7 +186,7 @@ case class LeaderAndIsrRequest (versionId: Short, case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } val errorResponse = LeaderAndIsrResponse(correlationId, responseMap) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } override def describe(details: Boolean): String = { diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala index 317daed..5b362ef 100644 --- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala @@ -18,11 +18,13 @@ package kafka.api import java.nio.ByteBuffer + import kafka.api.ApiUtils._ -import kafka.utils.{SystemTime, Logging} -import kafka.network.{RequestChannel, BoundedByteBufferSend} -import kafka.common.{OffsetMetadata, OffsetAndMetadata, ErrorMapping, TopicAndPartition} +import kafka.common.{ErrorMapping, OffsetAndMetadata, TopicAndPartition} +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response +import kafka.utils.Logging + import scala.collection._ object OffsetCommitRequest extends Logging { @@ -162,7 +164,7 @@ case class OffsetCommitRequest(groupId: String, val commitStatus = requestInfo.mapValues(_ => errorCode) val commitResponse = OffsetCommitResponse(commitStatus, correlationId) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(commitResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, commitResponse))) } override def describe(details: Boolean): String = { diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala index fa8bd6a..a83e147 100644 --- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala @@ -17,16 +17,13 @@ package kafka.api +import java.nio.ByteBuffer + import kafka.api.ApiUtils._ -import kafka.utils.Logging -import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.common._ -import kafka.common.TopicAndPartition +import kafka.common.{TopicAndPartition, _} +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response - -import scala.Some - -import java.nio.ByteBuffer +import kafka.utils.Logging object OffsetFetchRequest extends Logging { val CurrentVersion: Short = 1 @@ -99,7 +96,7 @@ case class OffsetFetchRequest(groupId: String, )) }.toMap val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } override def describe(details: Boolean): String = { diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala index 3d483bc..f418868 100644 --- a/core/src/main/scala/kafka/api/OffsetRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetRequest.scala @@ -18,9 +18,10 @@ package kafka.api import java.nio.ByteBuffer -import kafka.common.{ErrorMapping, TopicAndPartition} + import kafka.api.ApiUtils._ -import kafka.network.{BoundedByteBufferSend, RequestChannel} +import kafka.common.{ErrorMapping, TopicAndPartition} +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response @@ -117,7 +118,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), null)) } val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } override def describe(details: Boolean): String = { diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index 570b2da..c866180 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -18,11 +18,12 @@ package kafka.api import java.nio._ -import kafka.message._ + import kafka.api.ApiUtils._ import kafka.common._ +import kafka.message._ +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response -import kafka.network.{RequestChannel, BoundedByteBufferSend} object ProducerRequest { val CurrentVersion = 0.shortValue @@ -136,7 +137,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l)) } val errorResponse = ProducerResponse(correlationId, producerResponseStatus) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } } diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala index 5e14987..4441fc6 100644 --- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala +++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala @@ -19,7 +19,7 @@ package kafka.api import java.nio._ import kafka.api.ApiUtils._ -import kafka.network.{BoundedByteBufferSend, RequestChannel, InvalidRequestException} +import kafka.network.{RequestOrResponseSend, RequestChannel, InvalidRequestException} import kafka.common.{TopicAndPartition, ErrorMapping} import kafka.network.RequestChannel.Response import kafka.utils.Logging @@ -106,7 +106,7 @@ case class StopReplicaRequest(versionId: Short, case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) }.toMap val errorResponse = StopReplicaResponse(correlationId, responseMap) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } override def describe(details: Boolean): String = { diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala index 363bae0..401c583 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala @@ -18,13 +18,15 @@ package kafka.api import java.nio.ByteBuffer + import kafka.api.ApiUtils._ -import collection.mutable.ListBuffer -import kafka.network.{BoundedByteBufferSend, RequestChannel} import kafka.common.ErrorMapping +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response import kafka.utils.Logging +import scala.collection.mutable.ListBuffer + object TopicMetadataRequest extends Logging { val CurrentVersion = 0.shortValue val DefaultClientId = "" @@ -80,7 +82,7 @@ case class TopicMetadataRequest(versionId: Short, topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } val errorResponse = TopicMetadataResponse(Seq(), topicMetadata, correlationId) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } override def describe(details: Boolean): String = { diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala index 69f0397..d59de82 100644 --- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala @@ -21,8 +21,8 @@ import java.nio.ByteBuffer import kafka.api.ApiUtils._ import kafka.cluster.{Broker, BrokerEndPoint} import kafka.common.{ErrorMapping, KafkaException, TopicAndPartition} +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response -import kafka.network.{BoundedByteBufferSend, RequestChannel} import org.apache.kafka.common.protocol.SecurityProtocol import scala.collection.Set @@ -128,7 +128,7 @@ case class UpdateMetadataRequest (versionId: Short, override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { val errorResponse = new UpdateMetadataResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]])) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } override def describe(details: Boolean): String = { diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index 62394c0..68c7e7f 100755 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -153,7 +153,7 @@ object ClientUtils extends Logging{ debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group)) queryChannel.send(ConsumerMetadataRequest(group)) val response = queryChannel.receive() - val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.buffer) + val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.payload()) debug("Consumer metadata response: " + consumerMetadataResponse.toString) if (consumerMetadataResponse.errorCode == ErrorMapping.NoError) coordinatorOpt = consumerMetadataResponse.coordinatorOpt diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index 31a2639..c16f7ed 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -24,6 +24,7 @@ import kafka.api._ import kafka.network._ import kafka.utils._ import kafka.common.{ErrorMapping, TopicAndPartition} +import org.apache.kafka.common.network.{NetworkReceive, Receive} import org.apache.kafka.common.utils.Utils._ /** @@ -65,9 +66,9 @@ class SimpleConsumer(val host: String, } } - private def sendRequest(request: RequestOrResponse): Receive = { + private def sendRequest(request: RequestOrResponse): NetworkReceive = { lock synchronized { - var response: Receive = null + var response: NetworkReceive = null try { getOrMakeConnection() blockingChannel.send(request) @@ -94,12 +95,12 @@ class SimpleConsumer(val host: String, def send(request: TopicMetadataRequest): TopicMetadataResponse = { val response = sendRequest(request) - TopicMetadataResponse.readFrom(response.buffer) + TopicMetadataResponse.readFrom(response.payload()) } def send(request: ConsumerMetadataRequest): ConsumerMetadataResponse = { val response = sendRequest(request) - ConsumerMetadataResponse.readFrom(response.buffer) + ConsumerMetadataResponse.readFrom(response.payload()) } /** @@ -109,7 +110,7 @@ class SimpleConsumer(val host: String, * @return a set of fetched messages */ def fetch(request: FetchRequest): FetchResponse = { - var response: Receive = null + var response: NetworkReceive = null val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestTimer val aggregateTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestTimer aggregateTimer.time { @@ -117,7 +118,7 @@ class SimpleConsumer(val host: String, response = sendRequest(request) } } - val fetchResponse = FetchResponse.readFrom(response.buffer) + val fetchResponse = FetchResponse.readFrom(response.payload()) val fetchedSize = fetchResponse.sizeInBytes fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestSizeHist.update(fetchedSize) fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize) @@ -129,7 +130,7 @@ class SimpleConsumer(val host: String, * @param request a [[kafka.api.OffsetRequest]] object. * @return a [[kafka.api.OffsetResponse]] object. */ - def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer) + def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).payload()) /** * Commit offsets for a topic @@ -140,7 +141,7 @@ class SimpleConsumer(val host: String, def commitOffsets(request: OffsetCommitRequest) = { // TODO: With KAFKA-1012, we have to first issue a ConsumerMetadataRequest and connect to the coordinator before // we can commit offsets. - OffsetCommitResponse.readFrom(sendRequest(request).buffer) + OffsetCommitResponse.readFrom(sendRequest(request).payload()) } /** @@ -149,7 +150,7 @@ class SimpleConsumer(val host: String, * @param request a [[kafka.api.OffsetFetchRequest]] object. * @return a [[kafka.api.OffsetFetchResponse]] object. */ - def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).buffer) + def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).payload()) private def getOrMakeConnection() { if(!isClosed && !blockingChannel.isConnected) { diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index aa8d940..305baef 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -334,7 +334,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, try { kafkaCommitMeter.mark(offsetsToCommit.size) offsetsChannel.send(offsetCommitRequest) - val offsetCommitResponse = OffsetCommitResponse.readFrom(offsetsChannel.receive().buffer) + val offsetCommitResponse = OffsetCommitResponse.readFrom(offsetsChannel.receive().payload()) trace("Offset commit response: %s.".format(offsetCommitResponse)) val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = { @@ -421,7 +421,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, ensureOffsetManagerConnected() try { offsetsChannel.send(offsetFetchRequest) - val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetsChannel.receive().buffer) + val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetsChannel.receive().payload()) trace("Offset fetch response: %s.".format(offsetFetchResponse)) val (leaderChanged, loadInProgress) = diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 6cf13f0..9f521fa 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -16,8 +16,9 @@ */ package kafka.controller -import kafka.network.{Receive, BlockingChannel} +import kafka.network.BlockingChannel import kafka.utils.{CoreUtils, Logging, ShutdownableThread} +import org.apache.kafka.common.network.NetworkReceive import collection.mutable.HashMap import kafka.cluster.Broker import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue} @@ -120,7 +121,7 @@ class RequestSendThread(val controllerId: Int, val queueItem = queue.take() val request = queueItem._1 val callback = queueItem._2 - var receive: Receive = null + var receive: NetworkReceive = null try { lock synchronized { var isSendSuccessful = false @@ -147,11 +148,11 @@ class RequestSendThread(val controllerId: Int, var response: RequestOrResponse = null request.requestId.get match { case RequestKeys.LeaderAndIsrKey => - response = LeaderAndIsrResponse.readFrom(receive.buffer) + response = LeaderAndIsrResponse.readFrom(receive.payload()) case RequestKeys.StopReplicaKey => - response = StopReplicaResponse.readFrom(receive.buffer) + response = StopReplicaResponse.readFrom(receive.payload()) case RequestKeys.UpdateMetadataKey => - response = UpdateMetadataResponse.readFrom(receive.buffer) + response = UpdateMetadataResponse.readFrom(receive.payload()) } stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s" .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString)) diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala index b0b7be1..568d0ac 100644 --- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala @@ -16,12 +16,11 @@ */ package kafka.javaapi -import kafka.api._ import java.nio.ByteBuffer + +import kafka.api._ + import scala.collection.mutable -import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.common.ErrorMapping -import kafka.network.RequestChannel.Response class TopicMetadataRequest(val versionId: Short, val correlationId: Int, diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala index 6e2a38e..8139d3a 100644 --- a/core/src/main/scala/kafka/network/BlockingChannel.scala +++ b/core/src/main/scala/kafka/network/BlockingChannel.scala @@ -21,6 +21,7 @@ import java.net.InetSocketAddress import java.nio.channels._ import kafka.utils.{nonthreadsafe, Logging} import kafka.api.RequestOrResponse +import org.apache.kafka.common.network.{NetworkSend, NetworkReceive} object BlockingChannel{ @@ -43,6 +44,7 @@ class BlockingChannel( val host: String, private var writeChannel: GatheringByteChannel = null private val lock = new Object() private val connectTimeoutMs = readTimeoutMs + private var connectionId: String = "" def connect() = lock synchronized { if(!connected) { @@ -59,8 +61,15 @@ class BlockingChannel( val host: String, channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs) writeChannel = channel + // Need to create a new ReadableByteChannel from input stream because SocketChannel doesn't implement read with timeout + // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work readChannel = Channels.newChannel(channel.socket().getInputStream) connected = true + val localHost = channel.socket.getLocalAddress.getHostAddress + val localPort = channel.socket.getLocalPort + val remoteHost = channel.socket.getInetAddress.getHostAddress + val remotePort = channel.socket.getPort + connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort // settings may not match what we requested above val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d), connectTimeoutMs = %d." debug(msg.format(channel.socket.getSoTimeout, @@ -95,20 +104,21 @@ class BlockingChannel( val host: String, def isConnected = connected - def send(request: RequestOrResponse):Int = { + def send(request: RequestOrResponse): Long = { if(!connected) throw new ClosedChannelException() - val send = new BoundedByteBufferSend(request) + val send = new RequestOrResponseSend(connectionId, request) send.writeCompletely(writeChannel) } - def receive(): Receive = { + def receive(): NetworkReceive = { if(!connected) throw new ClosedChannelException() - val response = new BoundedByteBufferReceive() + val response = new NetworkReceive() response.readCompletely(readChannel) + response.payload().rewind() response } diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala deleted file mode 100755 index c0d7726..0000000 --- a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.network - -import java.nio._ -import java.nio.channels._ -import kafka.utils._ - -/** - * Represents a communication between the client and server - * - */ -@nonthreadsafe -private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive with Logging { - - private val sizeBuffer = ByteBuffer.allocate(4) - private var contentBuffer: ByteBuffer = null - - def this() = this(Int.MaxValue) - - var complete: Boolean = false - - /** - * Get the content buffer for this transmission - */ - def buffer: ByteBuffer = { - expectComplete() - contentBuffer - } - - /** - * Read the bytes in this response from the given channel - */ - def readFrom(channel: ReadableByteChannel): Int = { - expectIncomplete() - var read = 0 - // have we read the request size yet? - if(sizeBuffer.remaining > 0) - read += CoreUtils.read(channel, sizeBuffer) - // have we allocated the request buffer yet? - if(contentBuffer == null && !sizeBuffer.hasRemaining) { - sizeBuffer.rewind() - val size = sizeBuffer.getInt() - if(size <= 0) - throw new InvalidRequestException("%d is not a valid request size.".format(size)) - if(size > maxSize) - throw new InvalidRequestException("Request of length %d is not valid, it is larger than the maximum size of %d bytes.".format(size, maxSize)) - contentBuffer = byteBufferAllocate(size) - } - // if we have a buffer read some stuff into it - if(contentBuffer != null) { - read = CoreUtils.read(channel, contentBuffer) - // did we get everything? - if(!contentBuffer.hasRemaining) { - contentBuffer.rewind() - complete = true - } - } - read - } - - private def byteBufferAllocate(size: Int): ByteBuffer = { - var buffer: ByteBuffer = null - try { - buffer = ByteBuffer.allocate(size) - } catch { - case e: OutOfMemoryError => - error("OOME with size " + size, e) - throw e - case e2: Throwable => - throw e2 - } - buffer - } -} diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala deleted file mode 100644 index b95b73b..0000000 --- a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.network - -import java.nio._ -import java.nio.channels._ -import kafka.utils._ -import kafka.api.RequestOrResponse -import org.apache.kafka.common.requests.{AbstractRequestResponse, ResponseHeader} - -@nonthreadsafe -private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send { - - private val sizeBuffer = ByteBuffer.allocate(4) - - // Avoid possibility of overflow for 2GB-4 byte buffer - if(buffer.remaining > Int.MaxValue - sizeBuffer.limit) - throw new IllegalStateException("Attempt to create a bounded buffer of " + buffer.remaining + " bytes, but the maximum " + - "allowable size for a bounded buffer is " + (Int.MaxValue - sizeBuffer.limit) + ".") - sizeBuffer.putInt(buffer.limit) - sizeBuffer.rewind() - - var complete: Boolean = false - - def this(size: Int) = this(ByteBuffer.allocate(size)) - - def this(request: RequestOrResponse) = { - this(request.sizeInBytes + (if(request.requestId != None) 2 else 0)) - request.requestId match { - case Some(requestId) => - buffer.putShort(requestId) - case None => - } - - request.writeTo(buffer) - buffer.rewind() - } - - def this(header: ResponseHeader, body: AbstractRequestResponse) = { - this(header.sizeOf + body.sizeOf) - header.writeTo(buffer) - body.writeTo(buffer) - buffer.rewind - } - - - def writeTo(channel: GatheringByteChannel): Int = { - expectIncomplete() - val written = channel.write(Array(sizeBuffer, buffer)) - // if we are done, mark it off - if(!buffer.hasRemaining) - complete = true - written.asInstanceOf[Int] - } - -} diff --git a/core/src/main/scala/kafka/network/ByteBufferSend.scala b/core/src/main/scala/kafka/network/ByteBufferSend.scala deleted file mode 100644 index af30042..0000000 --- a/core/src/main/scala/kafka/network/ByteBufferSend.scala +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.network - -import java.nio._ -import java.nio.channels._ -import kafka.utils._ - -@nonthreadsafe -private[kafka] class ByteBufferSend(val buffer: ByteBuffer) extends Send { - - var complete: Boolean = false - - def this(size: Int) = this(ByteBuffer.allocate(size)) - - def writeTo(channel: GatheringByteChannel): Int = { - expectIncomplete() - var written = 0 - written += channel.write(buffer) - if(!buffer.hasRemaining) - complete = true - written - } - -} diff --git a/core/src/main/scala/kafka/network/Handler.scala b/core/src/main/scala/kafka/network/Handler.scala index a030033..2148e0c 100644 --- a/core/src/main/scala/kafka/network/Handler.scala +++ b/core/src/main/scala/kafka/network/Handler.scala @@ -17,17 +17,19 @@ package kafka.network +import org.apache.kafka.common.network.{Send, NetworkReceive} + private[kafka] object Handler { /** * A request handler is a function that turns an incoming * transmission into an outgoing transmission */ - type Handler = Receive => Option[Send] + type Handler = NetworkReceive => Option[Send] /** * A handler mapping finds the right Handler function for a given request */ - type HandlerMapping = (Short, Receive) => Handler + type HandlerMapping = (Short, NetworkReceive) => Handler } diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 1d0024c..263e23e 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -26,13 +26,14 @@ import kafka.common.TopicAndPartition import kafka.utils.{Logging, SystemTime} import kafka.message.ByteBufferMessageSet import java.net._ +import org.apache.kafka.common.network.Send import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader} import org.apache.log4j.Logger object RequestChannel extends Logging { - val AllDone = new Request(processor = 1, requestKey = 2, buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT) + val AllDone = new Request(processor = 1, connectionId = "2", buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT) def getShutdownReceive() = { val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]()) @@ -43,7 +44,7 @@ object RequestChannel extends Logging { byteBuffer } - case class Request(processor: Int, requestKey: Any, private var buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0), securityProtocol: SecurityProtocol) { + case class Request(processor: Int, connectionId: String, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) { @volatile var requestDequeueTimeMs = -1L @volatile var apiLocalCompleteTimeMs = -1L @volatile var responseCompleteTimeMs = -1L @@ -99,12 +100,23 @@ object RequestChannel extends Logging { m.responseSendTimeHist.update(responseSendTime) m.totalTimeHist.update(totalTime) } - if(requestLogger.isTraceEnabled) - requestLogger.trace("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d" - .format(requestObj.describe(true), remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime)) + if(requestLogger.isTraceEnabled) { + val requestDesc = + if (requestObj != null) + requestObj.describe(true) + else + header.toString + " -- " + body.toString + requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d" + .format(requestDesc, connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime)) + } else if(requestLogger.isDebugEnabled) { - requestLogger.debug("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d" - .format(requestObj.describe(false), remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime)) + val requestDesc = + if (requestObj != null) + requestObj.describe(false) + else + header.toString + " -- " + body.toString + requestLogger.debug("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d" + .format(requestDesc, connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime)) } } } diff --git a/core/src/main/scala/kafka/network/RequestOrResponseSend.scala b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala new file mode 100644 index 0000000..364f24b --- /dev/null +++ b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.network + +import java.nio.ByteBuffer +import java.nio.channels.GatheringByteChannel + +import kafka.api.RequestOrResponse +import kafka.utils.Logging +import org.apache.kafka.common.network.NetworkSend + +object RequestOrResponseSend { + def serialize(request: RequestOrResponse): ByteBuffer = { + val buffer = ByteBuffer.allocate(request.sizeInBytes + (if(request.requestId != None) 2 else 0)) + request.requestId match { + case Some(requestId) => + buffer.putShort(requestId) + case None => + } + request.writeTo(buffer) + buffer.rewind() + buffer + } +} + +class RequestOrResponseSend(val dest: String, val buffer: ByteBuffer) extends NetworkSend(dest, buffer) with Logging { + + def this(dest: String, request: RequestOrResponse) { + this(dest, RequestOrResponseSend.serialize(request)) + } + + def writeCompletely(channel: GatheringByteChannel): Long = { + var totalWritten = 0L + while(!completed()) { + val written = writeTo(channel) + trace(written + " bytes written.") + totalWritten += written + } + totalWritten + } + +} diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index edf6214..43de522 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -21,18 +21,19 @@ import java.util import java.util.concurrent._ import java.util.concurrent.atomic._ import java.net._ -import java.io._ import java.nio.channels._ +import com.yammer.metrics.core.Meter import kafka.cluster.EndPoint +import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.utils.{Time, SystemTime} import scala.collection._ import kafka.common.KafkaException import kafka.metrics.KafkaMetricsGroup import kafka.utils._ -import com.yammer.metrics.core.{Gauge, Meter} import org.apache.kafka.common.utils.Utils /** @@ -52,7 +53,8 @@ class SocketServer(val brokerId: Int, val connectionsMaxIdleMs: Long, val maxConnectionsPerIpOverrides: Map[String, Int] ) extends Logging with KafkaMetricsGroup { this.logIdent = "[Socket Server on Broker " + brokerId + "], " - private val time = SystemTime + private val time = new SystemTime() + private val metrics = new Metrics(time); private val processors = new Array[Processor](numProcessorThreads) private[network] var acceptors = mutable.Map[EndPoint,Acceptor]() val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests) @@ -81,22 +83,18 @@ class SocketServer(val brokerId: Int, time, maxRequestSize, aggregateIdleMeter, - newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)), numProcessorThreads, requestChannel, quotas, connectionsMaxIdleMs, - portToProtocol) + portToProtocol, + metrics) Utils.newThread("kafka-network-thread-%d-%d".format(brokerId, i), processors(i), false).start() } } - newGauge("ResponsesBeingSent", new Gauge[Int] { - def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) } - }) - // register the processor threads for notification of responses - requestChannel.addResponseListener((id:Int) => processors(id).wakeup()) + requestChannel.addResponseListener((id:Int) => processors(id).wakeup) // start accepting connections // right now we will use the same processors for all ports, since we didn't implement different protocols @@ -140,17 +138,20 @@ class SocketServer(val brokerId: Int, */ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging { - protected val selector = Selector.open() private val startupLatch = new CountDownLatch(1) private val shutdownLatch = new CountDownLatch(1) private val alive = new AtomicBoolean(true) + def wakeup + + def shutdownHook + /** * Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete */ def shutdown(): Unit = { alive.set(false) - selector.wakeup() + shutdownHook shutdownLatch.await } @@ -177,11 +178,6 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ protected def isRunning = alive.get /** - * Wakeup the thread for selection. - */ - def wakeup() = selector.wakeup() - - /** * Close the given key and associated socket */ def close(key: SelectionKey) { @@ -200,30 +196,6 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ swallowError(channel.close()) } } - - /** - * Close all open connections - */ - def closeAll() { - // removes cancelled keys from selector.keys set - this.selector.selectNow() - val iter = this.selector.keys().iterator() - while (iter.hasNext) { - val key = iter.next() - close(key) - } - } - - def countInterestOps(ops: Int): Int = { - var count = 0 - val it = this.selector.keys().iterator() - while (it.hasNext) { - if ((it.next().interestOps() & ops) != 0) { - count += 1 - } - } - count - } } /** @@ -237,6 +209,7 @@ private[kafka] class Acceptor(val host: String, connectionQuotas: ConnectionQuotas, protocol: SecurityProtocol, portToProtocol: ConcurrentHashMap[Int, SecurityProtocol]) extends AbstractServerThread(connectionQuotas) { + val nioSelector = java.nio.channels.Selector.open() val serverChannel = openServerSocket(host, port) portToProtocol.put(serverChannel.socket().getLocalPort, protocol) @@ -244,13 +217,13 @@ private[kafka] class Acceptor(val host: String, * Accept loop that checks for new connection attempts */ def run() { - serverChannel.register(selector, SelectionKey.OP_ACCEPT) + serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT); startupComplete() var currentProcessor = 0 while(isRunning) { - val ready = selector.select(500) + val ready = nioSelector.select(500) if(ready > 0) { - val keys = selector.selectedKeys() + val keys = nioSelector.selectedKeys() val iter = keys.iterator() while(iter.hasNext && isRunning) { var key: SelectionKey = null @@ -272,7 +245,7 @@ private[kafka] class Acceptor(val host: String, } debug("Closing server socket and selector.") swallowError(serverChannel.close()) - swallowError(selector.close()) + swallowError(nioSelector.close()) shutdownComplete() } @@ -324,6 +297,14 @@ private[kafka] class Acceptor(val host: String, } } + /** + * Wakeup the thread for selection. + */ + @Override + def wakeup = nioSelector.wakeup + + @Override + def shutdownHook = wakeup } /** @@ -334,18 +315,27 @@ private[kafka] class Processor(val id: Int, val time: Time, val maxRequestSize: Int, val aggregateIdleMeter: Meter, - val idleMeter: Meter, val totalProcessorThreads: Int, val requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, val connectionsMaxIdleMs: Long, - val portToProtocol: ConcurrentHashMap[Int,SecurityProtocol]) extends AbstractServerThread(connectionQuotas) { + val portToProtocol: ConcurrentHashMap[Int,SecurityProtocol], + val metrics: Metrics) extends AbstractServerThread(connectionQuotas) { private val newConnections = new ConcurrentLinkedQueue[SocketChannel]() - private val connectionsMaxIdleNanos = connectionsMaxIdleMs * 1000 * 1000 - private var currentTimeNanos = SystemTime.nanoseconds - private val lruConnections = new util.LinkedHashMap[SelectionKey, Long] - private var nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos + private val inflightResponses = mutable.Map[String, RequestChannel.Response]() + + private var metricTags = new util.HashMap[String, String]() + metricTags.put("processor-id", id.toString) + private val selector = new org.apache.kafka.common.network.Selector( + maxRequestSize, + connectionsMaxIdleMs, + metrics, + time, + "SocketServer", + metricTags, + false + ) override def run() { startupComplete() @@ -354,68 +344,50 @@ private[kafka] class Processor(val id: Int, configureNewConnections() // register any new responses for writing processNewResponses() - val startSelectTime = SystemTime.nanoseconds - val ready = selector.select(300) - currentTimeNanos = SystemTime.nanoseconds - val idleTime = currentTimeNanos - startSelectTime - idleMeter.mark(idleTime) - // We use a single meter for aggregate idle percentage for the thread pool. - // Since meter is calculated as total_recorded_value / time_window and - // time_window is independent of the number of threads, each recorded idle - // time should be discounted by # threads. - aggregateIdleMeter.mark(idleTime / totalProcessorThreads) - - trace("Processor id " + id + " selection time = " + idleTime + " ns") - if(ready > 0) { - val keys = selector.selectedKeys() - val iter = keys.iterator() - while(iter.hasNext && isRunning) { - var key: SelectionKey = null - try { - key = iter.next - iter.remove() - if(key.isReadable) - read(key) - else if(key.isWritable) - write(key) - else if(!key.isValid) - close(key) - else - throw new IllegalStateException("Unrecognized key state for processor thread.") - } catch { - case e: EOFException => { - debug("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress)) - close(key) - } case e: InvalidRequestException => { - info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage)) - close(key) - } case e: Throwable => { - error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e) - close(key) - } - } + + try { + selector.poll(300) + } catch { + case e: IllegalStateException => { + error("Closing processor %s due to illegal state".format(id)) + swallow(closeAll()) + shutdownComplete() } } - maybeCloseOldestConnection + collection.JavaConversions.iterableAsScalaIterable(selector.completedReceives).foreach( receive => { + val req = RequestChannel.Request(processor = id, connectionId = receive.source, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = SecurityProtocol.PLAINTEXT) + + try { + requestChannel.sendRequest(req) + } catch { + case e: InvalidRequestException => { + info("Closing socket connection to %s due to invalid request: %s".format(receive.source, e.getMessage)) + selector.close(receive.source) + } case e: Throwable => { + error("Closing socket for " + receive.source + " because of error", e) + selector.close(receive.source) + } + } + selector.mute(receive.source) + }) + + collection.JavaConversions.iterableAsScalaIterable(selector.completedSends()).foreach( send => { + val resp = inflightResponses.remove(send.destination()).get + resp.request.updateRequestMetrics() + selector.unmute(send.destination()) + }) } - debug("Closing selector.") + + + + debug("Closing selector - processor " + id) closeAll() - swallowError(selector.close()) shutdownComplete() } - /** - * Close the given key and associated socket - */ - override def close(key: SelectionKey): Unit = { - lruConnections.remove(key) - super.close(key) - } - private def processNewResponses() { var curr = requestChannel.receiveResponse(id) while(curr != null) { - val key = curr.request.requestKey.asInstanceOf[SelectionKey] try { curr.responseAction match { case RequestChannel.NoOpAction => { @@ -423,26 +395,21 @@ private[kafka] class Processor(val id: Int, // that are sitting in the server's socket buffer curr.request.updateRequestMetrics trace("Socket server received empty response to send, registering for read: " + curr) - key.interestOps(SelectionKey.OP_READ) - key.attach(null) + selector.unmute(curr.request.connectionId) } case RequestChannel.SendAction => { - trace("Socket server received response to send, registering for write: " + curr) - key.interestOps(SelectionKey.OP_WRITE) - key.attach(curr) + trace("Socket server received response to send, registering for write, sending data and registering for read: " + curr) + selector.send(curr.responseSend) + inflightResponses += (curr.request.connectionId -> curr) } case RequestChannel.CloseConnectionAction => { curr.request.updateRequestMetrics trace("Closing socket connection actively according to the response code.") - close(key) + selector.close(curr.request.connectionId) } - case responseCode => throw new KafkaException("No mapping found for response code " + responseCode) - } - } catch { - case e: CancelledKeyException => { - debug("Ignoring response for closed socket.") - close(key) } + + } finally { curr = requestChannel.receiveResponse(id) } @@ -454,7 +421,7 @@ private[kafka] class Processor(val id: Int, */ def accept(socketChannel: SocketChannel) { newConnections.add(socketChannel) - wakeup() + wakeup } /** @@ -464,84 +431,30 @@ private[kafka] class Processor(val id: Int, while(!newConnections.isEmpty) { val channel = newConnections.poll() debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress) - channel.register(selector, SelectionKey.OP_READ) + val localHost = channel.socket().getLocalAddress.getHostAddress + val localPort = channel.socket().getLocalPort + val remoteHost = channel.socket().getInetAddress.getHostAddress + val remotePort = channel.socket().getPort + val connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort + selector.register(connectionId, channel) } } - /* - * Process reads from ready sockets + /** + * Close all open connections */ - def read(key: SelectionKey) { - lruConnections.put(key, currentTimeNanos) - val socketChannel = channelFor(key) - var receive = key.attachment.asInstanceOf[Receive] - if(key.attachment == null) { - receive = new BoundedByteBufferReceive(maxRequestSize) - key.attach(receive) - } - val read = receive.readFrom(socketChannel) - val address = socketChannel.socket.getRemoteSocketAddress() - trace(read + " bytes read from " + address) - if(read < 0) { - close(key) - } else if(receive.complete) { - val port = socketChannel.socket().getLocalPort - val protocol = portToProtocol.get(port) - val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address, securityProtocol = protocol) - requestChannel.sendRequest(req) - key.attach(null) - // explicitly reset interest ops to not READ, no need to wake up the selector just yet - key.interestOps(key.interestOps & (~SelectionKey.OP_READ)) - } else { - // more reading to be done - trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress()) - key.interestOps(SelectionKey.OP_READ) - wakeup() - } + def closeAll() { + selector.close() } - /* - * Process writes to ready sockets + /** + * Wakeup the thread for selection. */ - def write(key: SelectionKey) { - val socketChannel = channelFor(key) - val response = key.attachment().asInstanceOf[RequestChannel.Response] - val responseSend = response.responseSend - if(responseSend == null) - throw new IllegalStateException("Registered for write interest but no response attached to key.") - val written = responseSend.writeTo(socketChannel) - trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " using key " + key) - if(responseSend.complete) { - response.request.updateRequestMetrics() - key.attach(null) - trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress()) - key.interestOps(SelectionKey.OP_READ) - } else { - trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress()) - key.interestOps(SelectionKey.OP_WRITE) - wakeup() - } - } + @Override + def wakeup = selector.wakeup - private def channelFor(key: SelectionKey) = key.channel().asInstanceOf[SocketChannel] - - private def maybeCloseOldestConnection { - if(currentTimeNanos > nextIdleCloseCheckTime) { - if(lruConnections.isEmpty) { - nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos - } else { - val oldestConnectionEntry = lruConnections.entrySet.iterator().next() - val connectionLastActiveTime = oldestConnectionEntry.getValue - nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos - if(currentTimeNanos > nextIdleCloseCheckTime) { - val key: SelectionKey = oldestConnectionEntry.getKey - trace("About to close the idle connection from " + key.channel.asInstanceOf[SocketChannel].socket.getRemoteSocketAddress - + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis") - close(key) - } - } - } - } + @Override + def shutdownHook = wakeup } diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala deleted file mode 100644 index 2827103..0000000 --- a/core/src/main/scala/kafka/network/Transmission.scala +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.network - -import java.nio._ -import java.nio.channels._ -import kafka.utils.Logging -import kafka.common.KafkaException - -/** - * Represents a stateful transfer of data to or from the network - */ -private[network] trait Transmission extends Logging { - - def complete: Boolean - - protected def expectIncomplete(): Unit = { - if(complete) - throw new KafkaException("This operation cannot be completed on a complete request.") - } - - protected def expectComplete(): Unit = { - if(!complete) - throw new KafkaException("This operation cannot be completed on an incomplete request.") - } - -} - -/** - * A transmission that is being received from a channel - */ -trait Receive extends Transmission { - - def buffer: ByteBuffer - - def readFrom(channel: ReadableByteChannel): Int - - def readCompletely(channel: ReadableByteChannel): Int = { - var totalRead = 0 - while(!complete) { - val read = readFrom(channel) - trace(read + " bytes read.") - totalRead += read - } - totalRead - } - -} - -/** - * A transmission that is being sent out to the channel - */ -trait Send extends Transmission { - - def writeTo(channel: GatheringByteChannel): Int - - def writeCompletely(channel: GatheringByteChannel): Int = { - var totalWritten = 0 - while(!complete) { - val written = writeTo(channel) - trace(written + " bytes written.") - totalWritten += written - } - totalWritten - } - -} - -/** - * A set of composite sends, sent one after another - */ -abstract class MultiSend[S <: Send](val sends: List[S]) extends Send { - val expectedBytesToWrite: Int - private var current = sends - var totalWritten = 0 - - /** - * This method continues to write to the socket buffer till an incomplete - * write happens. On an incomplete write, it returns to the caller to give it - * a chance to schedule other work till the buffered write completes. - */ - def writeTo(channel: GatheringByteChannel): Int = { - expectIncomplete - var totalWrittenPerCall = 0 - var sendComplete: Boolean = false - do { - val written = current.head.writeTo(channel) - totalWritten += written - totalWrittenPerCall += written - sendComplete = current.head.complete - if(sendComplete) - current = current.tail - } while (!complete && sendComplete) - trace("Bytes written as part of multisend call : " + totalWrittenPerCall + "Total bytes written so far : " + totalWritten + "Expected bytes to write : " + expectedBytesToWrite) - totalWrittenPerCall - } - - def complete: Boolean = { - if (current == Nil) { - if (totalWritten != expectedBytesToWrite) - error("mismatch in sending bytes over socket; expected: " + expectedBytesToWrite + " actual: " + totalWritten) - true - } else { - false - } - } -} diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index 0f09951..dcee501 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -17,11 +17,12 @@ package kafka.producer -import kafka.api._ -import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive} -import kafka.utils._ import java.util.Random +import kafka.api._ +import kafka.network.{RequestOrResponseSend, BlockingChannel} +import kafka.utils._ +import org.apache.kafka.common.network.NetworkReceive import org.apache.kafka.common.utils.Utils._ object SyncProducer { @@ -50,7 +51,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { * data. So, leaving the rest of the logging at TRACE, while errors should be logged at ERROR level */ if (logger.isDebugEnabled) { - val buffer = new BoundedByteBufferSend(request).buffer + val buffer = new RequestOrResponseSend("", request).buffer trace("verifying sendbuffer of size " + buffer.limit) val requestTypeId = buffer.getShort() if(requestTypeId == RequestKeys.ProduceKey) { @@ -63,12 +64,12 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { /** * Common functionality for the public send methods */ - private def doSend(request: RequestOrResponse, readResponse: Boolean = true): Receive = { + private def doSend(request: RequestOrResponse, readResponse: Boolean = true): NetworkReceive = { lock synchronized { verifyRequest(request) getOrMakeConnection() - var response: Receive = null + var response: NetworkReceive = null try { blockingChannel.send(request) if(readResponse) @@ -95,7 +96,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { producerRequestStats.getProducerRequestStats(config.host, config.port).requestSizeHist.update(requestSize) producerRequestStats.getProducerRequestAllBrokersStats.requestSizeHist.update(requestSize) - var response: Receive = null + var response: NetworkReceive = null val specificTimer = producerRequestStats.getProducerRequestStats(config.host, config.port).requestTimer val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer aggregateTimer.time { @@ -104,14 +105,14 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { } } if(producerRequest.requiredAcks != 0) - ProducerResponse.readFrom(response.buffer) + ProducerResponse.readFrom(response.payload) else null } def send(request: TopicMetadataRequest): TopicMetadataResponse = { val response = doSend(request) - TopicMetadataResponse.readFrom(response.buffer) + TopicMetadataResponse.readFrom(response.payload) } def close() = { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 417960d..ab56cd9 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -18,7 +18,6 @@ package kafka.server import org.apache.kafka.common.protocol.SecurityProtocol -import org.apache.kafka.common.requests.{JoinGroupResponse, JoinGroupRequest, HeartbeatRequest, HeartbeatResponse, ResponseHeader} import org.apache.kafka.common.TopicPartition import kafka.api._ import kafka.admin.AdminUtils @@ -28,10 +27,9 @@ import kafka.coordinator.ConsumerCoordinator import kafka.log._ import kafka.network._ import kafka.network.RequestChannel.Response +import org.apache.kafka.common.requests.{JoinGroupRequest, JoinGroupResponse, HeartbeatRequest, HeartbeatResponse, ResponseHeader, ResponseSend} import kafka.utils.{ZkUtils, ZKGroupTopicDirs, SystemTime, Logging} - import scala.collection._ - import org.I0Itec.zkclient.ZkClient /** @@ -54,7 +52,7 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handle(request: RequestChannel.Request) { try{ - trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress) + trace("Handling request: " + request.requestObj + " from connection: " + request.connectionId) request.requestId match { case RequestKeys.ProduceKey => handleProducerRequest(request) case RequestKeys.FetchKey => handleFetchRequest(request) @@ -84,7 +82,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (response == null) requestChannel.closeConnection(request.processor, request) else - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(respHeader, response))) + requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response))) } error("error when handling request %s".format(request.requestObj), e) } finally @@ -99,7 +97,7 @@ class KafkaApis(val requestChannel: RequestChannel, try { val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager) val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, leaderAndIsrResponse))) } catch { case e: KafkaStorageException => fatal("Disk error during leadership change.", e) @@ -114,7 +112,7 @@ class KafkaApis(val requestChannel: RequestChannel, val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest] val (response, error) = replicaManager.stopReplicas(stopReplicaRequest) val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, stopReplicaResponse))) replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads() } @@ -123,7 +121,7 @@ class KafkaApis(val requestChannel: RequestChannel, replicaManager.maybeUpdateMetadataCache(updateMetadataRequest, metadataCache) val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, updateMetadataResponse))) } def handleControlledShutdownRequest(request: RequestChannel.Request) { @@ -134,7 +132,7 @@ class KafkaApis(val requestChannel: RequestChannel, val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId) val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId, ErrorMapping.NoError, partitionsRemaining) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse))) } @@ -158,7 +156,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val response = OffsetCommitResponse(commitStatus, offsetCommitRequest.correlationId) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) } if (offsetCommitRequest.versionId == 0) { @@ -260,7 +258,7 @@ class KafkaApis(val requestChannel: RequestChannel, } } else { val response = ProducerResponse(produceRequest.correlationId, responseStatus) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) } } @@ -305,7 +303,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val response = FetchResponse(fetchRequest.correlationId, responsePartitionData) - requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) + requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response))) } // call the replica manager to fetch messages from the local replica @@ -363,7 +361,7 @@ class KafkaApis(val requestChannel: RequestChannel, } }) val response = OffsetResponse(offsetRequest.correlationId, responseMap) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) } def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { @@ -466,7 +464,7 @@ class KafkaApis(val requestChannel: RequestChannel, val brokers = metadataCache.getAliveBrokers trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId)) val response = new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(request.securityProtocol)), topicMetadata, metadataRequest.correlationId) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) } /* @@ -514,8 +512,10 @@ class KafkaApis(val requestChannel: RequestChannel, } trace("Sending offset fetch response %s for correlation id %d to client %s." - .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId)) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId)) + + requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) + } /* @@ -540,7 +540,7 @@ class KafkaApis(val requestChannel: RequestChannel, trace("Sending consumer metadata %s for correlation id %d to client %s." .format(response, consumerMetadataRequest.correlationId, consumerMetadataRequest.clientId)) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) } def handleJoinGroupRequest(request: RequestChannel.Request) { @@ -553,7 +553,7 @@ class KafkaApis(val requestChannel: RequestChannel, def sendResponseCallback(partitions: List[TopicAndPartition], generationId: Int, errorCode: Short) { val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer val responseBody = new JoinGroupResponse(errorCode, generationId, joinGroupRequest.consumerId, partitionList) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(respHeader, responseBody))) + requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, responseBody))) } // let the coordinator to handle join-group @@ -573,7 +573,7 @@ class KafkaApis(val requestChannel: RequestChannel, // the callback for sending a heartbeat response def sendResponseCallback(errorCode: Short) { val response = new HeartbeatResponse(errorCode) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(respHeader, response))) + requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response))) } // let the coordinator to handle heartbeat diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index b7d2a28..6ee80fd 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -26,13 +26,15 @@ import java.util.concurrent._ import atomic.{AtomicInteger, AtomicBoolean} import java.io.File +import org.apache.kafka.common.network.NetworkReceive + import collection.mutable import org.I0Itec.zkclient.ZkClient import kafka.controller.{ControllerStats, KafkaController} import kafka.cluster.{EndPoint, Broker} import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest} import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBrokerIdException} -import kafka.network.{Receive, BlockingChannel, SocketServer} +import kafka.network.{BlockingChannel, SocketServer} import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge import kafka.coordinator.ConsumerCoordinator @@ -262,14 +264,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg // 2. issue a controlled shutdown to the controller if (channel != null) { - var response: Receive = null + var response: NetworkReceive = null try { // send the controlled shutdown request val request = new ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId) channel.send(request) response = channel.receive() - val shutdownResponse = ControlledShutdownResponse.readFrom(response.buffer) + val shutdownResponse = ControlledShutdownResponse.readFrom(response.payload()) if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null && shutdownResponse.partitionsRemaining.size == 0) { shutdownSucceeded = true diff --git a/core/src/main/scala/kafka/server/MessageSetSend.scala b/core/src/main/scala/kafka/server/MessageSetSend.scala deleted file mode 100644 index 5667648..0000000 --- a/core/src/main/scala/kafka/server/MessageSetSend.scala +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import java.nio._ -import java.nio.channels._ -import kafka.network._ -import kafka.message._ -import kafka.utils._ -import kafka.common.ErrorMapping - -/** - * A zero-copy message response that writes the bytes needed directly from the file - * wholly in kernel space - */ -@nonthreadsafe -private[server] class MessageSetSend(val messages: MessageSet, val errorCode: Short) extends Send { - - private var sent: Int = 0 - private val size: Int = messages.sizeInBytes - private val header = ByteBuffer.allocate(6) - header.putInt(size + 2) - header.putShort(errorCode) - header.rewind() - - var complete: Boolean = false - - def this(messages: MessageSet) = this(messages, ErrorMapping.NoError) - - def this() = this(MessageSet.Empty) - - def writeTo(channel: GatheringByteChannel): Int = { - expectIncomplete() - var written = 0 - if(header.hasRemaining) - written += channel.write(header) - if(!header.hasRemaining) { - val fileBytesSent = messages.writeTo(channel, sent, size - sent) - written += fileBytesSent - sent += fileBytesSent - } - - if(logger.isTraceEnabled) - if (channel.isInstanceOf[SocketChannel]) { - val socketChannel = channel.asInstanceOf[SocketChannel] - logger.trace(sent + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " expecting to send " + size + " bytes") - } - - if(sent >= size) - complete = true - written - } - - def sendSize: Int = size + header.capacity - -} diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index d2bac85..0740b3a 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -162,7 +162,7 @@ object ConsumerOffsetChecker extends Logging { debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port)) channel.send(OffsetFetchRequest(group, topicPartitions)) - val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer) + val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload()) debug("Received offset fetch response %s.".format(offsetFetchResponse)) offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) => diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index 1b7d5d8..9973dad 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -12,14 +12,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -log4j.rootLogger=OFF, stdout +log4j.rootLogger=TRACE, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n -log4j.logger.kafka=ERROR -log4j.logger.org.apache.kafka=ERROR +log4j.logger.kafka=TRACE +log4j.logger.org.apache.kafka=TRACE # zkclient can be verbose, during debugging it is common to adjust is separately log4j.logger.org.I0Itec.zkclient.ZkClient=WARN diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala index 9881bd3..b708cd6 100644 --- a/core/src/test/scala/other/kafka/TestOffsetManager.scala +++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala @@ -72,7 +72,7 @@ object TestOffsetManager { offsetsChannel.send(commitRequest) numCommits.getAndIncrement commitTimer.time { - val response = OffsetCommitResponse.readFrom(offsetsChannel.receive().buffer) + val response = OffsetCommitResponse.readFrom(offsetsChannel.receive().payload()) if (response.commitStatus.exists(_._2 != ErrorMapping.NoError)) numErrors.getAndIncrement } offset += 1 @@ -119,7 +119,7 @@ object TestOffsetManager { val group = "group-" + id try { metadataChannel.send(ConsumerMetadataRequest(group)) - val coordinatorId = ConsumerMetadataResponse.readFrom(metadataChannel.receive().buffer).coordinatorOpt.map(_.id).getOrElse(-1) + val coordinatorId = ConsumerMetadataResponse.readFrom(metadataChannel.receive().payload()).coordinatorOpt.map(_.id).getOrElse(-1) val channel = if (channels.contains(coordinatorId)) channels(coordinatorId) @@ -135,7 +135,7 @@ object TestOffsetManager { channel.send(fetchRequest) fetchTimer.time { - val response = OffsetFetchResponse.readFrom(channel.receive().buffer) + val response = OffsetFetchResponse.readFrom(channel.receive().payload()) if (response.requestInfo.exists(_._2.error != ErrorMapping.NoError)) { numErrors.getAndIncrement } diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 95d5621..8047f52 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -20,6 +20,7 @@ package kafka.network; import java.net._ import java.io._ import kafka.cluster.EndPoint +import org.apache.kafka.common.network.NetworkSend import org.apache.kafka.common.protocol.SecurityProtocol import org.junit._ import org.scalatest.junit.JUnitSuite @@ -71,7 +72,7 @@ class SocketServerTest extends JUnitSuite { val byteBuffer = ByteBuffer.allocate(request.requestObj.sizeInBytes) request.requestObj.writeTo(byteBuffer) byteBuffer.rewind() - val send = new BoundedByteBufferSend(byteBuffer) + val send = new NetworkSend(request.connectionId, byteBuffer) channel.sendResponse(new RequestChannel.Response(request.processor, request, send)) } @@ -122,26 +123,6 @@ class SocketServerTest extends JUnitSuite { } @Test - def testNullResponse() { - val socket = connect() - val bytes = new Array[Byte](40) - sendRequest(socket, 0, bytes) - - val request = server.requestChannel.receiveRequest - // Since the response is not sent yet, the selection key should not be readable. - TestUtils.waitUntilTrue( - () => { (request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) != SelectionKey.OP_READ }, - "Socket key shouldn't be available for read") - - server.requestChannel.sendResponse(new RequestChannel.Response(0, request, null)) - - // After the response is sent to the client (which is async and may take a bit of time), the socket key should be available for reads. - TestUtils.waitUntilTrue( - () => { (request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) == SelectionKey.OP_READ }, - "Socket key should be available for reads") - } - - @Test def testSocketsCloseOnShutdown() { // open a connection val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)