From 14d0d19a043515e57f39298debe11e43ac22da8c Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Fri, 22 May 2015 11:21:09 -0700 Subject: [PATCH] KAFKA-2217; Refactor Client Selectable Interface for Better Concurrency Options --- .../org/apache/kafka/clients/NetworkClient.java | 34 +++++---- .../apache/kafka/common/network/PollResult.java | 68 ++++++++++++++++++ .../apache/kafka/common/network/Selectable.java | 26 +------ .../org/apache/kafka/common/network/Selector.java | 84 +++++++--------------- .../apache/kafka/common/network/SelectorTest.java | 59 ++++++++------- .../java/org/apache/kafka/test/MockSelector.java | 16 +---- 6 files changed, 151 insertions(+), 136 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/network/PollResult.java diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 435fbb5..6ac846d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.NetworkSend; +import org.apache.kafka.common.network.PollResult; import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ProtoUtils; @@ -217,19 +218,22 @@ public class NetworkClient implements KafkaClient { if (metadataTimeout == 0) maybeUpdateMetadata(now); // do the I/O + PollResult pollResult = null; + try { - this.selector.poll(Math.min(timeout, metadataTimeout)); + pollResult = this.selector.poll(Math.min(timeout, metadataTimeout)); } catch (IOException e) { log.error("Unexpected error during I/O in producer network thread", e); } // process completed actions List responses = new ArrayList(); - handleCompletedSends(responses, now); - handleCompletedReceives(responses, now); - handleDisconnections(responses, now); - handleConnections(); - + if (pollResult != null) { + handleCompletedSends(responses, pollResult, now); + handleCompletedReceives(responses, pollResult, now); + handleDisconnections(responses, pollResult, now); + handleConnections(pollResult); + } // invoke callbacks for (ClientResponse response : responses) { if (response.request().hasCallback()) { @@ -353,9 +357,9 @@ public class NetworkClient implements KafkaClient { * @param responses The list of responses to update * @param now The current time */ - private void handleCompletedSends(List responses, long now) { + private void handleCompletedSends(List responses, PollResult pollResult, long now) { // if no response is expected then when the send is completed, return it - for (NetworkSend send : this.selector.completedSends()) { + for (NetworkSend send : pollResult.getCompletedSends()) { ClientRequest request = this.inFlightRequests.lastSent(send.destination()); if (!request.expectResponse()) { this.inFlightRequests.completeLastSent(send.destination()); @@ -370,8 +374,8 @@ public class NetworkClient implements KafkaClient { * @param responses The list of responses to update * @param now The current time */ - private void handleCompletedReceives(List responses, long now) { - for (NetworkReceive receive : this.selector.completedReceives()) { + private void handleCompletedReceives(List responses, PollResult pollResult, long now) { + for (NetworkReceive receive : pollResult.getCompletedReceives()) { int source = receive.source(); ClientRequest req = inFlightRequests.completeNext(source); ResponseHeader header = ResponseHeader.parse(receive.payload()); @@ -411,8 +415,8 @@ public class NetworkClient implements KafkaClient { * @param responses The list of responses that completed with the disconnection * @param now The current time */ - private void handleDisconnections(List responses, long now) { - for (int node : this.selector.disconnected()) { + private void handleDisconnections(List responses, PollResult pollResult, long now) { + for (int node : pollResult.getDisconnected()) { connectionStates.disconnected(node); log.debug("Node {} disconnected.", node); for (ClientRequest request : this.inFlightRequests.clearAll(node)) { @@ -425,15 +429,15 @@ public class NetworkClient implements KafkaClient { } } // we got a disconnect so we should probably refresh our metadata and see if that broker is dead - if (this.selector.disconnected().size() > 0) + if (pollResult.getDisconnected().size() > 0) this.metadata.requestUpdate(); } /** * Record any newly completed connections */ - private void handleConnections() { - for (Integer id : this.selector.connected()) { + private void handleConnections(PollResult pollResult) { + for (Integer id : pollResult.getConnected()) { log.debug("Completed connection to node {}", id); this.connectionStates.connected(id); } diff --git a/clients/src/main/java/org/apache/kafka/common/network/PollResult.java b/clients/src/main/java/org/apache/kafka/common/network/PollResult.java new file mode 100644 index 0000000..5413fcd --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/PollResult.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.network; + +import java.util.List; + +/** + * Container for {@link Selectable} to report poll results from {@link Selectable#poll(long)} + */ +public class PollResult { + + private final List completedSends; + private final List completedReceives; + private final List disconnected; + private final List connected; + + public PollResult( + List completedSends, + List completedReceives, + List disconnected, + List connected) { + this.completedSends = completedSends; + this.completedReceives = completedReceives; + this.disconnected = disconnected; + this.connected = connected; + } + + /** + * The list of sends that completed on the last {@link Selectable#poll(long) poll()} call. + */ + public List getCompletedSends() { + return completedSends; + } + + /** + * The list of receives that completed on the last {@link Selectable#poll(long) poll()} call. + */ + public List getCompletedReceives() { + return completedReceives; + } + + /** + * The list of connections that finished disconnecting on the last {@link Selectable#poll(long) poll()} + * call. + */ + public List getDisconnected() { + return disconnected; + } + + /** + * The list of connections that completed their connection on the last {@link Selectable#poll(long) poll()} + * call. + */ + public List getConnected() { + return connected; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java index b5f8d83..afca1e3 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java @@ -14,7 +14,6 @@ package org.apache.kafka.common.network; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.List; /** * An interface for asynchronous, multi-channel network I/O @@ -56,30 +55,9 @@ public interface Selectable { * Do I/O. Reads, writes, connection establishment, etc. * @param timeout The amount of time to block if there is nothing to do * @throws IOException + * @return the results of the poll (completed sends/receives, connects and disconnects) */ - public void poll(long timeout) throws IOException; - - /** - * The list of sends that completed on the last {@link #poll(long, List) poll()} call. - */ - public List completedSends(); - - /** - * The list of receives that completed on the last {@link #poll(long, List) poll()} call. - */ - public List completedReceives(); - - /** - * The list of connections that finished disconnecting on the last {@link #poll(long, List) poll()} - * call. - */ - public List disconnected(); - - /** - * The list of connections that completed their connection on the last {@link #poll(long, List) poll()} - * call. - */ - public List connected(); + public PollResult poll(long timeout) throws IOException; /** * Disable reads from the given connection diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 57de058..112c1e0 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -21,13 +21,7 @@ import java.nio.channels.CancelledKeyException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.nio.channels.UnresolvedAddressException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.TimeUnit; import org.apache.kafka.common.KafkaException; @@ -78,15 +72,11 @@ public class Selector implements Selectable { private final java.nio.channels.Selector selector; private final Map keys; - private final List completedSends; - private final List completedReceives; - private final List disconnected; - private final List connected; - private final List failedSends; private final Time time; private final SelectorMetrics sensors; private final String metricGrpPrefix; private final Map metricTags; + private final List failedSends; /** * Create a new selector @@ -101,10 +91,6 @@ public class Selector implements Selectable { this.metricGrpPrefix = metricGrpPrefix; this.metricTags = metricTags; this.keys = new HashMap(); - this.completedSends = new ArrayList(); - this.completedReceives = new ArrayList(); - this.connected = new ArrayList(); - this.disconnected = new ArrayList(); this.failedSends = new ArrayList(); this.sensors = new SelectorMetrics(metrics); } @@ -113,7 +99,7 @@ public class Selector implements Selectable { * Begin connecting to the given address and add the connection to this selector associated with the given id * number. *

- * Note that this call only initiates the connection, which will be completed on a future {@link #poll(long, List)} + * Note that this call only initiates the connection, which will be completed on a future {@link #poll(long)} * call. Check {@link #connected()} to see which (if any) connections have completed after a given poll call. * @param id The id for the new connection * @param address The address to connect to @@ -150,7 +136,7 @@ public class Selector implements Selectable { /** * Disconnect any connections for the given id (if there are any). The disconnection is asynchronous and will not be - * processed until the next {@link #poll(long, List) poll()} call. + * processed until the next {@link #poll(long) poll()} call. */ @Override public void disconnect(int id) { @@ -203,25 +189,28 @@ public class Selector implements Selectable { * Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing * disconnections, initiating new sends, or making progress on in-progress sends or receives. * - * When this call is completed the user can check for completed sends, receives, connections or disconnects using - * {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These - * lists will be cleared at the beginning of each {@link #poll(long, List)} call and repopulated by the call if any - * completed I/O. + * When this call is completed the user can check for completed sends, receives, connections or disconnects from + * the returned {@link PollResult}. * * @param timeout The amount of time to wait, in milliseconds. If negative, wait indefinitely. * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is * already an in-progress send */ @Override - public void poll(long timeout) throws IOException { - clear(); - + public PollResult poll(long timeout) throws IOException { /* check ready keys */ long startSelect = time.nanoseconds(); int readyKeys = select(timeout); long endSelect = time.nanoseconds(); this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); + List completedSends = new ArrayList(); + List completedReceives = new ArrayList(); + List connected = new ArrayList(); + List disconnected = new ArrayList(); + + drainFailedSends(disconnected); + if (readyKeys > 0) { Set keys = this.selector.selectedKeys(); Iterator iter = keys.iterator(); @@ -240,7 +229,7 @@ public class Selector implements Selectable { if (key.isConnectable()) { channel.finishConnect(); key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); - this.connected.add(transmissions.id); + connected.add(transmissions.id); this.sensors.connectionCreated.record(); } @@ -251,7 +240,7 @@ public class Selector implements Selectable { transmissions.receive.readFrom(channel); if (transmissions.receive.complete()) { transmissions.receive.payload().rewind(); - this.completedReceives.add(transmissions.receive); + completedReceives.add(transmissions.receive); this.sensors.recordBytesReceived(transmissions.id, transmissions.receive.payload().limit()); transmissions.clearReceive(); } @@ -261,7 +250,7 @@ public class Selector implements Selectable { if (key.isWritable()) { transmissions.send.writeTo(channel); if (transmissions.send.remaining() <= 0) { - this.completedSends.add(transmissions.send); + completedSends.add(transmissions.send); this.sensors.recordBytesSent(transmissions.id, transmissions.send.size()); transmissions.clearSend(); key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); @@ -271,7 +260,7 @@ public class Selector implements Selectable { /* cancel any defunct sockets */ if (!key.isValid()) { close(key); - this.disconnected.add(transmissions.id); + disconnected.add(transmissions.id); } } catch (IOException e) { String desc = socketDescription(channel); @@ -280,12 +269,14 @@ public class Selector implements Selectable { else log.warn("Error in I/O with connection to {}", desc, e); close(key); - this.disconnected.add(transmissions.id); + disconnected.add(transmissions.id); } } } long endIo = time.nanoseconds(); this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); + + return new PollResult(completedSends, completedReceives, disconnected, connected); } private String socketDescription(SocketChannel channel) { @@ -299,26 +290,6 @@ public class Selector implements Selectable { } @Override - public List completedSends() { - return this.completedSends; - } - - @Override - public List completedReceives() { - return this.completedReceives; - } - - @Override - public List disconnected() { - return this.disconnected; - } - - @Override - public List connected() { - return this.connected; - } - - @Override public void mute(int id) { mute(this.keyForId(id)); } @@ -348,16 +319,9 @@ public class Selector implements Selectable { unmute(key); } - /** - * Clear the results from the prior poll - */ - private void clear() { - this.completedSends.clear(); - this.completedReceives.clear(); - this.connected.clear(); - this.disconnected.clear(); - this.disconnected.addAll(this.failedSends); - this.failedSends.clear(); + private void drainFailedSends(Collection destination) { + destination.addAll(failedSends); + failedSends.clear(); } /** diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index d5b306b..2aba9b4 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -71,8 +71,11 @@ public class SelectorTest { // disconnect this.server.closeConnections(); - while (!selector.disconnected().contains(node)) - selector.poll(1000L); + + PollResult result = null; + do { + result = selector.poll(1000L); + } while (!result.getDisconnected().contains(node)); // reconnect and do another request blockingConnect(node); @@ -88,10 +91,10 @@ public class SelectorTest { blockingConnect(node); selector.disconnect(node); selector.send(createSend(node, "hello1")); - selector.poll(10); - assertEquals("Request should not have succeeded", 0, selector.completedSends().size()); - assertEquals("There should be a disconnect", 1, selector.disconnected().size()); - assertTrue("The disconnect should be from our node", selector.disconnected().contains(node)); + PollResult result = selector.poll(10); + assertEquals("Request should not have succeeded", 0, result.getCompletedSends().size()); + assertEquals("There should be a disconnect", 1, result.getDisconnected().size()); + assertTrue("The disconnect should be from our node", result.getDisconnected().contains(node)); blockingConnect(node); assertEquals("hello2", blockingRequest(node, "hello2")); } @@ -134,8 +137,10 @@ public class SelectorTest { ServerSocket nonListeningSocket = new ServerSocket(0); int nonListeningPort = nonListeningSocket.getLocalPort(); selector.connect(node, new InetSocketAddress("localhost", nonListeningPort), BUFFER_SIZE, BUFFER_SIZE); - while (selector.disconnected().contains(node)) - selector.poll(1000L); + PollResult result = null; + do { + result = selector.poll(1000L); + } while (result.getDisconnected().contains(node)); nonListeningSocket.close(); } @@ -163,12 +168,12 @@ public class SelectorTest { // loop until we complete all requests while (responseCount < conns * reqs) { // do the i/o - selector.poll(0L); + PollResult result = selector.poll(0L); - assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size()); + assertEquals("No disconnects should have occurred.", 0, result.getDisconnected().size()); // handle any responses we may have gotten - for (NetworkReceive receive : selector.completedReceives()) { + for (NetworkReceive receive : result.getCompletedReceives()) { String[] pieces = asString(receive).split("-"); assertEquals("Should be in the form 'conn-counter'", 2, pieces.length); assertEquals("Check the source", receive.source(), Integer.parseInt(pieces[0])); @@ -179,7 +184,7 @@ public class SelectorTest { } // prepare new sends for the next round - for (NetworkSend send : selector.completedSends()) { + for (NetworkSend send : result.getCompletedSends()) { int dest = send.destination(); requests[dest]++; if (requests[dest] < reqs) @@ -225,25 +230,29 @@ public class SelectorTest { selector.mute(1); - while (selector.completedReceives().isEmpty()) - selector.poll(5); - assertEquals("We should have only one response", 1, selector.completedReceives().size()); - assertEquals("The response should not be from the muted node", 0, selector.completedReceives().get(0).source()); + PollResult result = null; + do { + result = selector.poll(5); + } while (result.getCompletedReceives().isEmpty()); + + assertEquals("We should have only one response", 1, result.getCompletedReceives().size()); + assertEquals("The response should not be from the muted node", 0, result.getCompletedReceives().get(0).source()); selector.unmute(1); + result = null; do { - selector.poll(5); - } while (selector.completedReceives().isEmpty()); - assertEquals("We should have only one response", 1, selector.completedReceives().size()); - assertEquals("The response should be from the previously muted node", 1, selector.completedReceives().get(0).source()); + result = selector.poll(5); + } while (result.getCompletedReceives().isEmpty()); + assertEquals("We should have only one response", 1, result.getCompletedReceives().size()); + assertEquals("The response should be from the previously muted node", 1, result.getCompletedReceives().get(0).source()); } private String blockingRequest(int node, String s) throws IOException { selector.send(createSend(node, s)); selector.poll(1000L); while (true) { - selector.poll(1000L); - for (NetworkReceive receive : selector.completedReceives()) + PollResult result = selector.poll(1000L); + for (NetworkReceive receive : result.getCompletedReceives()) if (receive.source() == node) return asString(receive); } @@ -252,8 +261,10 @@ public class SelectorTest { /* connect and wait for the connection to complete */ private void blockingConnect(int node) throws IOException { selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE); - while (!selector.connected().contains(node)) - selector.poll(10000L); + PollResult result = null; + do { + result = selector.poll(10000L); + } while (!result.getConnected().contains(node)); } private NetworkSend createSend(int node, String s) { diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java index ea89b06..5f0edcf 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java +++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java @@ -19,6 +19,7 @@ import java.util.List; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.NetworkSend; +import org.apache.kafka.common.network.PollResult; import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.utils.Time; @@ -69,36 +70,25 @@ public class MockSelector implements Selectable { } @Override - public void poll(long timeout) throws IOException { + public PollResult poll(long timeout) throws IOException { this.completedSends.addAll(this.initiatedSends); this.initiatedSends.clear(); time.sleep(timeout); - } - - @Override - public List completedSends() { - return completedSends; + return new PollResult(completedSends, completedReceives, disconnected, connected); } public void completeSend(NetworkSend send) { this.completedSends.add(send); } - @Override - public List completedReceives() { - return completedReceives; - } - public void completeReceive(NetworkReceive receive) { this.completedReceives.add(receive); } - @Override public List disconnected() { return disconnected; } - @Override public List connected() { return connected; } -- 2.3.2 (Apple Git-55)