diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala index c8c4212..2202f90 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala @@ -85,8 +85,10 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig( * Set this explicitly for only testing purpose. */ val consumerId: Option[String] = Option(props.getString("consumer.id", null)) - /** the socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. */ + /** the socket timeout for network requests. Its value should be at least fetch.wait.max.ms. */ val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout) + require(fetchWaitMaxMs <= socketTimeoutMs, "socket.timeout.ms should always be at least fetch.wait.max.ms" + + " to prevent unnecessary socket timeouts") /** the socket receive buffer for network requests */ val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index a7e5b73..4dcc968 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -189,8 +189,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* If the lag in messages between a leader and a follower exceeds this number, the leader will remove the follower from isr */ val replicaLagMaxMessages = props.getLong("replica.lag.max.messages", 4000) - /* the socket timeout for network requests */ + /* the socket timeout for network requests. Its value should be at least replica.fetch.wait.max.ms. */ val replicaSocketTimeoutMs = props.getInt("replica.socket.timeout.ms", ConsumerConfig.SocketTimeout) + require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" + + " to prevent unnecessary socket timeouts") /* the socket receive buffer for network requests */ val replicaSocketReceiveBufferBytes = props.getInt("replica.socket.receive.buffer.bytes", ConsumerConfig.SocketBufferSize)