 README.md                                                | 35 ++++-
 src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java | 89 ++++++++++++---
 src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java | 21 +--
 3 files changed, 106 insertions(+), 39 deletions(-)

Index: contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
===================================================================
--- contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java	(revision 1236484)
+++ contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java	(working copy)
@@ -18,27 +18,42 @@
 import java.util.Properties;
 
-import kafka.javaapi.producer.SyncProducer;
-import kafka.producer.SyncProducerConfig;
+import kafka.javaapi.producer.Producer;
+import kafka.message.Message;
+import kafka.producer.ProducerConfig;
+
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.net.URI;
 
 public class KafkaOutputFormat extends OutputFormat<NullWritable, BytesWritable>
 {
+  private Logger log = Logger.getLogger(KafkaOutputFormat.class);
+
   public static final String KAFKA_URL = "kafka.output.url";
+  /** Bytes to buffer before the OutputFormat does a send */
+  public static final int KAFKA_QUEUE_SIZE = 10*1024*1024;
+
+  /** Default value for Kafka's connect.timeout.ms */
   public static final int KAFKA_PRODUCER_CONNECT_TIMEOUT = 30*1000;
+  /** Default value for Kafka's reconnect.interval */
   public static final int KAFKA_PRODUCER_RECONNECT_INTERVAL = 1000;
+  /** Default value for Kafka's buffer.size */
   public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64*1024;
+  /** Default value for Kafka's max.message.size */
   public static final int KAFKA_PRODUCER_MAX_MESSAGE_SIZE = 1024*1024;
-  public static final int KAFKA_QUEUE_SIZE = 10*1024*1024;
+  /** Default value for Kafka's producer.type */
+  public static final String KAFKA_PRODUCER_PRODUCER_TYPE = "sync";
+  /** Default value for Kafka's compression.codec */
+  public static final int KAFKA_PRODUCER_COMPRESSION_CODEC = 0;
 
   public KafkaOutputFormat()
   {
@@ -77,40 +92,80 @@
     Path outputPath = getOutputPath(context);
     if (outputPath == null)
       throw new IllegalArgumentException("no kafka output url specified");
-    URI uri = outputPath.toUri();
+    URI uri = URI.create(outputPath.toString());
     Configuration job = context.getConfiguration();
 
-    final String topic = uri.getPath().substring(1); // ignore the initial '/' in the path
+    Properties props = new Properties();
+    String topic;
+
     final int queueSize = job.getInt("kafka.output.queue_size", KAFKA_QUEUE_SIZE);
     final int timeout = job.getInt("kafka.output.connect_timeout", KAFKA_PRODUCER_CONNECT_TIMEOUT);
     final int interval = job.getInt("kafka.output.reconnect_interval", KAFKA_PRODUCER_RECONNECT_INTERVAL);
     final int bufSize = job.getInt("kafka.output.bufsize", KAFKA_PRODUCER_BUFFER_SIZE);
     final int maxSize = job.getInt("kafka.output.max_msgsize", KAFKA_PRODUCER_MAX_MESSAGE_SIZE);
+    final String producerType = job.get("kafka.output.producer_type", KAFKA_PRODUCER_PRODUCER_TYPE);
+    final int compressionCodec = job.getInt("kafka.output.compression_codec", KAFKA_PRODUCER_COMPRESSION_CODEC);
 
-    job.set("kafka.output.server", String.format("%s:%d", uri.getHost(), uri.getPort()));
-    job.set("kafka.output.topic", topic);
job.setInt("kafka.output.queue_size", queueSize); job.setInt("kafka.output.connect_timeout", timeout); job.setInt("kafka.output.reconnect_interval", interval); job.setInt("kafka.output.bufsize", bufSize); job.setInt("kafka.output.max_msgsize", maxSize); + job.set("kafka.output.producer_type", producerType); + job.setInt("kafka.output.compression_codec", compressionCodec); - if (uri.getHost().isEmpty()) - throw new IllegalArgumentException("missing kafka server"); - if (uri.getPath().isEmpty()) - throw new IllegalArgumentException("missing kafka topic"); - - Properties props = new Properties(); - props.setProperty("host", uri.getHost()); - props.setProperty("port", Integer.toString(uri.getPort())); + props.setProperty("producer.type", producerType); props.setProperty("buffer.size", Integer.toString(bufSize)); props.setProperty("connect.timeout.ms", Integer.toString(timeout)); props.setProperty("reconnect.interval", Integer.toString(interval)); props.setProperty("max.message.size", Integer.toString(maxSize)); + props.setProperty("compression.codec", Integer.toString(compressionCodec)); - SyncProducer producer = new SyncProducer(new SyncProducerConfig(props)); + if (uri.getScheme().equals("kafka+zk")) { + // Software load balancer: + // URL: kafka+zk://# + // e.g. kafka+zk://kafka-zk:2181/kafka#foobar + + String zkConnect = uri.getAuthority() + uri.getPath(); + + props.setProperty("zk.connect", zkConnect); + job.set("kafka.zk.connect", zkConnect); + + topic = uri.getFragment(); + if (topic == null) + throw new IllegalArgumentException("no topic specified in kafka uri fragment"); + + log.info(String.format("using kafka zk.connect %s (topic %s)", zkConnect, topic)); + } else if (uri.getScheme().equals("kafka")) { + // using the legacy direct broker list + // URL: kafka:/// + // e.g. 
Index: contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
===================================================================
--- contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java	(revision 1236484)
+++ contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java	(working copy)
@@ -16,30 +16,29 @@
  */
 package kafka.bridge.hadoop;
 
+import kafka.javaapi.producer.Producer;
+import kafka.javaapi.producer.ProducerData;
 import kafka.message.Message;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.javaapi.producer.SyncProducer;
-import kafka.message.NoCompressionCodec;
 
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 
 public class KafkaRecordWriter extends RecordWriter<NullWritable, BytesWritable>
 {
-  protected SyncProducer producer;
+  protected Producer<Integer, Message> producer;
   protected String topic;
 
-  protected List<Message> msgList = new ArrayList<Message>();
+  protected List<ProducerData<Integer, Message>> msgList = new LinkedList<ProducerData<Integer, Message>>();
   protected int totalSize = 0;
   protected int queueSize;
 
-  public KafkaRecordWriter(SyncProducer producer, String topic, int queueSize)
+  public KafkaRecordWriter(Producer<Integer, Message> producer, String topic, int queueSize)
   {
     this.producer = producer;
     this.topic = topic;
@@ -49,8 +48,7 @@
   protected void sendMsgList()
   {
     if (msgList.size() > 0) {
-      ByteBufferMessageSet msgSet = new ByteBufferMessageSet(kafka.message.NoCompressionCodec$.MODULE$, msgList);
-      producer.send(topic, msgSet);
+      producer.send(msgList);
       msgList.clear();
       totalSize = 0;
     }
@@ -60,10 +58,11 @@
   public void write(NullWritable key, BytesWritable value) throws IOException, InterruptedException
   {
     Message msg = new Message(value.getBytes());
-    msgList.add(msg);
+    msgList.add(new ProducerData<Integer, Message>(this.topic, msg));
     totalSize += msg.size();
 
-    if (totalSize > queueSize)
+    // MultiProducerRequest only supports sending up to Short.MAX_VALUE messages in one batch
+    if (totalSize > queueSize || msgList.size() >= Short.MAX_VALUE)
      sendMsgList();
   }
 
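
A note on the new flush condition in write(): totalSize accumulates msg.size(), the serialized Kafka message size (payload plus header), and the batch is additionally capped at Short.MAX_VALUE entries because MultiProducerRequest encodes the message count as a short. A standalone sketch of the same policy; the class and method names here are invented for illustration and are not part of the patch:

    // Illustrative restatement of the KafkaRecordWriter flush policy.
    public final class FlushPolicySketch
    {
      /** Flush once buffered bytes exceed queueSize, or once the batch reaches
       *  the Short.MAX_VALUE (32767) cap imposed by MultiProducerRequest. */
      public static boolean shouldFlush(int bufferedBytes, int batchedMessages, int queueSize)
      {
        return bufferedBytes > queueSize || batchedMessages >= Short.MAX_VALUE;
      }
    }

With the default 10 MB queue size, messages around 100 bytes hit the 32767-message cap first (roughly 3.3 MB buffered), while messages of 1 KB or more hit the byte cap first.
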
Index: contrib/hadoop-producer/README.md
===================================================================
--- contrib/hadoop-producer/README.md	(revision 1236484)
+++ contrib/hadoop-producer/README.md	(working copy)
@@ -1,6 +1,14 @@
 Hadoop to Kafka Bridge
 ======================
 
+What's new?
+-----------
+
+* Now supports Kafka's software load balancer (Kafka URIs are specified with
+  kafka+zk as the scheme, as described below)
+* Supports Kafka 0.7. Now uses the new Producer API rather than the legacy
+  SyncProducer.
+
 What is it?
 -----------
 
@@ -17,8 +25,10 @@
 How do I use it?
 ----------------
 
-With this bridge, Kafka topics are URIs and are specified as
-`kafka://<kafka-server>/<kafka-topic>`.
+With this bridge, Kafka topics are URIs and are specified in one of two
+formats: `kafka+zk://<zk-connect-string>#<kafka-topic>`, which uses the
+software load balancer, or the legacy `kafka://<kafka-broker>/<kafka-topic>`
+to connect to a specific Kafka broker.
 
 ### Pig ###
 
@@ -27,17 +37,19 @@
 with the Avro schema as its first argument. You'll need to register the
 appropriate Kafka JARs. Here is what an example Pig script looks like:
 
-    REGISTER hadoop-kafka-bridge-0.5.2.jar;
+    REGISTER hadoop-producer_2.8.0-0.7.0.jar;
     REGISTER avro-1.4.0.jar;
     REGISTER piggybank.jar;
-    REGISTER kafka-0.5.2.jar;
+    REGISTER kafka-0.7.0.jar;
     REGISTER jackson-core-asl-1.5.5.jar;
     REGISTER jackson-mapper-asl-1.5.5.jar;
+    REGISTER zkclient-20110412.jar;
+    REGISTER zookeeper-3.3.4.jar;
     REGISTER scala-library.jar;
 
     member_info = LOAD 'member_info.tsv' as (member_id : int, name : chararray);
     names = FOREACH member_info GENERATE name;
-    STORE member_info INTO 'kafka://my-broker:9092/member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
+    STORE member_info INTO 'kafka+zk://my-zookeeper:2181/kafka#member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
 
 That's it! The Pig StoreFunc makes use of AvroStorage in Piggybank to convert
 from Pig's data model to the specified Avro schema.
@@ -46,8 +58,8 @@
 multiple topics and brokers in the same job:
 
     SPLIT member_info INTO early_adopters IF member_id < 1000, others IF member_id >= 1000;
-    STORE early_adopters INTO 'kafka://my-broker:9092/early_adopters' USING AvroKafkaStorage('$schema');
-    STORE others INTO 'kafka://my-broker:9092/others' USING AvroKafkaStorage('$schema');
+    STORE early_adopters INTO 'kafka+zk://my-zookeeper:2181/kafka#early_adopters' USING AvroKafkaStorage('$schema');
+    STORE others INTO 'kafka://my-broker:9092,my-broker2:9092/others' USING AvroKafkaStorage('$schema');
 
 ### KafkaOutputFormat ###
 
@@ -126,9 +138,10 @@
   docs). Default is 64*1024 (64KB).
 * kafka.output.max_msgsize: Maximum message size in bytes (see Kafka producer
   docs). Default is 1024*1024 (1MB).
+* kafka.output.compression_codec: The compression codec to use (see Kafka producer
+  docs). Default is 0 (no compression).
 
-For easier debugging, the above values as well as the Kafka URI
-(kafka.output.url), the output server (kafka.output.server), the topic
-(kafka.output.topic), and the schema (kafka.output.schema) are injected into
-the job's configuration.
+For easier debugging, the above values as well as the Kafka broker information
+(either kafka.zk.connect or kafka.broker.list), the topic (kafka.output.topic),
+and the schema (kafka.output.schema) are injected into the job's configuration.
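
To make the README's configuration keys concrete, here is a minimal job-driver sketch. It relies only on the kafka.output.* keys and the KafkaOutputFormat.KAFKA_URL constant visible in this patch; the class name, ZooKeeper host, and topic are illustrative, and the input/mapper setup is elided.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;

    import kafka.bridge.hadoop.KafkaOutputFormat;

    public class KafkaBridgeDriverSketch
    {
      public static void main(String[] args) throws Exception
      {
        Configuration conf = new Configuration();
        // Load-balanced URI form; host and topic are examples.
        conf.set(KafkaOutputFormat.KAFKA_URL, "kafka+zk://my-zookeeper:2181/kafka#member_info");
        // Optional tuning; defaults are documented in the README section above.
        conf.setInt("kafka.output.queue_size", 10 * 1024 * 1024);
        conf.setInt("kafka.output.compression_codec", 0);

        Job job = new Job(conf, "kafka-bridge-example");
        job.setOutputFormatClass(KafkaOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(BytesWritable.class);
        // ... set the input format and a mapper that emits (NullWritable, BytesWritable);
        // KafkaRecordWriter wraps each BytesWritable payload in a Kafka Message.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }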