diff --git a/contrib/hadoop-producer/README.md b/contrib/hadoop-producer/README.md
index 6e57fde..1bd3721 100644
--- a/contrib/hadoop-producer/README.md
+++ b/contrib/hadoop-producer/README.md
@@ -4,16 +4,20 @@ Hadoop to Kafka Bridge
 What's new?
 -----------
 
-* Now supports Kafka's software load balancer (Kafka URIs are specified with
-  kafka+zk as the scheme, as described below)
-* Supports Kafka 0.7. Now uses the new Producer API, rather than the legacy
-  SyncProducer.
+* Kafka 0.8 support
+  * No more ZK-based load balancing (backwards incompatible change)
+* Semantic partitioning is now supported in KafkaOutputFormat. Just specify a
+  key in the output collector of your job. The Pig StoreFunc doesn't support
+  semantic partitioning.
+* Config parameters are now the same as the Kafka producer, just prepended with
+  kafka.output (e.g., kafka.output.max.message.size). This is a backwards
+  incompatible change.
 
 What is it?
 -----------
 
 The Hadoop to Kafka bridge is a way to publish data from Hadoop to Kafka. There
-are two possible mechanisms, varying from easy to difficult: writing a Pig
+are two possible mechanisms, varying from easy to difficult: writing a Pig
 script and writing messages in Avro format, or rolling your own job using the
 Kafka `OutputFormat`.
 
@@ -25,10 +29,8 @@ multiple times in the same push.
 How do I use it?
 ----------------
 
-With this bridge, Kafka topics are URIs and are specified in one of two
-formats: `kafka+zk://<zk-path>#<kafka-topic>`, which uses the software load
-balancer, or the legacy `kafka://<kafka-server>/<kafka-topic>` to connect to a
-specific Kafka broker.
+With this bridge, Kafka topics are specified as URIs of the form
+`kafka://<kafka-server>/<kafka-topic>`, which connects to a specific Kafka broker.
 
 ### Pig ###
 
@@ -37,19 +39,17 @@ row. To push data via Kafka, store to the Kafka URI using `AvroKafkaStorage`
 with the Avro schema as its first argument. You'll need to register the
 appropriate Kafka JARs. Here is what an example Pig script looks like:
 
-    REGISTER hadoop-producer_2.8.0-0.7.0.jar;
+    REGISTER hadoop-producer_2.8.0-0.8.0.jar;
     REGISTER avro-1.4.0.jar;
     REGISTER piggybank.jar;
-    REGISTER kafka-0.7.0.jar;
+    REGISTER kafka-0.8.0.jar;
     REGISTER jackson-core-asl-1.5.5.jar;
     REGISTER jackson-mapper-asl-1.5.5.jar;
-    REGISTER zkclient-20110412.jar;
-    REGISTER zookeeper-3.3.4.jar;
     REGISTER scala-library.jar;
 
-    member_info = LOAD 'member_info.tsv' as (member_id : int, name : chararray);
+    member_info = LOAD 'member_info.tsv' AS (member_id : int, name : chararray);
     names = FOREACH member_info GENERATE name;
-    STORE member_info INTO 'kafka+zk://my-zookeeper:2181/kafka#member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
+    STORE member_info INTO 'kafka://my-kafka:9092/member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
 
 That's it! The Pig StoreFunc makes use of AvroStorage in Piggybank to convert
 from Pig's data model to the specified Avro schema.
@@ -58,8 +58,8 @@ Further, multi-store is possible with KafkaStorage, so you can easily write to
 multiple topics and brokers in the same job:
 
     SPLIT member_info INTO early_adopters IF member_id < 1000, others IF member_id >= 1000;
-    STORE early_adopters INTO 'kafka+zk://my-zookeeper:2181/kafka#early_adopters' USING AvroKafkaStorage('$schema');
-    STORE others INTO 'kafka://my-broker:9092,my-broker2:9092/others' USING AvroKafkaStorage('$schema');
+    STORE early_adopters INTO 'kafka://my-broker:9092/early_adopters' USING AvroKafkaStorage('$schema');
+    STORE others INTO 'kafka://my-broker2:9092/others' USING AvroKafkaStorage('$schema');
 
 ### KafkaOutputFormat ###
 
@@ -68,80 +68,27 @@ uses the newer 0.20 mapreduce APIs and simply pushes bytes (i.e.,
 BytesWritable). This is a lower-level method of publishing data, as it allows
 you to precisely control output.
 
-Here is an example that publishes some input text. With KafkaOutputFormat, the
-key is a NullWritable and is ignored; only values are published. Speculative
-execution is turned off by the OutputFormat.
-
-    import kafka.bridge.hadoop.KafkaOutputFormat;
-
-    import org.apache.hadoop.fs.Path;
-    import org.apache.hadoop.io.BytesWritable;
-    import org.apache.hadoop.io.NullWritable;
-    import org.apache.hadoop.io.Text;
-    import org.apache.hadoop.mapreduce.Job;
-    import org.apache.hadoop.mapreduce.Mapper;
-    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
-    import java.io.IOException;
-
-    public class TextPublisher
-    {
-      public static void main(String[] args) throws Exception
-      {
-        if (args.length != 2) {
-          System.err.println("usage: <input path> <kafka output url>");
-          return;
-        }
-
-        Job job = new Job();
-
-        job.setJarByClass(TextPublisher.class);
-        job.setOutputKeyClass(NullWritable.class);
-        job.setOutputValueClass(BytesWritable.class);
-        job.setInputFormatClass(TextInputFormat.class);
-        job.setOutputFormatClass(KafkaOutputFormat.class);
-
-        job.setMapperClass(TheMapper.class);
-        job.setNumReduceTasks(0);
-
-        FileInputFormat.addInputPath(job, new Path(args[0]));
-        KafkaOutputFormat.setOutputPath(job, new Path(args[1]));
-
-        if (!job.waitForCompletion(true)) {
-          throw new RuntimeException("Job failed!");
-        }
-      }
-
-      public static class TheMapper extends Mapper
-      {
-        @Override
-        protected void map(Object key, Object value, Context context) throws IOException, InterruptedException
-        {
-          context.write(NullWritable.get(), new BytesWritable(((Text) value).getBytes()));
-        }
-      }
-    }
+Included is an example that publishes some input text line-by-line to a topic.
+With KafkaOutputFormat, the key can be null, in which case it is ignored by the
+producer (random partitioning), or any object for semantic partitioning of the
+stream (with an appropriate Kafka partitioner set). Speculative execution is
+turned off by the OutputFormat.
 
 What can I tune?
 ----------------
 
-Normally, you needn't change any of these parameters:
-
-* kafka.output.queue_size: Bytes to queue in memory before pushing to the Kafka
+* kafka.output.queue.size: Bytes to queue in memory before pushing to the Kafka
   producer (i.e., the batch size). Default is 10*1024*1024 (10MB).
-* kafka.output.connect_timeout: Connection timeout in milliseconds (see Kafka
-  producer docs). Default is 30*1000 (30s).
-* kafka.output.reconnect_timeout: Milliseconds to wait until attempting
-  reconnection (see Kafka producer docs). Default is 1000 (1s).
-* kafka.output.bufsize: Producer buffer size in bytes (see Kafka producer
-  docs). Default is 64*1024 (64KB).
-* kafka.output.max_msgsize: Maximum message size in bytes (see Kafka producer
-  docs). Default is 1024*1024 (1MB).
-* kafka.output.compression_codec: The compression codec to use (see Kafka producer
-  docs). Default is 0 (no compression).
+
+Any of Kafka's producer parameters can be changed by prefixing them with
+"kafka.output" in one's job configuration. For example, to change the
+compression codec, one would add the "kafka.output.compression.codec" parameter
+(e.g., "SET kafka.output.compression.codec 0" in one's Pig script for no
+compression).
 
 For easier debugging, the above values as well as the Kafka broker information
-(either kafka.zk.connect or kafka.broker.list), the topic (kafka.output.topic),
-and the schema (kafka.output.schema) are injected into the job's configuration.
+(kafka.broker.list), the topic (kafka.output.topic), and the schema
+(kafka.output.schema) are injected into the job's configuration. By default,
+the Hadoop producer uses Kafka's sync producer, as asynchronous operation
+doesn't make sense in the batch Hadoop case.
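
As a rough illustration of the kafka.output.* override mechanism described above (not part of this patch; the class name, job name, and the particular producer properties shown, such as compression.codec and request.required.acks, are assumptions about the Kafka 0.8 producer config):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class ProducerTuningSketch
    {
      public static void main(String[] args) throws Exception
      {
        Configuration conf = new Configuration();
        // Anything prefixed with "kafka.output" has the prefix stripped by
        // KafkaOutputFormat and is handed to the Kafka producer config.
        conf.set("kafka.output.compression.codec", "0");          // assumed producer property: no compression
        conf.set("kafka.output.request.required.acks", "1");      // assumed producer property: wait for leader ack
        // KafkaOutputFormat's own batching knob uses the same prefix:
        conf.setInt("kafka.output.queue.size", 10 * 1024 * 1024);

        Job job = new Job(conf, "kafka-push");
        // ... then set the mapper, input format, and output URI as in TextPublisher below ...
      }
    }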
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
index 5acbcee..d447b1d 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
@@ -16,18 +16,18 @@
  */
 package kafka.bridge.examples;
-
 import java.io.IOException;
 import kafka.bridge.hadoop.KafkaOutputFormat;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 
+/**
+ * Publish a text file line by line to a Kafka topic
+ */
 public class TextPublisher
 {
   public static void main(String[] args) throws Exception
@@ -40,8 +40,6 @@ public class TextPublisher
     Job job = new Job();
 
     job.setJarByClass(TextPublisher.class);
-    job.setOutputKeyClass(NullWritable.class);
-    job.setOutputValueClass(BytesWritable.class);
     job.setInputFormatClass(TextInputFormat.class);
     job.setOutputFormatClass(KafkaOutputFormat.class);
 
@@ -56,12 +54,12 @@ public class TextPublisher
     }
   }
 
-  public static class TheMapper extends Mapper
+  public static class TheMapper extends Mapper<Object, Text, Object, byte[]>
   {
     @Override
-    protected void map(Object key, Object value, Context context) throws IOException, InterruptedException
+    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException
     {
-      context.write(NullWritable.get(), new BytesWritable(((Text) value).getBytes()));
+      context.write(null, value.getBytes());
     }
   }
 }
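
TextPublisher above publishes with a null key (random partitioning). For the semantic-partitioning case the README mentions, a hypothetical mapper variant might look like the sketch below; the class name and the tab-separated input are assumptions, and it presumes a key serializer (and, if needed, a partitioner) is supplied through the prefix mechanism, e.g. kafka.output.key.serializer.class:

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Hypothetical variant of TheMapper: uses the first tab-separated field as the
    // Kafka message key so rows with the same key land in the same partition.
    // Assumes e.g. kafka.output.key.serializer.class=kafka.serializer.StringEncoder
    // (and optionally kafka.output.partitioner.class) is set in the job configuration.
    public class KeyedTextMapper extends Mapper<Object, Text, Object, byte[]>
    {
      @Override
      protected void map(Object offset, Text line, Context context) throws IOException, InterruptedException
      {
        String key = line.toString().split("\t", 2)[0];
        context.write(key, line.toString().getBytes());
      }
    }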
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
index 2fd2035..aa1f944 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
@@ -16,19 +16,16 @@
  */
 package kafka.bridge.hadoop;
-
 import java.io.IOException;
 import java.net.URI;
-import java.util.Properties;
+import java.util.*;
 
 import kafka.common.KafkaException;
 import kafka.javaapi.producer.Producer;
-import kafka.message.Message;
 import kafka.producer.ProducerConfig;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -38,26 +35,26 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.log4j.Logger;
 
-public class KafkaOutputFormat extends OutputFormat
+public class KafkaOutputFormat<K, V> extends OutputFormat<K, V>
 {
   private Logger log = Logger.getLogger(KafkaOutputFormat.class);
 
   public static final String KAFKA_URL = "kafka.output.url";
-  /** Bytes to buffer before the OutputFormat does a send */
+  /** Bytes to buffer before the OutputFormat does a send (i.e., the amortization window) */
   public static final int KAFKA_QUEUE_SIZE = 10*1024*1024;
 
-  /** Default value for Kafka's connect.timeout.ms */
-  public static final int KAFKA_PRODUCER_CONNECT_TIMEOUT = 30*1000;
-  /** Default value for Kafka's reconnect.interval*/
-  public static final int KAFKA_PRODUCER_RECONNECT_INTERVAL = 1000;
-  /** Default value for Kafka's buffer.size */
-  public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64*1024;
-  /** Default value for Kafka's max.message.size */
-  public static final int KAFKA_PRODUCER_MAX_MESSAGE_SIZE = 1024*1024;
-  /** Default value for Kafka's producer.type */
-  public static final String KAFKA_PRODUCER_PRODUCER_TYPE = "sync";
-  /** Default value for Kafka's compression.codec */
-  public static final int KAFKA_PRODUCER_COMPRESSION_CODEC = 0;
+  public static final String KAFKA_CONFIG_PREFIX = "kafka.output";
+  private static final Map<String, String> kafkaConfigMap;
+  static {
+    Map<String, String> cMap = new HashMap<String, String>();
+
+    // default Hadoop producer configs
+    cMap.put("producer.type", "sync");
+    cMap.put("send.buffer.bytes", Integer.toString(64*1024));
+    cMap.put("compression.codec", Integer.toString(1));
+
+    kafkaConfigMap = Collections.unmodifiableMap(cMap);
+  }
 
   public KafkaOutputFormat()
   {
@@ -91,7 +88,7 @@ public class KafkaOutputFormat extends OutputFormat
-  public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException
   {
     Path outputPath = getOutputPath(context);
     if (outputPath == null)
@@ -102,58 +99,44 @@ public class KafkaOutputFormat extends OutputFormat
+    for (Map.Entry<String, String> m : job) {            // handle any overrides
+      if (!m.getKey().startsWith(KAFKA_CONFIG_PREFIX))
+        continue;
+      if (m.getKey().equals(KAFKA_URL))
+        continue;
+
+      String kafkaKeyName = m.getKey().substring(KAFKA_CONFIG_PREFIX.length()+1);
+      props.setProperty(kafkaKeyName, m.getValue());      // set Kafka producer property
+    }
+
+    // inject Kafka producer props back into jobconf for easier debugging
+    for (Map.Entry<Object, Object> m : props.entrySet()) {
+      job.set(KAFKA_CONFIG_PREFIX + "." + m.getKey().toString(), m.getValue().toString());
+    }
+
+    // KafkaOutputFormat specific parameters
+    final int queueSize = job.getInt(KAFKA_CONFIG_PREFIX + ".queue.size", KAFKA_QUEUE_SIZE);
+    job.setInt(KAFKA_CONFIG_PREFIX + ".queue.size", queueSize);
 
     if (uri.getScheme().equals("kafka")) {
-      // using the legacy direct broker list
+      // using the direct broker list
       // URL: kafka://<kafka-server>/<kafka-topic>
       // e.g. kafka://kafka-server:9000,kafka-server2:9000/foobar
-
-      // Just enumerate broker_ids, as it really doesn't matter what they are as long as they're unique
-      // (KAFKA-258 will remove the broker_id requirement)
-      StringBuilder brokerListBuilder = new StringBuilder();
-      String delim = "";
-      int brokerId = 0;
-      for (String serverPort : uri.getAuthority().split(",")) {
-        brokerListBuilder.append(delim).append(String.format("%d:%s", brokerId, serverPort));
-        delim = ",";
-        brokerId++;
-      }
-      String brokerList = brokerListBuilder.toString();
-
+      String brokerList = uri.getAuthority();
       props.setProperty("broker.list", brokerList);
-      job.set("kafka.broker.list", brokerList);
+      job.set(KAFKA_CONFIG_PREFIX + ".broker.list", brokerList);
 
       if (uri.getPath() == null || uri.getPath().length() <= 1)
        throw new KafkaException("no topic specified in kafka uri");
 
-      topic = uri.getPath().substring(1); // ignore the initial '/' in the path
-      job.set("kafka.output.topic", topic);
+      topic = uri.getPath().substring(1);                 // ignore the initial '/' in the path
+      job.set(KAFKA_CONFIG_PREFIX + ".topic", topic);
       log.info(String.format("using kafka broker %s (topic %s)", brokerList, topic));
     } else
       throw new KafkaException("missing scheme from kafka uri (must be kafka://)");
 
-    Producer producer = new Producer(new ProducerConfig(props));
-    return new KafkaRecordWriter(producer, topic, queueSize);
+    Producer<Object, byte[]> producer = new Producer<Object, byte[]>(new ProducerConfig(props));
+    return new KafkaRecordWriter<K, V>(producer, topic, queueSize);
   }
 }
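
For reference, a minimal sketch of wiring a job to the kafka:// URI that getRecordWriter parses above; the broker address, topic, and class name are illustrative only:

    import kafka.bridge.hadoop.KafkaOutputFormat;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;

    public class UriWiringSketch
    {
      public static void main(String[] args) throws Exception
      {
        Job job = new Job();
        job.setOutputFormatClass(KafkaOutputFormat.class);
        // The URI authority becomes the producer's broker.list (a comma-separated
        // list is allowed) and the path minus the leading '/' becomes the topic.
        KafkaOutputFormat.setOutputPath(job, new Path("kafka://my-kafka:9092/member_info"));
      }
    }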
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
index 8c84786..a381ccd 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
@@ -16,49 +16,58 @@
  */
 package kafka.bridge.hadoop;
-
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 
 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
-import kafka.message.Message;
 
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-public class KafkaRecordWriter extends RecordWriter
+public class KafkaRecordWriter<K, V> extends RecordWriter<K, V>
 {
-  protected Producer producer;
+  protected Producer<Object, byte[]> producer;
   protected String topic;
 
-  protected List msgList = new LinkedList();
+  protected List<KeyedMessage<Object, byte[]>> msgList = new LinkedList<KeyedMessage<Object, byte[]>>();
   protected int totalSize = 0;
   protected int queueSize;
 
-  public KafkaRecordWriter(Producer producer, String topic, int queueSize)
+  public KafkaRecordWriter(Producer<Object, byte[]> producer, String topic, int queueSize)
   {
     this.producer = producer;
     this.topic = topic;
     this.queueSize = queueSize;
   }
 
-  protected void sendMsgList()
+  protected void sendMsgList() throws IOException
   {
     if (msgList.size() > 0) {
-      producer.send(msgList);
+      try {
+        producer.send(msgList);
+      }
+      catch (Exception e) {
+        throw new IOException(e);            // all Kafka exceptions become IOExceptions
+      }
       msgList.clear();
       totalSize = 0;
     }
   }
 
   @Override
-  public void write(NullWritable key, BytesWritable value) throws IOException, InterruptedException
+  public void write(K key, V value) throws IOException, InterruptedException
   {
-    Message msg = new Message(value.getBytes());
-    msgList.add(new KeyedMessage(this.topic, msg));
-    totalSize += msg.size();
+    byte[] valBytes;
+    if (value instanceof byte[])
+      valBytes = (byte[]) value;
+    else if (value instanceof BytesWritable)
+      valBytes = ((BytesWritable) value).getBytes();
+    else
+      throw new IllegalArgumentException("KafkaRecordWriter expects byte array value to publish");
+
+    msgList.add(new KeyedMessage<Object, byte[]>(this.topic, key, valBytes));
+    totalSize += valBytes.length;
 
     // MultiProducerRequest only supports sending up to Short.MAX_VALUE messages in one batch
     if (totalSize > queueSize || msgList.size() >= Short.MAX_VALUE)
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
index faa1950..d24a85a 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
@@ -16,7 +16,6 @@
  */
 package kafka.bridge.pig;
-
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -25,8 +24,6 @@ import kafka.bridge.hadoop.KafkaRecordWriter;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.Encoder;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -38,7 +35,7 @@ import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;
 
 public class AvroKafkaStorage extends StoreFunc
 {
-  protected KafkaRecordWriter writer;
+  protected KafkaRecordWriter<Object, byte[]> writer;
   protected org.apache.avro.Schema avroSchema;
   protected PigAvroDatumWriter datumWriter;
   protected Encoder encoder;
@@ -68,6 +65,7 @@ public class AvroKafkaStorage extends StoreFunc
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public void prepareToWrite(RecordWriter writer) throws IOException
   {
     if (this.avroSchema == null)
@@ -108,7 +106,7 @@ public class AvroKafkaStorage extends StoreFunc
     this.encoder.flush();
 
     try {
-      this.writer.write(NullWritable.get(), new BytesWritable(this.os.toByteArray()));
+      this.writer.write(null, this.os.toByteArray());
     }
     catch (InterruptedException e) {
       throw new IOException(e);
diff --git a/project/build/KafkaProject.scala b/project/build/KafkaProject.scala
index 48d1930..fac723a 100644
--- a/project/build/KafkaProject.scala
+++ b/project/build/KafkaProject.scala
@@ -239,7 +239,7 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
-
+