diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala index c8c4212..2202f90 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala @@ -85,8 +85,10 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig( * Set this explicitly for only testing purpose. */ val consumerId: Option[String] = Option(props.getString("consumer.id", null)) - /** the socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. */ + /** the socket timeout for network requests. Its value should be at least fetch.wait.max.ms. */ val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout) + require(fetchWaitMaxMs <= socketTimeoutMs, "socket.timeout.ms should always be at least fetch.wait.max.ms" + + " to prevent unnecessary socket timeouts") /** the socket receive buffer for network requests */ val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize) diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index 419156e..70cb557 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -36,7 +36,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { private val lock = new Object() @volatile private var shutdown: Boolean = false private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize, - config.sendBufferBytes, config.requestTimeoutMs) + config.sendBufferBytes, config.socketTimeoutMs) val brokerInfo = "host_%s-port_%s".format(config.host, config.port) val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId) diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala index 69b2d0c..58466f9 100644 --- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala @@ -53,10 +53,17 @@ trait SyncProducerConfigShared { */ val requestTimeoutMs = props.getIntInRange("request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs, (1, Integer.MAX_VALUE)) + + /* + * The socket timeout for network requests. Its value should be at least request.timeout.ms. + */ + val socketTimeoutMs = props.getInt("socket.timeout.ms", SyncProducerConfig.DefaultSocketTimeoutMs) + require(requestRequiredAcks <= socketTimeoutMs, "socket.timeout.ms should always be at least request.timeout.ms") } object SyncProducerConfig { val DefaultClientId = "" val DefaultRequiredAcks : Short = 0 - val DefaultAckTimeoutMs = 10000 + val DefaultAckTimeoutMs = 10 * 1000 + val DefaultSocketTimeoutMs = 30 * 1000 } \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index a7e5b73..4dcc968 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -189,8 +189,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* If the lag in messages between a leader and a follower exceeds this number, the leader will remove the follower from isr */ val replicaLagMaxMessages = props.getLong("replica.lag.max.messages", 4000) - /* the socket timeout for network requests */ + /* the socket timeout for network requests. Its value should be at least replica.fetch.wait.max.ms. */ val replicaSocketTimeoutMs = props.getInt("replica.socket.timeout.ms", ConsumerConfig.SocketTimeout) + require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" + + " to prevent unnecessary socket timeouts") /* the socket receive buffer for network requests */ val replicaSocketReceiveBufferBytes = props.getInt("replica.socket.receive.buffer.bytes", ConsumerConfig.SocketBufferSize)