Index: tools/consumer/consumer.go =================================================================== --- tools/consumer/consumer.go (revision 1186859) +++ tools/consumer/consumer.go (working copy) @@ -20,7 +20,6 @@ * of their respective owners. */ - package main import ( @@ -53,7 +52,6 @@ flag.BoolVar(&printmessage, "printmessage", true, "print the message details to stdout") } - func main() { flag.Parse() fmt.Println("Consuming Messages :") @@ -87,7 +85,7 @@ go func() { for { sig := <-signal.Incoming - if sig.(signal.UnixSignal) == syscall.SIGINT { + if sig.(os.UnixSignal) == syscall.SIGINT { quit <- true } } Index: tools/publisher/publisher.go =================================================================== --- tools/publisher/publisher.go (revision 1186859) +++ tools/publisher/publisher.go (working copy) @@ -34,6 +34,7 @@ var partition int var message string var messageFile string +var compress bool func init() { flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server") @@ -41,6 +42,7 @@ flag.IntVar(&partition, "partition", 0, "partition to publish to") flag.StringVar(&message, "message", "", "message to publish") flag.StringVar(&messageFile, "messagefile", "", "read message from this file") + flag.BoolVar(&compress, "compress", false, "compress the messages published") } func main() { @@ -64,12 +66,24 @@ payload := make([]byte, stat.Size) file.Read(payload) timing := kafka.StartTiming("Sending") - broker.Publish(kafka.NewMessage(payload)) + + if compress { + broker.Publish(kafka.NewCompressedMessage(payload)) + } else { + broker.Publish(kafka.NewMessage(payload)) + } + timing.Print() file.Close() } else { timing := kafka.StartTiming("Sending") - broker.Publish(kafka.NewMessage([]byte(message))) + + if compress { + broker.Publish(kafka.NewCompressedMessage([]byte(message))) + } else { + broker.Publish(kafka.NewMessage([]byte(message))) + } + timing.Print() } } Index: kafka_test.go =================================================================== --- kafka_test.go (revision 1186859) +++ kafka_test.go (working copy) @@ -20,20 +20,19 @@ * of their respective owners. */ - package kafka import ( "testing" //"fmt" "bytes" - "container/list" + "compress/gzip" ) func TestMessageCreation(t *testing.T) { payload := []byte("testing") msg := NewMessage(payload) - if msg.magic != 0 { + if msg.magic != 1 { t.Errorf("magic incorrect") t.Fail() } @@ -45,34 +44,186 @@ } } +func TestMagic0MessageEncoding(t *testing.T) { + // generated by kafka-rb: + // test the old message format + expected := []byte{0x00, 0x00, 0x00, 0x0c, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} + length, msgsDecoded := Decode(expected, DefaultCodecsMap) + if length == 0 || msgsDecoded == nil { + t.Fail() + } + msgDecoded := msgsDecoded[0] + + payload := []byte("testing") + if !bytes.Equal(payload, msgDecoded.payload) { + t.Fatal("bytes not equal") + } + chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} + if !bytes.Equal(chksum, msgDecoded.checksum[:]) { + t.Fatal("checksums do not match") + } + if msgDecoded.magic != 0 { + t.Fatal("magic incorrect") + } +} + func TestMessageEncoding(t *testing.T) { + payload := []byte("testing") msg := NewMessage(payload) // generated by kafka-rb: - expected := []byte{0x00, 0x00, 0x00, 0x0c, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} + expected := []byte{0x00, 0x00, 0x00, 0x0d, 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} if !bytes.Equal(expected, msg.Encode()) { - t.Fail() + t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode()) } // verify round trip - msgDecoded := Decode(msg.Encode()) + length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode()) + + if length == 0 || msgsDecoded == nil { + t.Fatal("message is nil") + } + msgDecoded := msgsDecoded[0] + if !bytes.Equal(msgDecoded.payload, payload) { - t.Fail() + t.Fatal("bytes not equal") } + chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} + if !bytes.Equal(chksum, msgDecoded.checksum[:]) { + t.Fatal("checksums do not match") + } + if msgDecoded.magic != 1 { + t.Fatal("magic incorrect") + } +} + +func TestCompressedMessageEncodingCompare(t *testing.T) { + payload := []byte("testing") + uncompressedMsgBytes := NewMessage(payload).Encode() + + msgGzipBytes := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]).Encode() + msgDefaultBytes := NewCompressedMessage(payload).Encode() + if !bytes.Equal(msgDefaultBytes, msgGzipBytes) { + t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", msgDefaultBytes, msgGzipBytes) + } +} + +func TestCompressedMessageEncoding(t *testing.T) { + payload := []byte("testing") + uncompressedMsgBytes := NewMessage(payload).Encode() + + msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]) + + expectedPayload := []byte{0x1F, 0x8B, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, + 0xFF, 0x62, 0x60, 0x60, 0xE0, 0x65, 0x64, 0x78, 0xF1, 0x39, 0x8A, + 0xAD, 0x24, 0xB5, 0xB8, 0x24, 0x33, 0x2F, 0x1D, 0x10, 0x00, 0x00, + 0xFF, 0xFF, 0x0C, 0x6A, 0x82, 0x91, 0x11, 0x00, 0x00, 0x00} + + expectedHeader := []byte{0x00, 0x00, 0x00, 0x2F, 0x01, 0x01, 0x07, 0xFD, 0xC3, 0x76} + + expected := make([]byte, len(expectedHeader)+len(expectedPayload)) + n := copy(expected, expectedHeader) + copy(expected[n:], expectedPayload) + + if msg.compression != 1 { + t.Fatalf("expected compression: 1 but got: %b", msg.compression) + } + + zipper, _ := gzip.NewReader(bytes.NewBuffer(msg.payload)) + uncompressed := make([]byte, 100) + n, _ = zipper.Read(uncompressed) + uncompressed = uncompressed[:n] + zipper.Close() + + if !bytes.Equal(uncompressed, uncompressedMsgBytes) { + t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", uncompressed, uncompressedMsgBytes) + } + + if !bytes.Equal(expected, msg.Encode()) { + t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode()) + } + + // verify round trip + length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap) + + if length == 0 || msgsDecoded == nil { + t.Fatal("message is nil") + } + msgDecoded := msgsDecoded[0] + if !bytes.Equal(msgDecoded.payload, payload) { - t.Fail() + t.Fatal("bytes not equal") } chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} - if !bytes.Equal(msgDecoded.checksum[:], chksum) { - t.Fail() + if !bytes.Equal(chksum, msgDecoded.checksum[:]) { + t.Fatalf("checksums do not match, expected: % X but was: % X", + chksum, msgDecoded.checksum[:]) } - if msgDecoded.magic != 0 { - t.Fail() + if msgDecoded.magic != 1 { + t.Fatal("magic incorrect") } } +func TestLongCompressedMessageRoundTrip(t *testing.T) { + payloadBuf := bytes.NewBuffer([]byte{}) + // make the test bigger than buffer allocated in the Decode + for i := 0; i < 15; i++ { + payloadBuf.Write([]byte("testing123 ")) + } + + uncompressedMsgBytes := NewMessage(payloadBuf.Bytes()).Encode() + msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]) + + zipper, _ := gzip.NewReader(bytes.NewBuffer(msg.payload)) + uncompressed := make([]byte, 200) + n, _ := zipper.Read(uncompressed) + uncompressed = uncompressed[:n] + zipper.Close() + + if !bytes.Equal(uncompressed, uncompressedMsgBytes) { + t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", + uncompressed, uncompressedMsgBytes) + } + + // verify round trip + length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap) + + if length == 0 || msgsDecoded == nil { + t.Fatal("message is nil") + } + msgDecoded := msgsDecoded[0] + + if !bytes.Equal(msgDecoded.payload, payloadBuf.Bytes()) { + t.Fatal("bytes not equal") + } + if msgDecoded.magic != 1 { + t.Fatal("magic incorrect") + } +} + +func TestMultipleCompressedMessages(t *testing.T) { + msgs := []*Message{NewMessage([]byte("testing")), + NewMessage([]byte("multiple")), + NewMessage([]byte("messages")), + } + msg := NewCompressedMessages(msgs...) + + length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode()) + if length == 0 || msgsDecoded == nil { + t.Fatal("msgsDecoded is nil") + } + + // make sure the decompressed messages match what was put in + for index, decodedMsg := range msgsDecoded { + if !bytes.Equal(msgs[index].payload, decodedMsg.payload) { + t.Fatalf("Payload doesn't match, expected: % X but was: % X\n", + msgs[index].payload, decodedMsg.payload) + } + } +} + func TestRequestHeaderEncoding(t *testing.T) { broker := newBroker("localhost:9092", "test", 0) request := broker.EncodeRequestHeader(REQUEST_PRODUCE) @@ -88,24 +239,22 @@ } } - func TestPublishRequestEncoding(t *testing.T) { payload := []byte("testing") msg := NewMessage(payload) - messages := list.New() - messages.PushBack(msg) pubBroker := NewBrokerPublisher("localhost:9092", "test", 0) - request := pubBroker.broker.EncodePublishRequest(messages) + request := pubBroker.broker.EncodePublishRequest(msg) // generated by kafka-rb: - expected := []byte{0x00, 0x00, 0x00, 0x20, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x0c, - 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} + expected := []byte{0x00, 0x00, 0x00, 0x21, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0x0d, + /* magic comp ...... chksum .... .. payload .. */ + 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} if !bytes.Equal(expected, request) { t.Errorf("expected length: %d but got: %d", len(expected), len(request)) - t.Errorf("expected: %X\n but got: %X", expected, request) + t.Errorf("expected: % X\n but got: % X", expected, request) t.Fail() } } @@ -122,7 +271,7 @@ if !bytes.Equal(expected, request) { t.Errorf("expected length: %d but got: %d", len(expected), len(request)) - t.Errorf("expected: %X\n but got: %X", expected, request) + t.Errorf("expected: % X\n but got: % X", expected, request) t.Fail() } } Index: src/consumer.go =================================================================== --- src/consumer.go (revision 1186859) +++ src/consumer.go (working copy) @@ -34,6 +34,7 @@ broker *Broker offset uint64 maxSize uint32 + codecs map[byte]PayloadCodec } // Create a new broker consumer @@ -45,7 +46,8 @@ func NewBrokerConsumer(hostname string, topic string, partition int, offset uint64, maxSize uint32) *BrokerConsumer { return &BrokerConsumer{broker: newBroker(hostname, topic, partition), offset: offset, - maxSize: maxSize} + maxSize: maxSize, + codecs: DefaultCodecsMap} } // Simplified consumer that defaults the offset and maxSize to 0. @@ -55,9 +57,18 @@ func NewBrokerOffsetConsumer(hostname string, topic string, partition int) *BrokerConsumer { return &BrokerConsumer{broker: newBroker(hostname, topic, partition), offset: 0, - maxSize: 0} + maxSize: 0, + codecs: DefaultCodecsMap} } +// Add Custom Payload Codecs for Consumer Decoding +// payloadCodecs - an array of PayloadCodec implementations +func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec) { + // merge to the default map, so one 'could' override the default codecs.. + for k, v := range codecsMap(payloadCodecs) { + consumer.codecs[k] = v, true + } +} func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, os.Error) { conn, err := consumer.broker.connect() @@ -77,14 +88,15 @@ if err != nil { if err != os.EOF { log.Println("Fatal Error: ", err) + panic(err) } + quit <- true // force quit break } time.Sleep(pollTimeoutMs * 1000000) } done <- true }() - // wait to be told to stop.. <-quit conn.Close() @@ -111,7 +123,6 @@ return num, err } - func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, os.Error) { _, err := conn.Write(consumer.broker.EncodeConsumeRequest(consumer.offset, consumer.maxSize)) if err != nil { @@ -129,14 +140,19 @@ // parse out the messages var currentOffset uint64 = 0 for currentOffset <= uint64(length-4) { - msg := Decode(payload[currentOffset:]) - if msg == nil { + totalLength, msgs := Decode(payload[currentOffset:], consumer.codecs) + if msgs == nil { return num, os.NewError("Error Decoding Message") } - msg.offset = consumer.offset + currentOffset - currentOffset += uint64(4 + msg.totalLength) - handlerFunc(msg) - num += 1 + msgOffset := consumer.offset + currentOffset + for _, msg := range msgs { + // update all of the messages offset + // multiple messages can be at the same offset (compressed for example) + msg.offset = msgOffset + handlerFunc(&msg) + num += 1 + } + currentOffset += uint64(4 + totalLength) } // update the broker's offset for next consumption consumer.offset += currentOffset @@ -145,7 +161,6 @@ return num, err } - // Get a list of valid offsets (up to maxNumOffsets) before the given time, where // time is in milliseconds (-1, from the latest offset available, -2 from the smallest offset available) // The result is a list of offsets, in descending order. Index: src/kafka.go =================================================================== --- src/kafka.go (revision 1186859) +++ src/kafka.go (working copy) @@ -28,17 +28,14 @@ "os" "fmt" "encoding/binary" - "strconv" "io" "bufio" ) const ( - MAGIC_DEFAULT = 0 - NETWORK = "tcp" + NETWORK = "tcp" ) - type Broker struct { topic string partition int @@ -51,7 +48,6 @@ hostname: hostname} } - func (b *Broker) connect() (conn *net.TCPConn, error os.Error) { raddr, err := net.ResolveTCPAddr(NETWORK, b.hostname) if err != nil { @@ -91,7 +87,9 @@ errorCode := binary.BigEndian.Uint16(messages[0:2]) if errorCode != 0 { - return 0, []byte{}, os.NewError(strconv.Uitoa(uint(errorCode))) + log.Println("errorCode: ", errorCode) + return 0, []byte{}, os.NewError( + fmt.Sprintf("Broker Response Error: %d", errorCode)) } return expectedLength, messages[2:], nil } Index: src/publisher.go =================================================================== --- src/publisher.go (revision 1186859) +++ src/publisher.go (working copy) @@ -23,11 +23,9 @@ package kafka import ( - "container/list" "os" ) - type BrokerPublisher struct { broker *Broker } @@ -36,21 +34,19 @@ return &BrokerPublisher{broker: newBroker(hostname, topic, partition)} } - func (b *BrokerPublisher) Publish(message *Message) (int, os.Error) { - messages := list.New() - messages.PushBack(message) - return b.BatchPublish(messages) + return b.BatchPublish(message) } -func (b *BrokerPublisher) BatchPublish(messages *list.List) (int, os.Error) { +func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, os.Error) { conn, err := b.broker.connect() if err != nil { return -1, err } defer conn.Close() // TODO: MULTIPRODUCE - num, err := conn.Write(b.broker.EncodePublishRequest(messages)) + request := b.broker.EncodePublishRequest(messages...) + num, err := conn.Write(request) if err != nil { return -1, err } Index: src/converts.go =================================================================== --- src/converts.go (revision 1186859) +++ src/converts.go (working copy) @@ -22,12 +22,10 @@ package kafka - import ( "encoding/binary" ) - func uint16bytes(value int) []byte { result := make([]byte, 2) binary.BigEndian.PutUint16(result, uint16(value)) Index: src/message.go =================================================================== --- src/message.go (revision 1186859) +++ src/message.go (working copy) @@ -20,7 +20,6 @@ * of their respective owners. */ - package kafka import ( @@ -30,13 +29,21 @@ "log" ) +const ( + // Compression Support uses '1' - https://cwiki.apache.org/confluence/display/KAFKA/Compression + MAGIC_DEFAULT = 1 + // magic + compression + chksum + NO_LEN_HEADER_SIZE = 1 + 1 + 4 +) type Message struct { magic byte + compression byte checksum [4]byte payload []byte offset uint64 // only used after decoding - totalLength uint32 // total length of the message (decoding) + totalLength uint32 // total length of the raw message (from decoding) + } func (m *Message) Offset() uint64 { @@ -51,57 +58,125 @@ return string(m.payload) } -func NewMessage(payload []byte) *Message { +func NewMessageWithCodec(payload []byte, codec PayloadCodec) *Message { message := &Message{} message.magic = byte(MAGIC_DEFAULT) - binary.BigEndian.PutUint32(message.checksum[0:], crc32.ChecksumIEEE(payload)) - message.payload = payload + message.compression = codec.Id() + message.payload = codec.Encode(payload) + binary.BigEndian.PutUint32(message.checksum[0:], crc32.ChecksumIEEE(message.payload)) return message } -// MESSAGE SET: +// Default is is create a message with no compression +func NewMessage(payload []byte) *Message { + return NewMessageWithCodec(payload, DefaultCodecsMap[NO_COMPRESSION_ID]) +} + +// Create a Message using the default compression method (gzip) +func NewCompressedMessage(payload []byte) *Message { + return NewCompressedMessages(NewMessage(payload)) +} + +func NewCompressedMessages(messages ...*Message) *Message { + buf := bytes.NewBuffer([]byte{}) + for _, message := range messages { + buf.Write(message.Encode()) + } + return NewMessageWithCodec(buf.Bytes(), DefaultCodecsMap[GZIP_COMPRESSION_ID]) +} + +// MESSAGE SET: func (m *Message) Encode() []byte { - msgLen := 1 + 4 + len(m.payload) + msgLen := NO_LEN_HEADER_SIZE + len(m.payload) msg := make([]byte, 4+msgLen) binary.BigEndian.PutUint32(msg[0:], uint32(msgLen)) msg[4] = m.magic - copy(msg[5:], m.checksum[0:]) - copy(msg[9:], m.payload) + msg[5] = m.compression + + copy(msg[6:], m.checksum[0:]) + copy(msg[10:], m.payload) + return msg } -func Decode(packet []byte) *Message { +func DecodeWithDefaultCodecs(packet []byte) (uint32, []Message) { + return Decode(packet, DefaultCodecsMap) +} + +func Decode(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, []Message) { + messages := []Message{} + + length, message := decodeMessage(packet, payloadCodecsMap) + + if length > 0 && message != nil { + if message.compression != NO_COMPRESSION_ID { + // wonky special case for compressed messages having embedded messages + payloadLen := uint32(len(message.payload)) + messageLenLeft := payloadLen + for messageLenLeft > 0 { + start := payloadLen - messageLenLeft + innerLen, innerMsg := decodeMessage(message.payload[start:], payloadCodecsMap) + messageLenLeft = messageLenLeft - innerLen - 4 // message length uint32 + messages = append(messages, *innerMsg) + } + } else { + messages = append(messages, *message) + } + } + + return length, messages +} + +func decodeMessage(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, *Message) { length := binary.BigEndian.Uint32(packet[0:]) if length > uint32(len(packet[4:])) { log.Printf("length mismatch, expected at least: %X, was: %X\n", length, len(packet[4:])) - return nil + return 0, nil } msg := Message{} msg.totalLength = length msg.magic = packet[4] - copy(msg.checksum[:], packet[5:9]) - payloadLength := length - 1 - 4 - msg.payload = packet[9 : 9+payloadLength] + rawPayload := []byte{} + if msg.magic == 0 { + msg.compression = byte(0) + copy(msg.checksum[:], packet[5:9]) + payloadLength := length - 1 - 4 + rawPayload = packet[9 : 9+payloadLength] + } else if msg.magic == MAGIC_DEFAULT { + msg.compression = packet[5] + copy(msg.checksum[:], packet[6:10]) + payloadLength := length - NO_LEN_HEADER_SIZE + rawPayload = packet[10 : 10+payloadLength] + } else { + log.Printf("incorrect magic, expected: %X was: %X\n", MAGIC_DEFAULT, msg.magic) + return 0, nil + } + payloadChecksum := make([]byte, 4) - binary.BigEndian.PutUint32(payloadChecksum, crc32.ChecksumIEEE(msg.payload)) + binary.BigEndian.PutUint32(payloadChecksum, crc32.ChecksumIEEE(rawPayload)) if !bytes.Equal(payloadChecksum, msg.checksum[:]) { - log.Printf("checksum mismatch, expected: %X was: %X\n", payloadChecksum, msg.checksum[:]) - return nil + msg.Print() + log.Printf("checksum mismatch, expected: % X was: % X\n", payloadChecksum, msg.checksum[:]) + return 0, nil } - return &msg + msg.payload = payloadCodecsMap[msg.compression].Decode(rawPayload) + + return length, &msg } func (msg *Message) Print() { log.Println("----- Begin Message ------") log.Printf("magic: %X\n", msg.magic) + log.Printf("compression: %X\n", msg.compression) log.Printf("checksum: %X\n", msg.checksum) if len(msg.payload) < 1048576 { // 1 MB - log.Printf("payload: %X\n", msg.payload) + log.Printf("payload: % X\n", msg.payload) log.Printf("payload(string): %s\n", msg.PayloadString()) } else { log.Printf("long payload, length: %d\n", len(msg.payload)) } + log.Printf("length: %d\n", msg.totalLength) log.Printf("offset: %d\n", msg.offset) log.Println("----- End Message ------") } Index: src/payload_codec.go =================================================================== --- src/payload_codec.go (revision 0) +++ src/payload_codec.go (revision 0) @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2011 NeuStar, Inc. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * NeuStar, the Neustar logo and related names and logos are registered + * trademarks, service marks or tradenames of NeuStar, Inc. All other + * product names, company names, marks, logos and symbols may be trademarks + * of their respective owners. + */ + +package kafka + +import ( + "bytes" + "compress/gzip" + // "log" +) + +const ( + NO_COMPRESSION_ID = 0 + GZIP_COMPRESSION_ID = 1 +) + +type PayloadCodec interface { + + // the 1 byte id of the codec + Id() byte + + // encoder interface for compression implementation + Encode(data []byte) []byte + + // decoder interface for decompression implementation + Decode(data []byte) []byte +} + +// Default Codecs + +var DefaultCodecs = []PayloadCodec{ + new(NoCompressionPayloadCodec), + new(GzipPayloadCodec), +} + +var DefaultCodecsMap = codecsMap(DefaultCodecs) + +func codecsMap(payloadCodecs []PayloadCodec) map[byte]PayloadCodec { + payloadCodecsMap := make(map[byte]PayloadCodec, len(payloadCodecs)) + for _, c := range payloadCodecs { + payloadCodecsMap[c.Id()] = c, true + } + return payloadCodecsMap +} + +// No compression codec, noop + +type NoCompressionPayloadCodec struct { + +} + +func (codec *NoCompressionPayloadCodec) Id() byte { + return NO_COMPRESSION_ID +} + +func (codec *NoCompressionPayloadCodec) Encode(data []byte) []byte { + return data +} + +func (codec *NoCompressionPayloadCodec) Decode(data []byte) []byte { + return data +} + +// Gzip Codec + +type GzipPayloadCodec struct { + +} + +func (codec *GzipPayloadCodec) Id() byte { + return GZIP_COMPRESSION_ID +} + +func (codec *GzipPayloadCodec) Encode(data []byte) []byte { + buf := bytes.NewBuffer([]byte{}) + zipper, _ := gzip.NewWriterLevel(buf, gzip.BestSpeed) + zipper.Write(data) + zipper.Close() + return buf.Bytes() +} + +func (codec *GzipPayloadCodec) Decode(data []byte) []byte { + buf := bytes.NewBuffer([]byte{}) + zipper, _ := gzip.NewReader(bytes.NewBuffer(data)) + unzipped := make([]byte, 100) + for { + n, err := zipper.Read(unzipped) + if n > 0 && err == nil { + buf.Write(unzipped[0:n]) + } else { + break + } + } + + zipper.Close() + return buf.Bytes() +} Index: src/timing.go =================================================================== --- src/timing.go (revision 1186859) +++ src/timing.go (working copy) @@ -20,7 +20,6 @@ * of their respective owners. */ - package kafka import ( Index: src/request.go =================================================================== --- src/request.go (revision 1186859) +++ src/request.go (working copy) @@ -25,22 +25,19 @@ import ( "encoding/binary" "bytes" - "container/list" ) - type RequestType uint16 // Request Types const ( REQUEST_PRODUCE RequestType = 0 - REQUEST_FETCH = 1 - REQUEST_MULTIFETCH = 2 - REQUEST_MULTIPRODUCE = 3 - REQUEST_OFFSETS = 4 + REQUEST_FETCH = 1 + REQUEST_MULTIFETCH = 2 + REQUEST_MULTIPRODUCE = 3 + REQUEST_OFFSETS = 4 ) - // Request Header: func (b *Broker) EncodeRequestHeader(requestType RequestType) *bytes.Buffer { request := bytes.NewBuffer([]byte{}) @@ -70,7 +67,6 @@ return request.Bytes() } - // func (b *Broker) EncodeConsumeRequest(offset uint64, maxSize uint32) []byte { request := b.EncodeRequestHeader(REQUEST_FETCH) @@ -83,9 +79,8 @@ return request.Bytes() } - // -func (b *Broker) EncodePublishRequest(messages *list.List) []byte { +func (b *Broker) EncodePublishRequest(messages ...*Message) []byte { // 4 + 2 + 2 + topicLength + 4 + 4 request := b.EncodeRequestHeader(REQUEST_PRODUCE) @@ -93,8 +88,7 @@ request.Write(uint32bytes(0)) // placeholder message len written := 0 - for element := messages.Front(); element != nil; element = element.Next() { - message := element.Value.(*Message) + for _, message := range messages { wrote, _ := request.Write(message.Encode()) written += wrote } @@ -103,6 +97,5 @@ binary.BigEndian.PutUint32(request.Bytes()[messageSetSizePos:], uint32(written)) // now add the size of the whole to the first uint32 encodeRequestSize(request) - return request.Bytes() } Index: README.md =================================================================== --- README.md (revision 1186859) +++ README.md (working copy) @@ -48,6 +48,17 @@ + +### Publishing Compressed Messages ### + +

+
+broker := kafka.NewBrokerPublisher("localhost:9092", "mytesttopic", 0)
+broker.Publish(kafka.NewCompressedMessage([]byte("tesing 1 2 3")))
+
+
+ + ### Consumer ###

Index: Makefile
===================================================================
--- Makefile	(revision 1186859)
+++ Makefile	(working copy)
@@ -6,6 +6,7 @@
 	src/message.go\
 	src/converts.go\
 	src/consumer.go\
+	src/payload_codec.go\
 	src/publisher.go\
 	src/timing.go\
 	src/request.go\