diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java index 0b435b9..709a609 100644 --- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java +++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java @@ -40,8 +40,11 @@ public class KafkaOutputFormat extends OutputFormat private Logger log = Logger.getLogger(KafkaOutputFormat.class); public static final String KAFKA_URL = "kafka.output.url"; - /** Bytes to buffer before the OutputFormat does a send (i.e., the amortization window) */ - public static final int KAFKA_QUEUE_SIZE = 10*1024*1024; + /** Bytes to buffer before the OutputFormat does a send (i.e., the amortization window): + * We set the default to a million bytes so that the server will not reject the batch of messages + * with a MessageSizeTooLargeException. The actual size will be smaller after compression. + */ + public static final int KAFKA_QUEUE_SIZE = 1000000; public static final String KAFKA_CONFIG_PREFIX = "kafka.output"; private static final Map kafkaConfigMap; diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index b774431..41c9626 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -18,7 +18,7 @@ package kafka.server import java.util.Properties -import kafka.message.Message +import kafka.message.{MessageSet, Message} import kafka.consumer.ConsumerConfig import kafka.utils.{VerifiableProperties, ZKConfig, Utils} @@ -38,7 +38,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue)) /* the maximum size of message that the server can receive */ - val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000, (0, Int.MaxValue)) + val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead, (0, Int.MaxValue)) /* the number of network threads that the server uses for handling network requests */ val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue))