diff --git a/config/log4j.properties b/config/log4j.properties
index 00f891c..c832902 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -43,10 +43,12 @@ log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 #log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
 #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
 log4j.logger.kafka=INFO
+#log4j.logger.kafka.producer.SyncProducer=TRACE
 log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
 log4j.additivity.kafka.network.RequestChannel$=false
+log4j.logger.kafka.network.Processor=TRACE, requestAppender
 #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
 #log4j.additivity.kafka.server.KafkaApis=false
 log4j.logger.kafka.request.logger=TRACE, requestAppender
diff --git a/contrib/hadoop-producer/README.md b/contrib/hadoop-producer/README.md
index 1bd3721..6e57fde 100644
--- a/contrib/hadoop-producer/README.md
+++ b/contrib/hadoop-producer/README.md
@@ -4,20 +4,16 @@ Hadoop to Kafka Bridge
 What's new?
 -----------
 
-* Kafka 0.8 support
-  * No more ZK-based load balancing (backwards incompatible change)
-* Semantic partitioning is now supported in KafkaOutputFormat. Just specify a
-  key in the output committer of your job. The Pig StoreFunc doesn't support
-  semantic partitioning.
-* Config parameters are now the same as the Kafka producer, just prepended with
-  kafka.output (e.g., kafka.output.max.message.size). This is a backwards
-  incompatible change.
+* Now supports Kafka's software load balancer (Kafka URIs are specified with
+  kafka+zk as the scheme, as described below)
+* Supports Kafka 0.7. Now uses the new Producer API, rather than the legacy
+  SyncProducer.
 
 What is it?
 -----------
 
 The Hadoop to Kafka bridge is a way to publish data from Hadoop to Kafka. There
-are two possible mechanisms, varying from easy to difficult: writing a Pig
+are two possible mechanisms, varying from easy to difficult: writing a Pig
 script and writing messages in Avro format, or rolling your own job using the
 Kafka `OutputFormat`.
 
@@ -29,8 +25,10 @@ multiple times in the same push.
 
 How do I use it?
 ----------------
 
-With this bridge, Kafka topics are URIs and are specified as URIs of the form
-`kafka://<broker>/<topic>` to connect to a specific Kafka broker.
+With this bridge, Kafka topics are URIs and are specified in one of two
+formats: `kafka+zk://<zk-path>#<topic>`, which uses the software load
+balancer, or the legacy `kafka://<broker>/<topic>` to connect to a
+specific Kafka broker.
 
 ### Pig ###
@@ -39,17 +37,19 @@ row.
 To push data via Kafka, store to the Kafka URI using `AvroKafkaStorage` with the
 Avro schema as its first argument. You'll need to register the appropriate
 Kafka JARs. Here is what an example Pig script looks like:
 
-    REGISTER hadoop-producer_2.8.0-0.8.0.jar;
+    REGISTER hadoop-producer_2.8.0-0.7.0.jar;
     REGISTER avro-1.4.0.jar;
     REGISTER piggybank.jar;
-    REGISTER kafka-0.8.0.jar;
+    REGISTER kafka-0.7.0.jar;
     REGISTER jackson-core-asl-1.5.5.jar;
     REGISTER jackson-mapper-asl-1.5.5.jar;
+    REGISTER zkclient-20110412.jar;
+    REGISTER zookeeper-3.3.4.jar;
     REGISTER scala-library.jar;
 
-    member_info = LOAD 'member_info.tsv' AS (member_id : int, name : chararray);
+    member_info = LOAD 'member_info.tsv' as (member_id : int, name : chararray);
     names = FOREACH member_info GENERATE name;
-    STORE member_info INTO 'kafka://my-kafka:9092/member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
+    STORE member_info INTO 'kafka+zk://my-zookeeper:2181/kafka#member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
 
 That's it!
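If a job needs producer settings other than the defaults listed under "What can I tune?" further down, those settings are plain job-configuration properties, so a Pig script can override them with `SET` before the statements above. A small, hypothetical sketch (the parameter names come from that section; the values are illustrative placeholders, not recommendations):

    SET kafka.output.queue_size 1048576;       -- flush to the producer after roughly 1MB instead of the 10MB default
    SET kafka.output.compression_codec 1;      -- the default of 0 means no compression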
 The Pig StoreFunc makes use of AvroStorage in Piggybank to convert from Pig's
 data model to the specified Avro schema.
@@ -58,8 +58,8 @@ Further, multi-store is possible with KafkaStorage, so you can easily write to
 multiple topics and brokers in the same job:
 
     SPLIT member_info INTO early_adopters IF member_id < 1000, others IF member_id >= 1000;
-    STORE early_adopters INTO 'kafka://my-broker:9092/early_adopters' USING AvroKafkaStorage('$schema');
-    STORE others INTO 'kafka://my-broker2:9092/others' USING AvroKafkaStorage('$schema');
+    STORE early_adopters INTO 'kafka+zk://my-zookeeper:2181/kafka#early_adopters' USING AvroKafkaStorage('$schema');
+    STORE others INTO 'kafka://my-broker:9092,my-broker2:9092/others' USING AvroKafkaStorage('$schema');
 
 ### KafkaOutputFormat ###
 
@@ -68,27 +68,80 @@ uses the newer 0.20 mapreduce APIs and simply pushes bytes (i.e.,
 BytesWritable). This is a lower-level method of publishing data, as it allows
 you to precisely control output.
 
-Included is an example that publishes some input text line-by-line to a topic.
-With KafkaOutputFormat, the key can be a null, where it is ignored by the
-producer (random partitioning), or any object for semantic partitioning of the
-stream (with an appropriate Kafka partitioner set). Speculative execution is
-turned off by the OutputFormat.
+Here is an example that publishes some input text. With KafkaOutputFormat, the
+key is a NullWritable and is ignored; only values are published. Speculative
+execution is turned off by the OutputFormat.
+
+    import kafka.bridge.hadoop.KafkaOutputFormat;
+
+    import org.apache.hadoop.fs.Path;
+    import org.apache.hadoop.io.BytesWritable;
+    import org.apache.hadoop.io.NullWritable;
+    import org.apache.hadoop.io.Text;
+    import org.apache.hadoop.mapreduce.Job;
+    import org.apache.hadoop.mapreduce.Mapper;
+    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+    import java.io.IOException;
+
+    public class TextPublisher
+    {
+      public static void main(String[] args) throws Exception
+      {
+        if (args.length != 2) {
+          System.err.println("usage: <input path> <kafka output url>");
+          return;
+        }
+
+        Job job = new Job();
+
+        job.setJarByClass(TextPublisher.class);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(BytesWritable.class);
+        job.setInputFormatClass(TextInputFormat.class);
+        job.setOutputFormatClass(KafkaOutputFormat.class);
+
+        job.setMapperClass(TheMapper.class);
+        job.setNumReduceTasks(0);
+
+        FileInputFormat.addInputPath(job, new Path(args[0]));
+        KafkaOutputFormat.setOutputPath(job, new Path(args[1]));
+
+        if (!job.waitForCompletion(true)) {
+          throw new RuntimeException("Job failed!");
+        }
+      }
+
+      public static class TheMapper extends Mapper
+      {
+        @Override
+        protected void map(Object key, Object value, Context context) throws IOException, InterruptedException
+        {
+          context.write(NullWritable.get(), new BytesWritable(((Text) value).getBytes()));
+        }
+      }
+    }
 
 What can I tune?
 ----------------
 
-* kafka.output.queue.size: Bytes to queue in memory before pushing to the Kafka
-  producer (i.e., the batch size). Default is 10*1024*1024 (10MB).
+Normally, you needn't change any of these parameters:
 
-Any of Kafka's producer parameters can be changed by prefixing them with
-"kafka.output" in one's job configuration. For example, to change the
-compression codec, one would add the "kafka.output.compression.codec" parameter
-(e.g., "SET kafka.output.compression.codec 0" in one's Pig script for no
-compression).
+* kafka.output.queue_size: Bytes to queue in memory before pushing to the Kafka + producer (i.e., the batch size). Default is 10*1024*1024 (10MB). +* kafka.output.connect_timeout: Connection timeout in milliseconds (see Kafka + producer docs). Default is 30*1000 (30s). +* kafka.output.reconnect_timeout: Milliseconds to wait until attempting + reconnection (see Kafka producer docs). Default is 1000 (1s). +* kafka.output.bufsize: Producer buffer size in bytes (see Kafka producer + docs). Default is 64*1024 (64KB). +* kafka.output.max_msgsize: Maximum message size in bytes (see Kafka producer + docs). Default is 1024*1024 (1MB). +* kafka.output.compression_codec: The compression codec to use (see Kafka producer + docs). Default is 0 (no compression). For easier debugging, the above values as well as the Kafka broker information -(kafka.broker.list), the topic (kafka.output.topic), and the schema -(kafka.output.schema) are injected into the job's configuration. By default, -the Hadoop producer uses Kafka's sync producer as asynchronous operation -doesn't make sense in the batch Hadoop case. +(either kafka.zk.connect or kafka.broker.list), the topic (kafka.output.topic), +and the schema (kafka.output.schema) are injected into the job's configuration. diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java index d447b1d..5acbcee 100644 --- a/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java +++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java @@ -16,18 +16,18 @@ */ package kafka.bridge.examples; + import java.io.IOException; import kafka.bridge.hadoop.KafkaOutputFormat; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -/** - * Publish a text file line by line to a Kafka topic - */ public class TextPublisher { public static void main(String[] args) throws Exception @@ -40,6 +40,8 @@ public class TextPublisher Job job = new Job(); job.setJarByClass(TextPublisher.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(BytesWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(KafkaOutputFormat.class); @@ -54,12 +56,12 @@ public class TextPublisher } } - public static class TheMapper extends Mapper + public static class TheMapper extends Mapper { @Override - protected void map(Object key, Text value, Context context) throws IOException, InterruptedException + protected void map(Object key, Object value, Context context) throws IOException, InterruptedException { - context.write(null, value.getBytes()); + context.write(NullWritable.get(), new BytesWritable(((Text) value).getBytes())); } } } diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java index aa1f944..2fd2035 100644 --- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java +++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java @@ -16,16 +16,19 @@ */ package kafka.bridge.hadoop; + import java.io.IOException; import 
java.net.URI; -import java.util.*; +import java.util.Properties; import kafka.common.KafkaException; import kafka.javaapi.producer.Producer; +import kafka.message.Message; import kafka.producer.ProducerConfig; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; @@ -35,26 +38,26 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.log4j.Logger; -public class KafkaOutputFormat extends OutputFormat +public class KafkaOutputFormat extends OutputFormat { private Logger log = Logger.getLogger(KafkaOutputFormat.class); public static final String KAFKA_URL = "kafka.output.url"; - /** Bytes to buffer before the OutputFormat does a send (i.e., the amortization window) */ + /** Bytes to buffer before the OutputFormat does a send */ public static final int KAFKA_QUEUE_SIZE = 10*1024*1024; - public static final String KAFKA_CONFIG_PREFIX = "kafka.output"; - private static final Map kafkaConfigMap; - static { - Map cMap = new HashMap(); - - // default Hadoop producer configs - cMap.put("producer.type", "sync"); - cMap.put("send.buffer.bytes", Integer.toString(64*1024)); - cMap.put("compression.codec", Integer.toString(1)); - - kafkaConfigMap = Collections.unmodifiableMap(cMap); - } + /** Default value for Kafka's connect.timeout.ms */ + public static final int KAFKA_PRODUCER_CONNECT_TIMEOUT = 30*1000; + /** Default value for Kafka's reconnect.interval*/ + public static final int KAFKA_PRODUCER_RECONNECT_INTERVAL = 1000; + /** Default value for Kafka's buffer.size */ + public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64*1024; + /** Default value for Kafka's max.message.size */ + public static final int KAFKA_PRODUCER_MAX_MESSAGE_SIZE = 1024*1024; + /** Default value for Kafka's producer.type */ + public static final String KAFKA_PRODUCER_PRODUCER_TYPE = "sync"; + /** Default value for Kafka's compression.codec */ + public static final int KAFKA_PRODUCER_COMPRESSION_CODEC = 0; public KafkaOutputFormat() { @@ -88,7 +91,7 @@ public class KafkaOutputFormat extends OutputFormat } @Override - public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException + public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { Path outputPath = getOutputPath(context); if (outputPath == null) @@ -99,44 +102,58 @@ public class KafkaOutputFormat extends OutputFormat Properties props = new Properties(); String topic; - props.putAll(kafkaConfigMap); // inject default configuration - for (Map.Entry m : job) { // handle any overrides - if (!m.getKey().startsWith(KAFKA_CONFIG_PREFIX)) - continue; - if (m.getKey().equals(KAFKA_URL)) - continue; - - String kafkaKeyName = m.getKey().substring(KAFKA_CONFIG_PREFIX.length()+1); - props.setProperty(kafkaKeyName, m.getValue()); // set Kafka producer property - } - - // inject Kafka producer props back into jobconf for easier debugging - for (Map.Entry m : props.entrySet()) { - job.set(KAFKA_CONFIG_PREFIX + "." 
+ m.getKey().toString(), m.getValue().toString()); - } - - // KafkaOutputFormat specific parameters - final int queueSize = job.getInt(KAFKA_CONFIG_PREFIX + ".queue.size", KAFKA_QUEUE_SIZE); - job.setInt(KAFKA_CONFIG_PREFIX + ".queue.size", queueSize); + final int queueSize = job.getInt("kafka.output.queue_size", KAFKA_QUEUE_SIZE); + final int timeout = job.getInt("kafka.output.connect_timeout", KAFKA_PRODUCER_CONNECT_TIMEOUT); + final int interval = job.getInt("kafka.output.reconnect_interval", KAFKA_PRODUCER_RECONNECT_INTERVAL); + final int bufSize = job.getInt("kafka.output.bufsize", KAFKA_PRODUCER_BUFFER_SIZE); + final int maxSize = job.getInt("kafka.output.max_msgsize", KAFKA_PRODUCER_MAX_MESSAGE_SIZE); + final String producerType = job.get("kafka.output.producer_type", KAFKA_PRODUCER_PRODUCER_TYPE); + final int compressionCodec = job.getInt("kafka.output.compression_codec", KAFKA_PRODUCER_COMPRESSION_CODEC); + + job.setInt("kafka.output.queue_size", queueSize); + job.setInt("kafka.output.connect_timeout", timeout); + job.setInt("kafka.output.reconnect_interval", interval); + job.setInt("kafka.output.bufsize", bufSize); + job.setInt("kafka.output.max_msgsize", maxSize); + job.set("kafka.output.producer_type", producerType); + job.setInt("kafka.output.compression_codec", compressionCodec); + + props.setProperty("producer.type", producerType); + props.setProperty("send.buffer.bytes", Integer.toString(bufSize)); + props.setProperty("connect.timeout.ms", Integer.toString(timeout)); + props.setProperty("reconnect.interval", Integer.toString(interval)); + props.setProperty("compression.codec", Integer.toString(compressionCodec)); if (uri.getScheme().equals("kafka")) { - // using the direct broker list + // using the legacy direct broker list // URL: kafka:/// // e.g. 
kafka://kafka-server:9000,kafka-server2:9000/foobar - String brokerList = uri.getAuthority(); + + // Just enumerate broker_ids, as it really doesn't matter what they are as long as they're unique + // (KAFKA-258 will remove the broker_id requirement) + StringBuilder brokerListBuilder = new StringBuilder(); + String delim = ""; + int brokerId = 0; + for (String serverPort : uri.getAuthority().split(",")) { + brokerListBuilder.append(delim).append(String.format("%d:%s", brokerId, serverPort)); + delim = ","; + brokerId++; + } + String brokerList = brokerListBuilder.toString(); + props.setProperty("broker.list", brokerList); - job.set(KAFKA_CONFIG_PREFIX + ".broker.list", brokerList); + job.set("kafka.broker.list", brokerList); if (uri.getPath() == null || uri.getPath().length() <= 1) throw new KafkaException("no topic specified in kafka uri"); - topic = uri.getPath().substring(1); // ignore the initial '/' in the path - job.set(KAFKA_CONFIG_PREFIX + ".topic", topic); + topic = uri.getPath().substring(1); // ignore the initial '/' in the path + job.set("kafka.output.topic", topic); log.info(String.format("using kafka broker %s (topic %s)", brokerList, topic)); } else throw new KafkaException("missing scheme from kafka uri (must be kafka://)"); - Producer producer = new Producer(new ProducerConfig(props)); - return new KafkaRecordWriter(producer, topic, queueSize); + Producer producer = new Producer(new ProducerConfig(props)); + return new KafkaRecordWriter(producer, topic, queueSize); } } diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java index a381ccd..8c84786 100644 --- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java +++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java @@ -16,58 +16,49 @@ */ package kafka.bridge.hadoop; + import java.io.IOException; import java.util.LinkedList; import java.util.List; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; +import kafka.message.Message; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -public class KafkaRecordWriter extends RecordWriter +public class KafkaRecordWriter extends RecordWriter { - protected Producer producer; + protected Producer producer; protected String topic; - protected List> msgList = new LinkedList>(); + protected List> msgList = new LinkedList>(); protected int totalSize = 0; protected int queueSize; - public KafkaRecordWriter(Producer producer, String topic, int queueSize) + public KafkaRecordWriter(Producer producer, String topic, int queueSize) { this.producer = producer; this.topic = topic; this.queueSize = queueSize; } - protected void sendMsgList() throws IOException + protected void sendMsgList() { if (msgList.size() > 0) { - try { - producer.send(msgList); - } - catch (Exception e) { - throw new IOException(e); // all Kafka exceptions become IOExceptions - } + producer.send(msgList); msgList.clear(); totalSize = 0; } } @Override - public void write(K key, V value) throws IOException, InterruptedException + public void write(NullWritable key, BytesWritable value) throws IOException, InterruptedException { - byte[] valBytes; - if (value instanceof byte[]) - valBytes = (byte[]) value; - else if (value instanceof BytesWritable) - valBytes = ((BytesWritable) 
value).getBytes(); - else - throw new IllegalArgumentException("KafkaRecordWriter expects byte array value to publish"); - - msgList.add(new KeyedMessage(this.topic, key, valBytes)); - totalSize += valBytes.length; + Message msg = new Message(value.getBytes()); + msgList.add(new KeyedMessage(this.topic, msg)); + totalSize += msg.size(); // MultiProducerRequest only supports sending up to Short.MAX_VALUE messages in one batch if (totalSize > queueSize || msgList.size() >= Short.MAX_VALUE) diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java index d24a85a..faa1950 100644 --- a/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java +++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java @@ -16,6 +16,7 @@ */ package kafka.bridge.pig; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -24,6 +25,8 @@ import kafka.bridge.hadoop.KafkaRecordWriter; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.Encoder; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; @@ -35,7 +38,7 @@ import org.apache.pig.piggybank.storage.avro.PigSchema2Avro; public class AvroKafkaStorage extends StoreFunc { - protected KafkaRecordWriter writer; + protected KafkaRecordWriter writer; protected org.apache.avro.Schema avroSchema; protected PigAvroDatumWriter datumWriter; protected Encoder encoder; @@ -65,7 +68,6 @@ public class AvroKafkaStorage extends StoreFunc } @Override - @SuppressWarnings("unchecked") public void prepareToWrite(RecordWriter writer) throws IOException { if (this.avroSchema == null) @@ -106,7 +108,7 @@ public class AvroKafkaStorage extends StoreFunc this.encoder.flush(); try { - this.writer.write(null, this.os.toByteArray()); + this.writer.write(NullWritable.get(), new BytesWritable(this.os.toByteArray())); } catch (InterruptedException e) { throw new IOException(e); diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index 19c961e..dc4ed8e 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -24,7 +24,7 @@ import scala.collection.immutable.Map import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.consumer.ConsumerConfig import java.util.concurrent.atomic.AtomicInteger -import kafka.network.{RequestChannel} +import kafka.network.RequestChannel case class PartitionFetchInfo(offset: Long, fetchSize: Int) diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala index 743227d..5bff709 100644 --- a/core/src/main/scala/kafka/api/ProducerResponse.scala +++ b/core/src/main/scala/kafka/api/ProducerResponse.scala @@ -22,7 +22,6 @@ import scala.collection.Map import kafka.common.{TopicAndPartition, ErrorMapping} import kafka.api.ApiUtils._ - object ProducerResponse { def readFrom(buffer: ByteBuffer): ProducerResponse = { val correlationId = buffer.getInt @@ -44,7 +43,6 @@ object ProducerResponse { case class ProducerResponseStatus(error: Short, offset: Long) - case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, ProducerResponseStatus]) extends 
RequestOrResponse { diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala index 0580636..be3c7be 100644 --- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala +++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala @@ -23,7 +23,7 @@ import kafka.api.ApiUtils._ import kafka.network.{BoundedByteBufferSend, RequestChannel, InvalidRequestException} import kafka.common.ErrorMapping import kafka.network.RequestChannel.Response -import kafka.utils.{Logging} +import kafka.utils.Logging object StopReplicaRequest extends Logging { diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala index 824f8f1..88007b1 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala @@ -23,7 +23,7 @@ import collection.mutable.ListBuffer import kafka.network.{BoundedByteBufferSend, RequestChannel} import kafka.common.ErrorMapping import kafka.network.RequestChannel.Response -import kafka.utils.{Logging} +import kafka.utils.Logging object TopicMetadataRequest extends Logging { val CurrentVersion = 0.shortValue diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index a780a41..1478e58 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -33,7 +33,6 @@ import kafka.utils.ZkUtils._ import kafka.common._ import kafka.client.ClientUtils import com.yammer.metrics.core.Gauge -import kafka.api.OffsetRequest import kafka.metrics._ import scala.Some diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 9b0f7e9..d0e67e3 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -116,7 +116,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe } /** Get the next request or block until there is one */ - def receiveRequest(): RequestChannel.Request = + def receiveRequest(): RequestChannel.Request = requestQueue.take() /** Get a response for the given processor if there is one */ diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index b056e25..12ddbf4 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -35,7 +35,7 @@ import kafka.utils._ class SocketServer(val brokerId: Int, val host: String, val port: Int, - val numProcessorThreads: Int, + val numProcessorThreads: Int, val maxQueuedRequests: Int, val maxRequestSize: Int = Int.MaxValue) extends Logging { this.logIdent = "[Socket Server on Broker " + brokerId + "], " @@ -205,7 +205,7 @@ private[kafka] class Acceptor(val host: String, val port: Int, private val proce * each of which has its own selectors */ private[kafka] class Processor(val id: Int, - val time: Time, + val time: Time, val maxRequestSize: Int, val requestChannel: RequestChannel) extends AbstractServerThread { @@ -218,7 +218,9 @@ private[kafka] class Processor(val id: Int, configureNewConnections() // register any new responses for writing processNewResponses() + val startSelectTime = SystemTime.milliseconds val ready = selector.select(300) + trace("Processor id " + id + 
" selection time = " + (SystemTime.milliseconds - startSelectTime) + " ms") if(ready > 0) { val keys = selector.selectedKeys() val iter = keys.iterator() @@ -258,11 +260,21 @@ private[kafka] class Processor(val id: Int, private def processNewResponses() { var curr = requestChannel.receiveResponse(id) while(curr != null) { - trace("Socket server received response to send, registering for write: " + curr) val key = curr.request.requestKey.asInstanceOf[SelectionKey] try { - key.interestOps(SelectionKey.OP_WRITE) - key.attach(curr) + if(curr.responseSend == null) { + // a null response send object indicates that there is no response to send to the client. + // In this case, we just want to turn the interest ops to READ to be able to read more pipelined requests + // that are sitting in the server's socket buffer + trace("Socket server received empty response to send, registering for read: " + curr) + key.interestOps(SelectionKey.OP_READ) + key.attach(null) + curr.request.updateRequestMetrics + } else { + trace("Socket server received response to send, registering for write: " + curr) + key.interestOps(SelectionKey.OP_WRITE) + key.attach(curr) + } } catch { case e: CancelledKeyException => { debug("Ignoring response for closed socket.") @@ -297,7 +309,7 @@ private[kafka] class Processor(val id: Int, private def configureNewConnections() { while(newConnections.size() > 0) { val channel = newConnections.poll() - debug("Listening to new connection from " + channel.socket.getRemoteSocketAddress) + debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress) channel.register(selector, SelectionKey.OP_READ) } } @@ -320,10 +332,12 @@ private[kafka] class Processor(val id: Int, } else if(receive.complete) { val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address) requestChannel.sendRequest(req) - trace("Received request, sending for processing by handler: " + req) key.attach(null) + // explicitly reset interest ops to not READ, no need to wake up the selector just yet + key.interestOps(key.interestOps & (~SelectionKey.OP_READ)) } else { // more reading to be done + trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress()) key.interestOps(SelectionKey.OP_READ) wakeup() } @@ -343,8 +357,10 @@ private[kafka] class Processor(val id: Int, if(responseSend.complete) { response.request.updateRequestMetrics() key.attach(null) + trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress()) key.interestOps(SelectionKey.OP_READ) } else { + trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress()) key.interestOps(SelectionKey.OP_WRITE) wakeup() } diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala index b209a97..a0e2b44 100644 --- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala +++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala @@ -80,7 +80,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, if(tmd.errorCode == ErrorMapping.NoError){ topicPartitionInfo.put(tmd.topic, tmd) } else - warn("Metadata for topic [%s] is erronous: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode)) + warn("Metadata for topic [%s] is erroneous: [%s]".format(tmd.topic, tmd), 
ErrorMapping.exceptionFor(tmd.errorCode)) tmd.partitionsMetadata.foreach(pmd =>{ if (pmd.errorCode != ErrorMapping.NoError){ debug("Metadata for topic partition [%s, %d] is errornous: [%s]".format(tmd.topic, pmd.partitionId, pmd), ErrorMapping.exceptionFor(pmd.errorCode)) diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala index 8b77465..eebfda6 100644 --- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala @@ -87,7 +87,12 @@ object ConsoleProducer { .describedAs("reader_class") .ofType(classOf[java.lang.String]) .defaultsTo(classOf[LineMessageReader].getName) - val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " + + val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1024*100) + val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " + "This allows custom configuration for a user-defined message reader.") .withRequiredArg .describedAs("prop") @@ -116,6 +121,7 @@ object ConsoleProducer { val keyEncoderClass = options.valueOf(keyEncoderOpt) val valueEncoderClass = options.valueOf(valueEncoderOpt) val readerClass = options.valueOf(messageReaderOpt) + val socketBuffer = options.valueOf(socketBufferSizeOpt) val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt)) cmdLineProps.put("topic", topic) @@ -133,7 +139,7 @@ object ConsoleProducer { props.put("request.timeout.ms", requestTimeoutMs.toString) props.put("key.serializer.class", keyEncoderClass) props.put("serializer.class", valueEncoderClass) - + props.put("send.buffer.bytes", socketBuffer.toString) val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]] reader.init(System.in, cmdLineProps) diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala index af077e0..3d22e6d 100644 --- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala +++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala @@ -32,6 +32,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { var compressionCodec:String = null var enqueueTimeout:String = null var queueSize:String = null + var requiredNumAcks: Int = Int.MaxValue private var producer: Producer[String, String] = null @@ -40,22 +41,25 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { def getBrokerList:String = brokerList def setBrokerList(brokerList: String) { this.brokerList = brokerList } - + def getSerializerClass:String = serializerClass def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass } def getProducerType:String = producerType def setProducerType(producerType:String) { this.producerType = producerType } - + def getCompressionCodec:String = compressionCodec def setCompressionCodec(compressionCodec:String) { this.compressionCodec = compressionCodec } - + def getEnqueueTimeout:String = enqueueTimeout def setEnqueueTimeout(enqueueTimeout:String) { this.enqueueTimeout = enqueueTimeout } def getQueueSize:String = queueSize def setQueueSize(queueSize:String) { this.queueSize = queueSize } + def getRequiredNumAcks:Int = requiredNumAcks + def 
setRequiredNumAcks(requiredNumAcks:Int) { this.requiredNumAcks = requiredNumAcks } + override def activateOptions() { // check for config parameter validity val props = new Properties() @@ -75,12 +79,13 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { if(compressionCodec != null) props.put("compression.codec", compressionCodec) if(enqueueTimeout != null) props.put("queue.enqueue.timeout.ms", enqueueTimeout) if(queueSize != null) props.put("queue.buffering.max.messages", queueSize) + if(requiredNumAcks != Int.MaxValue) props.put("request.required.acks", requiredNumAcks.toString) val config : ProducerConfig = new ProducerConfig(props) producer = new Producer[String, String](config) LogLog.debug("Kafka producer connected to " + config.brokerList) LogLog.debug("Logging for topic: " + topic) } - + override def append(event: LoggingEvent) { val message : String = if( this.layout == null) { event.getRenderedMessage diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala index 66638f2..3ded46e 100644 --- a/core/src/main/scala/kafka/producer/Producer.scala +++ b/core/src/main/scala/kafka/producer/Producer.scala @@ -26,7 +26,7 @@ import kafka.common.QueueFullException import kafka.metrics._ -class Producer[K,V](config: ProducerConfig, +class Producer[K,V](val config: ProducerConfig, private val eventHandler: EventHandler[K,V]) // only for unit testing extends Logging { diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index 0469a39..306f200 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -62,7 +62,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { /** * Common functionality for the public send methods */ - private def doSend(request: RequestOrResponse): Receive = { + private def doSend(request: RequestOrResponse, readResponse: Boolean = true): Receive = { lock synchronized { verifyRequest(request) getOrMakeConnection() @@ -70,7 +70,10 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { var response: Receive = null try { blockingChannel.send(request) - response = blockingChannel.receive() + if(readResponse) + response = blockingChannel.receive() + else + trace("Skipping reading response") } catch { case e: java.io.IOException => // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry @@ -83,7 +86,8 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { } /** - * Send a message + * Send a message. 
If the producerRequest had required.request.acks=0, then the + * returned response object is null */ def send(producerRequest: ProducerRequest): ProducerResponse = { val requestSize = producerRequest.sizeInBytes @@ -95,10 +99,13 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer aggregateTimer.time { specificTimer.time { - response = doSend(producerRequest) + response = doSend(producerRequest, if(producerRequest.requiredAcks == 0) false else true) } } - ProducerResponse.readFrom(response.buffer) + if(producerRequest.requiredAcks != 0) + ProducerResponse.readFrom(response.buffer) + else + null } def send(request: TopicMetadataRequest): TopicMetadataResponse = { diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 374cd6b..7d70d95 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -243,20 +243,22 @@ class DefaultEventHandler[K,V](config: ProducerConfig, val response = syncProducer.send(producerRequest) debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d" .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port)) - if (response.status.size != producerRequest.data.size) - throw new KafkaException("Incomplete response (%s) for producer request (%s)" - .format(response, producerRequest)) - if (logger.isTraceEnabled) { - val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError) - successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message => - trace("Successfully sent message: %s".format(Utils.readString(message.message.payload))))) - } - failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq - .map(partitionStatus => partitionStatus._1) - if(failedTopicPartitions.size > 0) - error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s" - .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(","))) - failedTopicPartitions + if(response != null) { + if (response.status.size != producerRequest.data.size) + throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest)) + if (logger.isTraceEnabled) { + val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError) + successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message => + trace("Successfully sent message: %s".format(Utils.readString(message.message.payload))))) + } + failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq + .map(partitionStatus => partitionStatus._1) + if(failedTopicPartitions.size > 0) + error("Produce request with correlation id %d failed due to response %s. 
List of failed topic partitions is %s" + .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(","))) + failedTopicPartitions + } else + Seq.empty[TopicAndPartition] } catch { case t: Throwable => warn("Failed to send producer request with correlation id %d to broker %d with data %s" diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6df077b..ece1b46 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -23,13 +23,13 @@ import kafka.message._ import kafka.network._ import org.apache.log4j.Logger import scala.collection._ -import kafka.network.RequestChannel.Response import java.util.concurrent.TimeUnit import java.util.concurrent.atomic._ import kafka.metrics.KafkaMetricsGroup import org.I0Itec.zkclient.ZkClient import kafka.common._ import kafka.utils.{ZkUtils, Pool, SystemTime, Logging} +import kafka.network.RequestChannel.Response /** @@ -127,8 +127,11 @@ class KafkaApis(val requestChannel: RequestChannel, val allPartitionHaveReplicationFactorOne = !produceRequest.data.keySet.exists( m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1) - if (produceRequest.requiredAcks == 0 || - produceRequest.requiredAcks == 1 || + if(produceRequest.requiredAcks == 0) { + // send a fake producer response if producer request.required.acks = 0. This mimics the behavior of a 0.7 producer + // and is tuned for very high throughput + requestChannel.sendResponse(new RequestChannel.Response(request.processor, request, null)) + } else if (produceRequest.requiredAcks == 1 || produceRequest.numPartitions <= 0 || allPartitionHaveReplicationFactorOne || numPartitionsInError == produceRequest.numPartitions) { diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index c03f758..e035adf 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -20,7 +20,7 @@ package kafka.server import kafka.cluster.Broker import kafka.message.ByteBufferMessageSet import kafka.common.{TopicAndPartition, ErrorMapping} -import kafka.api.{FetchRequest, PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData} +import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData} class ReplicaFetcherThread(name:String, sourceBroker: Broker, diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index d7d03ea..87067d7 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -19,6 +19,8 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n log4j.logger.kafka=ERROR +log4j.logger.kafka.producer.SyncProducer=TRACE +log4j.logger.kafka.network=TRACE # zkclient can be verbose, during debugging it is common to adjust is separately log4j.logger.org.I0Itec.zkclient.ZkClient=WARN diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 402fced..03c6241 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -21,10 +21,8 @@ import java.nio.ByteBuffer import junit.framework.Assert._ import kafka.api.{PartitionFetchInfo, 
FetchRequest, FetchRequestBuilder} import kafka.server.{KafkaRequestHandler, KafkaConfig} -import java.util.Properties import kafka.utils.Utils import kafka.producer.{KeyedMessage, Producer, ProducerConfig} -import kafka.serializer._ import kafka.utils.TestUtils import org.apache.log4j.{Level, Logger} import org.I0Itec.zkclient.ZkClient @@ -83,9 +81,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with def testDefaultEncoderProducerAndFetch() { val topic = "test-topic" - val props = new Properties() - props.put("serializer.class", "kafka.serializer.StringEncoder") - props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + val props = producer.config.props.props val config = new ProducerConfig(props) val stringProducer1 = new Producer[String, String](config) @@ -111,9 +107,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with def testDefaultEncoderProducerAndFetchWithCompression() { val topic = "test-topic" - val props = new Properties() - props.put("serializer.class", classOf[StringEncoder].getName.toString) - props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + val props = producer.config.props.props props.put("compression", "true") val config = new ProducerConfig(props) @@ -272,7 +266,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with } producer.send(produceList: _*) - // wait a bit for produced message to be available val request = builder.build() val response = consumer.fetch(request) for( (topic, partition) <- topics) { @@ -315,6 +308,34 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext) } + def testPipelinedProduceRequests() { + createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId) + val props = producer.config.props.props + props.put("request.required.acks", "0") + val pipelinedProducer: Producer[String, String] = new Producer(new ProducerConfig(props)) + + // send some messages + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); + val messages = new mutable.HashMap[String, Seq[String]] + val builder = new FetchRequestBuilder() + var produceList: List[KeyedMessage[String, String]] = Nil + for( (topic, partition) <- topics) { + val messageList = List("a_" + topic, "b_" + topic) + val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _)) + messages += topic -> messageList + pipelinedProducer.send(producerData:_*) + builder.addFetch(topic, partition, 0, 10000) + } + + // test if the consumer received the messages in the correct order when producer has enabled request pipelining + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val fetched = response.messageSet(topic, partition) + assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload))) + } + } + /** * For testing purposes, just create these topics each with one partition and one replica for * which the provided broker should the leader for. Create and wait for broker to lead. Simple. 
diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala index 0fde254..731ee59 100644 --- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala @@ -19,11 +19,8 @@ package kafka.integration import kafka.consumer.SimpleConsumer import org.scalatest.junit.JUnit3Suite -import java.util.Properties import kafka.producer.{ProducerConfig, Producer} import kafka.utils.TestUtils -import kafka.serializer._ - trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness { val port: Int val host = "localhost" @@ -32,16 +29,7 @@ trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarnes override def setUp() { super.setUp - val props = new Properties() - props.put("partitioner.class", "kafka.utils.StaticPartitioner") - props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) - props.put("send.buffer.bytes", "65536") - props.put("connect.timeout.ms", "100000") - props.put("reconnect.interval", "10000") - props.put("retry.backoff.ms", "1000") - props.put("message.send.max.retries", "3") - props.put("request.required.acks", "-1") - props.put("serializer.class", classOf[StringEncoder].getName.toString) + val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), "kafka.utils.StaticPartitioner") producer = new Producer(new ProducerConfig(props)) consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "") } diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala index c25255f..67497dd 100644 --- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala @@ -36,7 +36,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with var logDirZk: File = null var config: KafkaConfig = null - var serverZk: KafkaServer = null + var server: KafkaServer = null var simpleConsumerZk: SimpleConsumer = null @@ -55,14 +55,14 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with val logDirZkPath = propsZk.getProperty("log.dir") logDirZk = new File(logDirZkPath) config = new KafkaConfig(propsZk) - serverZk = TestUtils.createServer(config); + server = TestUtils.createServer(config); simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024, "") } @After override def tearDown() { simpleConsumerZk.close - serverZk.shutdown + server.shutdown Utils.rm(logDirZk) super.tearDown() } @@ -164,6 +164,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config))) props.put("log4j.appender.KAFKA.Topic", "test-topic") props.put("log4j.logger.kafka.log4j", "INFO,KAFKA") + props.put("log4j.appender.KAFKA.requiredNumAcks", "1") props } } diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 7395cbc..a76e876 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -21,7 +21,6 @@ import java.net._ import java.io._ import org.junit._ import org.scalatest.junit.JUnitSuite -import kafka.utils.TestUtils import 
java.util.Random import junit.framework.Assert._ import kafka.producer.SyncProducerConfig @@ -29,13 +28,14 @@ import kafka.api.ProducerRequest import java.nio.ByteBuffer import kafka.common.TopicAndPartition import kafka.message.ByteBufferMessageSet +import java.nio.channels.SelectionKey class SocketServerTest extends JUnitSuite { val server: SocketServer = new SocketServer(0, host = null, - port = TestUtils.choosePort, + port = kafka.utils.TestUtils.choosePort, numProcessorThreads = 1, maxQueuedRequests = 50, maxRequestSize = 50) @@ -102,4 +102,40 @@ class SocketServerTest extends JUnitSuite { receiveResponse(socket) } + @Test + def testPipelinedRequestOrdering() { + val socket = connect() + val correlationId = -1 + val clientId = SyncProducerConfig.DefaultClientId + val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs + val ack: Short = 0 + val emptyRequest = + new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]()) + + val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes) + emptyRequest.writeTo(byteBuffer) + byteBuffer.rewind() + val serializedBytes = new Array[Byte](byteBuffer.remaining) + byteBuffer.get(serializedBytes) + + sendRequest(socket, 0, serializedBytes) + sendRequest(socket, 0, serializedBytes) + + // here the socket server should've read only the first request completely and since the response is not sent yet + // the selection key should not be readable + val request = server.requestChannel.receiveRequest + Assert.assertFalse((request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) == SelectionKey.OP_READ) + + val id = request.buffer.getShort + val send = new BoundedByteBufferSend(request.buffer.slice) + server.requestChannel.sendResponse(new RequestChannel.Response(0, request, null)) + + // if everything is working correctly, until you send a response for the first request, + // the 2nd request will not be read by the socket server + val request2 = server.requestChannel.receiveRequest + val id2 = request2.buffer.getShort + val send2 = new BoundedByteBufferSend(request2.buffer.slice) + server.requestChannel.sendResponse(new RequestChannel.Response(0, request2, null)) + Assert.assertFalse((request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) == SelectionKey.OP_READ) + } } diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index fb0666f..922a200 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -356,11 +356,9 @@ class AsyncProducerTest extends JUnit3Suite { @Test def testBrokerListAndAsync() { return - val props = new Properties() - props.put("serializer.class", "kafka.serializer.StringEncoder") + val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs)) props.put("producer.type", "async") props.put("batch.num.messages", "5") - props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) val config = new ProducerConfig(props) @@ -394,9 +392,10 @@ class AsyncProducerTest extends JUnit3Suite { @Test def testFailedSendRetryLogic() { val props = new Properties() + props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("request.required.acks", "1") props.put("serializer.class", classOf[StringEncoder].getName.toString) props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString) - 
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) val config = new ProducerConfig(props) @@ -410,12 +409,12 @@ class AsyncProducerTest extends JUnit3Suite { // produce request for topic1 and partitions 0 and 1. Let the first request fail // entirely. The second request will succeed for partition 1 but fail for partition 0. // On the third try for partition 0, let it succeed. - val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 11) - val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 17) + val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, correlationId = 11) + val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, correlationId = 17) val response1 = ProducerResponse(0, Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)), (TopicAndPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L)))) - val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), correlationId = 21) + val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 21) val response2 = ProducerResponse(0, Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L)))) val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer]) diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 792919b..04acef5 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -199,7 +199,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props.put("serializer.class", "kafka.serializer.StringEncoder") props.put("partitioner.class", "kafka.utils.StaticPartitioner") props.put("request.timeout.ms", "2000") -// props.put("request.required.acks", "-1") + props.put("request.required.acks", "1") props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) // create topic @@ -258,6 +258,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props.put("partitioner.class", "kafka.utils.StaticPartitioner") props.put("request.timeout.ms", String.valueOf(timeoutMs)) props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props.put("request.required.acks", "1") val config = new ProducerConfig(props) val producer = new Producer[String, String](config) diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 89ba944..12019d4 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -18,7 +18,6 @@ package kafka.producer import java.net.SocketTimeoutException -import java.util.Properties import junit.framework.Assert import kafka.admin.CreateTopicCommand import kafka.integration.KafkaServerTestHarness @@ -38,16 +37,13 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testReachableServer() { val server = servers.head - val props = new Properties() - props.put("host", "localhost") - props.put("port", server.socketServer.port.toString) - 
props.put("send.buffer.bytes", "102400") - props.put("connect.timeout.ms", "500") - props.put("reconnect.interval", "1000") + val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val producer = new SyncProducer(new SyncProducerConfig(props)) val firstStart = SystemTime.milliseconds try { - val response = producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))) + val response = producer.send(TestUtils.produceRequest("test", 0, + new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)) Assert.assertNotNull(response) } catch { case e: Exception => Assert.fail("Unexpected failure sending message to broker. " + e.getMessage) @@ -56,7 +52,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { Assert.assertTrue((firstEnd-firstStart) < 500) val secondStart = SystemTime.milliseconds try { - val response = producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))) + val response = producer.send(TestUtils.produceRequest("test", 0, + new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)) Assert.assertNotNull(response) } catch { case e: Exception => Assert.fail("Unexpected failure sending message to broker. " + e.getMessage) @@ -64,7 +61,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val secondEnd = SystemTime.milliseconds Assert.assertTrue((secondEnd-secondStart) < 500) try { - val response = producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))) + val response = producer.send(TestUtils.produceRequest("test", 0, + new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)) Assert.assertNotNull(response) } catch { case e: Exception => Assert.fail("Unexpected failure sending message to broker. 
" + e.getMessage) @@ -74,36 +72,31 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testEmptyProduceRequest() { val server = servers.head - val props = new Properties() - props.put("host", "localhost") - props.put("port", server.socketServer.port.toString) - props.put("send.buffer.bytes", "102400") - props.put("connect.timeout.ms", "300") - props.put("reconnect.interval", "500") + val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val correlationId = 0 val clientId = SyncProducerConfig.DefaultClientId val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs - val ack = SyncProducerConfig.DefaultRequiredAcks + val ack: Short = 1 val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]()) - val producer = new SyncProducer(new SyncProducerConfig(props)) val response = producer.send(emptyRequest) + Assert.assertTrue(response != null) Assert.assertTrue(!response.hasError && response.status.size == 0) } @Test def testMessageSizeTooLarge() { val server = servers.head - val props = new Properties() - props.put("host", "localhost") - props.put("port", server.socketServer.port.toString) + val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val producer = new SyncProducer(new SyncProducerConfig(props)) CreateTopicCommand.createTopic(zkClient, "test", 1, 1) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500) val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1)) val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1) - val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1)) + val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1, acks = 1)) Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError)) Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.status(TopicAndPartition("test", 0)).error) @@ -112,7 +105,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val safeSize = configs(0).messageMaxBytes - Message.MessageOverhead - MessageSet.LogOverhead - 1 val message2 = new Message(new Array[Byte](safeSize)) val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2) - val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2)) + val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2, acks = 1)) Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError)) Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("test", 0)).error) @@ -122,12 +115,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testProduceCorrectlyReceivesResponse() { val server = servers.head - val props = new Properties() - props.put("host", "localhost") - props.put("port", server.socketServer.port.toString) - props.put("send.buffer.bytes", "102400") - props.put("connect.timeout.ms", "300") - props.put("reconnect.interval", "500") + val props = TestUtils.getSyncProducerConfig(server.socketServer.port) val producer = new SyncProducer(new SyncProducerConfig(props)) val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) @@ -173,15 +161,11 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val timeoutMs = 500 val server = servers.head - val props = new Properties() - 
props.put("host", "localhost") - props.put("port", server.socketServer.port.toString) - props.put("send.buffer.bytes", "102400") - props.put("request.timeout.ms", String.valueOf(timeoutMs)) + val props = TestUtils.getSyncProducerConfig(server.socketServer.port) val producer = new SyncProducer(new SyncProducerConfig(props)) val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) - val request = TestUtils.produceRequest("topic1", 0, messages) + val request = TestUtils.produceRequest("topic1", 0, messages, acks = 1) // stop IO threads and request handling, but leave networking operational // any requests should be accepted and queue up, but not handled @@ -196,8 +180,22 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { case e => Assert.fail("Unexpected exception when expecting timeout: " + e) } val t2 = SystemTime.milliseconds - + println("Timeout = " + (t2-t1)) // make sure we don't wait fewer than timeoutMs for a response Assert.assertTrue((t2-t1) >= timeoutMs) } + + @Test + def testProduceRequestWithNoResponse() { + val server = servers.head + val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val correlationId = 0 + val clientId = SyncProducerConfig.DefaultClientId + val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs + val ack: Short = 0 + val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]()) + val producer = new SyncProducer(new SyncProducerConfig(props)) + val response = producer.send(emptyRequest) + Assert.assertTrue(response == null) + } } diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index cd724a3..db46247 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -48,7 +48,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDirs(0)) var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] - val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000) + val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs)) producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString) producerProps.put("request.required.acks", "-1") diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 7afbe54..3728f8c 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -44,7 +44,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { def testCleanShutdown() { var server = new KafkaServer(config) server.startup() - val producerConfig = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000) + val producerConfig = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config))) producerConfig.put("key.serializer.class", classOf[IntEncoder].getName.toString) var producer = new Producer[Int, String](new ProducerConfig(producerConfig)) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 9400328..217ff7a 100644 --- 
a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -301,16 +301,25 @@ object TestUtils extends Logging { new Producer[K, V](new ProducerConfig(props)) } - def getProducerConfig(brokerList: String, bufferSize: Int, connectTimeout: Int, - reconnectInterval: Int): Properties = { + def getProducerConfig(brokerList: String, partitioner: String = "kafka.producer.DefaultPartitioner"): Properties = { val props = new Properties() - props.put("producer.type", "sync") props.put("broker.list", brokerList) - props.put("partitioner.class", "kafka.utils.FixedValuePartitioner") - props.put("send.buffer.bytes", bufferSize.toString) - props.put("connect.timeout.ms", connectTimeout.toString) - props.put("reconnect.interval", reconnectInterval.toString) - props.put("request.timeout.ms", 30000.toString) + props.put("partitioner.class", partitioner) + props.put("message.send.max.retries", "3") + props.put("retry.backoff.ms", "1000") + props.put("request.timeout.ms", "500") + props.put("request.required.acks", "-1") + props.put("serializer.class", classOf[StringEncoder].getName.toString) + + props + } + + def getSyncProducerConfig(port: Int): Properties = { + val props = new Properties() + props.put("host", "localhost") + props.put("port", port.toString) + props.put("request.timeout.ms", "500") + props.put("request.required.acks", "1") props.put("serializer.class", classOf[StringEncoder].getName.toString) props } diff --git a/project/build/KafkaProject.scala b/project/build/KafkaProject.scala index fac723a..48d1930 100644 --- a/project/build/KafkaProject.scala +++ b/project/build/KafkaProject.scala @@ -239,7 +239,7 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje - +
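
The test changes above all hinge on one contract that this patch makes explicit: a produce request sent with request.required.acks = 0 gets no broker reply, so SyncProducer.send() returns null (see the new testProduceRequestWithNoResponse), while acks >= 1 yields a ProducerResponse that can be checked with hasError. The sketch below is illustrative only and not part of the patch; it assumes a broker listening on localhost:9092, a topic named "test" (created or auto-created), and the test helpers from the diff above (TestUtils.produceRequest, getSyncProducerConfig-style props) being on the classpath.

    import java.util.Properties

    import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}
    import kafka.producer.{SyncProducer, SyncProducerConfig}
    import kafka.utils.TestUtils

    // Minimal sketch of the acks contract exercised by SyncProducerTest.
    object AcksContractSketch {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("host", "localhost")
        props.put("port", "9092")                  // hypothetical broker port
        props.put("request.timeout.ms", "500")
        props.put("request.required.acks", "1")    // ask the leader to acknowledge
        val producer = new SyncProducer(new SyncProducerConfig(props))

        val messages = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                                messages = new Message("hello".getBytes))

        // acks = 1: the broker replies, so send() returns a ProducerResponse.
        val acked = producer.send(TestUtils.produceRequest("test", 0, messages, acks = 1))
        assert(acked != null && !acked.hasError)

        // acks = 0: fire-and-forget, no reply comes back, so send() returns null.
        val unacked = producer.send(TestUtils.produceRequest("test", 0, messages, acks = 0))
        assert(unacked == null)

        producer.close()
      }
    }

This is also why the new getSyncProducerConfig helper defaults to request.required.acks = 1: tests that assert on the returned response need the broker to actually reply, whereas the old default of 0 left them asserting against null.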