Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
1.1.3, 1.2.4, 2.6.2
-
None
Description
Since PENDING_MESSAGES_FLUSH_TIMEOUT_MS is hardcoded to 10 minutes, this means buffered messages will remain in the worker 10 minutes before being given up and reprocessed.
This leads to increased latencies in the topology.
It is proposed that PENDING_MESSAGES_FLUSH_TIMEOUT_MS is exposed as property to be configured by the user.
Code that leads to this situation:
public class Client extends ConnectionWithStatus implements ISaslClient { private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
@Override public void close() { if (!closing) { LOG.info("closing Netty Client {}", dstAddressPrefixedName); // Set closing to true to prevent any further reconnection attempts. closing = true; waitForPendingMessagesToBeSent(); closeChannel(); // stop tracking metrics for this client if (this.metricRegistry != null) { this.metricRegistry.deregister(this.metrics); } } }
private void waitForPendingMessagesToBeSent() { LOG.info("waiting up to {} ms to send {} pending messages to {}", PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(), dstAddressPrefixedName); long totalPendingMsgs = pendingMessages.get(); long startMs = System.currentTimeMillis(); while (pendingMessages.get() != 0) { try { long deltaMs = System.currentTimeMillis() - startMs; if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) { LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not " + "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs); break; } Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS); } catch (InterruptedException e) { break; } } }