From 4ed04109ef7400de2d566b0e0c850e2f6ff07d68 Mon Sep 17 00:00:00 2001
From: bmistr1
Date: Sun, 23 Nov 2014 04:07:55 -0800
Subject: [PATCH] Initial fix for high CPU usage in the Kafka producer, and fix
 close() never returning when the connection to the brokers is lost

---
 .../kafka/clients/ClusterConnectionStates.java     | 72 ++++++++++++++++++++--
 .../java/org/apache/kafka/clients/KafkaClient.java |  9 ++-
 .../org/apache/kafka/clients/NetworkClient.java    | 10 ++-
 .../apache/kafka/clients/NodeConnectionState.java  | 17 ++++-
 .../kafka/clients/producer/KafkaProducer.java      | 19 +++++-
 .../kafka/clients/producer/ProducerConfig.java     |  6 +-
 .../kafka/clients/producer/internals/Sender.java   | 59 ++++++++++++++++--
 .../main/java/org/apache/kafka/common/Cluster.java |  7 +++
 .../java/org/apache/kafka/clients/MockClient.java  |  5 ++
 9 files changed, 185 insertions(+), 19 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 8aece7e..9747617 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -13,6 +13,7 @@
 package org.apache.kafka.clients;
 
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 
 /**
@@ -27,6 +28,9 @@ final class ClusterConnectionStates {
         this.reconnectBackoffMs = reconnectBackoffMs;
         this.nodeState = new HashMap<Integer, NodeConnectionState>();
     }
+
+
+
 
     /**
      * Return true iff we can currently initiate a new connection to the given node. This will be the case if we are not
@@ -37,10 +41,19 @@ final class ClusterConnectionStates {
      */
    public boolean canConnect(int node, long now) {
         NodeConnectionState state = nodeState.get(node);
-        if (state == null)
+        // no state recorded yet, so this is a new connection
+        if (state == null) {
             return true;
-        else
-            return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= this.reconnectBackoffMs;
+        }
+        else {
+            ///System.out.println(Thread.currentThread().getName() + " backoff reconnecting....");
+            if (state.state == ConnectionState.DISCONNECTED) {
+                return now - state.lastConnectAttemptMs >= this.reconnectBackoffMs;
+                // TODO: alternatively use exponential backoff: return now >= (state.lastConnectAttemptMs + getWaitTimeExp(state.currentRetryCount));
+            } else {
+                return false;
+            }
+        }
     }
 
     /**
@@ -53,6 +66,7 @@ final class ClusterConnectionStates {
         if (state == null)
             return false;
         else
+            // TODO: what should we do here?
             return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs < this.reconnectBackoffMs;
     }
 
@@ -64,10 +78,16 @@ final class ClusterConnectionStates {
      * @param now The current time in ms
      */
     public long connectionDelay(int node, long now) {
+        ///System.out.println(Thread.currentThread().getName() + " connectionDelay....");
+
         NodeConnectionState state = nodeState.get(node);
-        if (state == null) return 0;
+        // no state recorded yet, so no delay
+        if (state == null) {
+            return 0;
+        }
         long timeWaited = now - state.lastConnectAttemptMs;
         if (state.state == ConnectionState.DISCONNECTED) {
+            // TODO: apply exponential backoff here as well?
             return Math.max(this.reconnectBackoffMs - timeWaited, 0);
         } else {
@@ -83,7 +103,13 @@ final class ClusterConnectionStates {
      * @param now The current time.
      */
     public void connecting(int node, long now) {
-        nodeState.put(node, new NodeConnectionState(ConnectionState.CONNECTING, now));
+        NodeConnectionState nodeConn = nodeState.get(node);
+        if (nodeConn == null) {
+            nodeState.put(node, new NodeConnectionState(ConnectionState.CONNECTING, now));
+        } else {
+            nodeConn.state = ConnectionState.CONNECTING;
+
+        }
     }
 
     /**
@@ -117,18 +143,54 @@ final class ClusterConnectionStates {
      * @param node The node we have disconnected from
      */
     public void disconnected(int node) {
+        //System.out.println(Thread.currentThread().getName() + " disconnected....node-id=" + node);
         nodeState(node).state = ConnectionState.DISCONNECTED;
     }
 
     /**
+     * Enter the disconnected state for the given node after a failed connection attempt and increment its retry count
+     * @param node The node we have disconnected from
+     */
+    public void disconnectedWhenConnectting(int node) {
+        System.out.println(Thread.currentThread().getName() + " New Code testing disconnectedWhenConnectting....node-id=" + node);
+        nodeState(node).state = ConnectionState.DISCONNECTED;
+        nodeState(node).incrementRetry();
+    }
+
+    public boolean isAllNodesDisconnected() {
+        if (!nodeState.isEmpty()) {
+            boolean allNodeDisconnected = true;
+            Iterator<Map.Entry<Integer, NodeConnectionState>> iterator = nodeState.entrySet().iterator();
+            while (allNodeDisconnected && iterator.hasNext()) {
+                Map.Entry<Integer, NodeConnectionState> nodeStateEntry = iterator.next();
+                allNodeDisconnected = allNodeDisconnected && (nodeStateEntry.getValue().state == ConnectionState.DISCONNECTED);
+            }
+            return allNodeDisconnected;
+        } else {
+            return false;
+        }
+    }
+
+    /**
      * Get the state of our connection to the given state
      * @param node The id of the node
      * @return The state of our connection
      */
     private NodeConnectionState nodeState(int node) {
         NodeConnectionState state = this.nodeState.get(node);
+        // TODO: this is also an issue; it can still throw if no entry exists for the node yet
         if (state == null)
             throw new IllegalStateException("No entry found for node " + node);
         return state;
     }
+
+    /*
+     * Exponential backoff.
+     * Returns the next wait interval, in milliseconds, using an exponential
+     * backoff algorithm.
+     */
+    public long getWaitTimeExp(int retryCount) {
+        long waitTime = ((long) Math.pow(2, retryCount) * reconnectBackoffMs);
+        return waitTime;
+    }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 3976955..27fa02f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -89,5 +89,12 @@ public interface KafkaClient {
      * Close the client and disconnect from all nodes
      */
     public void close();
-
+
+    /**
+     *
+     * Check whether all registered nodes are currently down
+     *
+     */
+    public boolean isAllRegistredNodesAreDown();
+
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 525b95e..d44c576 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -400,15 +400,21 @@ public class NetworkClient implements KafkaClient {
     private void initiateConnect(Node node, long now) {
         try {
             log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
-            selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
+            // TODO: fixes java.lang.IllegalStateException: No entry found for node -3 (the node state entry must be put before a failed connect tries to mark it disconnected)
             this.connectionStates.connecting(node.id(), now);
+            selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
         } catch (IOException e) {
             /* attempt failed, we'll try again after the backoff */
-            connectionStates.disconnected(node.id());
+            connectionStates.disconnectedWhenConnectting(node.id());
             /* maybe the problem is our metadata, update it */
             metadata.requestUpdate();
             log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
         }
     }
+
+    @Override
+    public boolean isAllRegistredNodesAreDown() {
+        return this.connectionStates.isAllNodesDisconnected();
+    }
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java b/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java
index 752a979..0ee1688 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java
@@ -19,13 +19,26 @@ final class NodeConnectionState {
 
     ConnectionState state;
     long lastConnectAttemptMs;
-
+
+    // TODO: store these counters somewhere else
+    int currentRetryCount;
+    int totalConnectionFailed;
+
     public NodeConnectionState(ConnectionState state, long lastConnectAttempt) {
         this.state = state;
         this.lastConnectAttemptMs = lastConnectAttempt;
     }
 
     public String toString() {
-        return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
+        return "NodeState(" + state + ", " + lastConnectAttemptMs + ") {currentRetryCount=" + currentRetryCount +
+            " totalConnectionFailed=" + totalConnectionFailed + "}";
+
+    }
+
+    public void incrementRetry() {
+        currentRetryCount++;
+        totalConnectionFailed++;
     }
+
+
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 32f444e..4815f00 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -95,6 +95,8 @@ public class KafkaProducer implements Producer {
     }
 
     private KafkaProducer(ProducerConfig config) {
+
+
         log.trace("Starting the Kafka producer");
         this.time = new SystemTime();
         MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
@@ -147,6 +149,8 @@
 
         config.logUnused();
         log.debug("Kafka producer started");
+        System.out.println(ioThreadName + "======KAFKA LOGMON TEAM PATCH HIGH CPU FIX VERSION IN Sender.java AND producer.close() METHOD=====");
+
     }
 
     private static int parseAcks(String acksString) {
@@ -258,7 +262,7 @@
             throw e;
         }
     }
-
+
     /**
      * Wait for cluster metadata including partitions for the given topic to be available.
      * @param topic The topic we want metadata for
@@ -318,10 +322,21 @@ public void close() {
         log.trace("Closing the Kafka producer.");
         this.sender.initiateClose();
+
+        // FIXME: get this timeout from the configuration
+        // When the network is down, an unbounded join() here never returns: close() hangs and the I/O sender thread keeps spinning, causing high CPU usage
         try {
-            this.ioThread.join();
+            // TODO: make this timeout configurable
+            this.ioThread.join(15000);
         } catch (InterruptedException e) {
+            // TODO: why throw here? Consider just swallowing the interrupt
             throw new KafkaException(e);
+        } finally {
+            if (this.ioThread.isAlive()) {
+                // Interrupt the I/O thread; otherwise CPU usage never recovers, and since we are shutting down anyway the resulting data loss is accepted
+                // Ideally we would only interrupt when all brokers are down and otherwise keep waiting, to avoid data loss
+                ioThread.interrupt();
+            }
         }
         this.metrics.close();
         log.debug("The Kafka producer has closed.");
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 72d3ddd..2152f4e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -199,16 +199,16 @@ public class ProducerConfig extends AbstractConfig {
                                         Importance.MEDIUM,
                                         MAX_REQUEST_SIZE_DOC)
                                 .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC)
-                                .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), Importance.LOW, RECONNECT_BACKOFF_MS_DOC)
+                                .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 1000L, atLeast(0L), Importance.LOW, RECONNECT_BACKOFF_MS_DOC)
                                 .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
-                                .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, RETRY_BACKOFF_MS_DOC)
+                                .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 700L, atLeast(0L), Importance.LOW, RETRY_BACKOFF_MS_DOC)
                                 .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), Importance.LOW, METADATA_FETCH_TIMEOUT_DOC)
-                                .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
+                                .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 15 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
                                 .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
                                         Type.LONG,
                                         30000,
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 84a7a07..5d8d4e2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -81,6 +81,7 @@ public class Sender implements Runnable {
 
     /* metrics */
     private final SenderMetrics sensors;
+
 
     public Sender(KafkaClient client,
                   Metadata metadata,
@@ -102,34 +103,84 @@
         this.time = time;
         this.sensors = new SenderMetrics(metrics);
     }
+
     /**
      * The main run loop for the sender thread
      */
     public void run() {
         log.debug("Starting Kafka producer I/O thread.");
-
+
+        // TODO: these values need to come from configuration (or at least constants) instead of being hard-coded
+        long sleepInMs = 500;
+
+        int continuousRetry = 0;
+        // TODO: when the connection to some or all of the brokers is lost, back off instead of spinning
         // main loop, runs until close is called
         while (running) {
+            long start = time.milliseconds();
             try {
-                run(time.milliseconds());
+                run(time.milliseconds());
             } catch (Exception e) {
                 log.error("Uncaught error in kafka producer I/O thread: ", e);
+            } finally {
+                long durationInMs = time.milliseconds() - start;
+                // TODO: use a proper exponential back-off here so we do not burn CPU cycles; how long should we wait in this edge case?
+                if (durationInMs < 200) {
+                    if (client.isAllRegistredNodesAreDown()) {
+                        continuousRetry++;
+                        // TODO: make this constant configurable; when do we reset this interval so that we can retry aggressively again?
+                        sleepInMs = ((long) Math.pow(2, continuousRetry) * 500);
+                    } else {
+                        sleepInMs = 500;
+                        continuousRetry = 0;
+                    }
+
+                    // Back off before the next loop iteration so we do not spin on the CPU
+                    // while the brokers are unreachable
+                    try {
+                        // TODO: sleeping here is not a good long-term solution
+                        Thread.sleep(sleepInMs);
+                    } catch (InterruptedException e) {
+                        log.error("Interrupted while sleeping, probably by producer close()");
+                    }
+                }
+            }
         }
         log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
-
+
+        System.out.println("FIX CLOSE WHEN BROKERS ARE DOWN AND CPU SPIKES SO APP WILL NOT SHUTDOWN....");
         // okay we stopped accepting requests but there may still be
         // requests in the accumulator or waiting for acknowledgment,
         // wait until these are completed.
-        while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
+
+        // bound how long we keep trying to send while closing
+
+
+        int retry = 0;
+
+        while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
             try {
                 run(time.milliseconds());
             } catch (Exception e) {
                 log.error("Uncaught error in kafka producer I/O thread: ", e);
+            } finally {
+                // TODO: try up to 3 times to send while closing, even if all the nodes are down, attempting to connect again now
+                // Ideally we would reconnect aggressively by resetting all retry counters to zero, but for now just try to send anyway
+                retry++;
+                if (retry == 3) {
+                    log.error("While shutting down the Kafka producer I/O thread there is still unsent data, but the I/O thread has " +
+                        "given up sending it, so data loss has occurred; otherwise this thread would never die and producer.close() would never return");
+                    log.info("All registered node states DISCONNECTED: " + this.client.isAllRegistredNodesAreDown());
+                    break;
+                }
             }
         }
+
+        if (this.accumulator.hasUnsent()) {
+            log.error("There is unsent data in memory that will not be transferred...");
+        }
 
         this.client.close();
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index d3299b9..fdba8f2 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -88,6 +88,13 @@ public final class Cluster {
         int nodeId = -1;
         for (InetSocketAddress address : addresses)
             nodes.add(new Node(nodeId--, address.getHostName(), address.getPort()));
+
+        /*
+         * TODO: fetch the partition information here, in the initialization thread, so that we do not pay that cost later
+         *
+         */
+
+
         return new Cluster(nodes, new ArrayList<PartitionInfo>(0));
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 47b5d4a..88cf130 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -98,4 +98,9 @@ public class MockClient implements KafkaClient {
         return null;
     }
 
+    @Override
+    public boolean isAllRegistredNodesAreDown() {
+        return false;
+    }
+
 }
-- 
1.9.3 (Apple Git-50)
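
Note (not part of the patch): the back-off used in ClusterConnectionStates.getWaitTimeExp() and in the Sender run() loop is plain exponential growth, base * 2^retryCount. The standalone sketch below illustrates that calculation, assuming the 500 ms base the Sender hard-codes; the MAX_BACKOFF_MS cap is a suggested addition and is not in the patch itself.

    // Minimal sketch of the exponential back-off wait computation.
    // BASE_BACKOFF_MS mirrors the Sender's hard-coded 500 ms; MAX_BACKOFF_MS is an assumed cap, not in the patch.
    public class BackoffSketch {
        private static final long BASE_BACKOFF_MS = 500;
        private static final long MAX_BACKOFF_MS = 30_000;

        // wait = base * 2^retryCount, capped so a long outage does not grow the wait indefinitely
        public static long waitTimeExp(int retryCount) {
            long wait = (long) Math.pow(2, retryCount) * BASE_BACKOFF_MS;
            return Math.min(wait, MAX_BACKOFF_MS);
        }

        public static void main(String[] args) {
            for (int retry = 0; retry <= 8; retry++)
                System.out.println("retry " + retry + " -> wait " + waitTimeExp(retry) + " ms");
            // prints 500, 1000, 2000, 4000, ... and stays at 30000 once the cap is reached
        }
    }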
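
Note (not part of the patch): KafkaProducer.close() now follows a bounded-shutdown pattern -- join() with a timeout, then interrupt the I/O thread if it is still alive. The sketch below shows the same pattern in isolation; the names BoundedCloseSketch, shutdown() and ioLoop are illustrative only, and the patch itself hard-codes a 15000 ms join timeout.

    // Minimal sketch of join-with-timeout followed by interrupt.
    public class BoundedCloseSketch {

        // Wait a bounded time for the thread to finish, then interrupt it if it is still running.
        public static void shutdown(Thread ioThread, long joinTimeoutMs) {
            try {
                ioThread.join(joinTimeoutMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            } finally {
                if (ioThread.isAlive())
                    ioThread.interrupt(); // forces the loop below out of its sleep
            }
        }

        public static void main(String[] args) throws InterruptedException {
            Thread ioLoop = new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        Thread.sleep(500); // stands in for the sender's back-off sleep
                    } catch (InterruptedException e) {
                        return; // exit promptly when shutdown() interrupts us
                    }
                }
            });
            ioLoop.start();
            shutdown(ioLoop, 1000); // the patch uses join(15000); shortened here for the demo
            ioLoop.join();
            System.out.println("I/O thread stopped: " + !ioLoop.isAlive());
        }
    }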