diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala index c8c4212..2798a18 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala @@ -87,6 +87,8 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig( /** the socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. */ val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout) + require(fetchWaitMaxMs <= socketTimeoutMs, "fetch.wait.max.ms should always be at least socket.timeout.ms" + + " to prevent unnecessary socket timeouts") /** the socket receive buffer for network requests */ val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index a7e5b73..27fbc2d 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -191,6 +191,8 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the socket timeout for network requests */ val replicaSocketTimeoutMs = props.getInt("replica.socket.timeout.ms", ConsumerConfig.SocketTimeout) + require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.fetch.wait.max.ms should always be at least replica.socket.timeout.ms" + + " to prevent unnecessary socket timeouts") /* the socket receive buffer for network requests */ val replicaSocketReceiveBufferBytes = props.getInt("replica.socket.receive.buffer.bytes", ConsumerConfig.SocketBufferSize)