Index: clients/go/tools/offsets/offsets.go =================================================================== --- clients/go/tools/offsets/offsets.go (revision 1298531) +++ clients/go/tools/offsets/offsets.go (working copy) @@ -20,13 +20,12 @@ * of their respective owners. */ - package main import ( - "kafka" - "flag" - "fmt" + "flag" + "fmt" + kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src" ) var hostname string @@ -36,27 +35,26 @@ var time int64 func init() { - flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server") - flag.StringVar(&topic, "topic", "test", "topic to read offsets from") - flag.IntVar(&partition, "partition", 0, "partition to read offsets from") - flag.UintVar(&offsets, "offsets", 1, "number of offsets returned") - flag.Int64Var(&time, "time", -1, "timestamp of the offsets before that: time(ms)/-1(latest)/-2(earliest)") + flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server") + flag.StringVar(&topic, "topic", "test", "topic to read offsets from") + flag.IntVar(&partition, "partition", 0, "partition to read offsets from") + flag.UintVar(&offsets, "offsets", 1, "number of offsets returned") + flag.Int64Var(&time, "time", -1, "timestamp of the offsets before that: time(ms)/-1(latest)/-2(earliest)") } - func main() { - flag.Parse() - fmt.Println("Offsets :") - fmt.Printf("From: %s, topic: %s, partition: %d\n", hostname, topic, partition) - fmt.Println(" ---------------------- ") - broker := kafka.NewBrokerOffsetConsumer(hostname, topic, partition) + flag.Parse() + fmt.Println("Offsets :") + fmt.Printf("From: %s, topic: %s, partition: %d\n", hostname, topic, partition) + fmt.Println(" ---------------------- ") + broker := kafka.NewBrokerOffsetConsumer(hostname, topic, partition) - offsets, err := broker.GetOffsets(time, uint32(offsets)) - if err != nil { - fmt.Println("Error: ", err) - } - fmt.Printf("Offsets found: %d\n", len(offsets)) - for i 
:= 0 ; i < len(offsets); i++ { - fmt.Printf("Offset[%d] = %d\n", i, offsets[i]) - } + offsets, err := broker.GetOffsets(time, uint32(offsets)) + if err != nil { + fmt.Println("Error: ", err) + } + fmt.Printf("Offsets found: %d\n", len(offsets)) + for i := 0; i < len(offsets); i++ { + fmt.Printf("Offset[%d] = %d\n", i, offsets[i]) + } } Index: clients/go/tools/consumer/consumer.go =================================================================== --- clients/go/tools/consumer/consumer.go (revision 1298531) +++ clients/go/tools/consumer/consumer.go (working copy) @@ -23,13 +23,13 @@ package main import ( - "kafka" - "flag" - "fmt" - "os" - "strconv" - "os/signal" - "syscall" + "flag" + "fmt" + "os" + "os/signal" + "strconv" + kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src" + "syscall" ) var hostname string @@ -42,70 +42,77 @@ var printmessage bool func init() { - flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server") - flag.StringVar(&topic, "topic", "test", "topic to publish to") - flag.IntVar(&partition, "partition", 0, "partition to publish to") - flag.Uint64Var(&offset, "offset", 0, "offset to start consuming from") - flag.UintVar(&maxSize, "maxsize", 1048576, "offset to start consuming from") - flag.StringVar(&writePayloadsTo, "writeto", "", "write payloads to this file") - flag.BoolVar(&consumerForever, "consumeforever", false, "loop forever consuming") - flag.BoolVar(&printmessage, "printmessage", true, "print the message details to stdout") + flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server") + flag.StringVar(&topic, "topic", "test", "topic to publish to") + flag.IntVar(&partition, "partition", 0, "partition to publish to") + flag.Uint64Var(&offset, "offset", 0, "offset to start consuming from") + flag.UintVar(&maxSize, "maxsize", 1048576, "offset to start consuming from") + flag.StringVar(&writePayloadsTo, "writeto", "", "write payloads to this 
file") + flag.BoolVar(&consumerForever, "consumeforever", false, "loop forever consuming") + flag.BoolVar(&printmessage, "printmessage", true, "print the message details to stdout") } func main() { - flag.Parse() - fmt.Println("Consuming Messages :") - fmt.Printf("From: %s, topic: %s, partition: %d\n", hostname, topic, partition) - fmt.Println(" ---------------------- ") - broker := kafka.NewBrokerConsumer(hostname, topic, partition, offset, uint32(maxSize)) + flag.Parse() + fmt.Println("Consuming Messages :") + fmt.Printf("From: %s, topic: %s, partition: %d\n", hostname, topic, partition) + fmt.Println(" ---------------------- ") + broker := kafka.NewBrokerConsumer(hostname, topic, partition, offset, uint32(maxSize)) - var payloadFile *os.File = nil - if len(writePayloadsTo) > 0 { - var err os.Error - payloadFile, err = os.Create(writePayloadsTo) - if err != nil { - fmt.Println("Error opening file: ", err) - payloadFile = nil - } - } + var payloadFile *os.File = nil + if len(writePayloadsTo) > 0 { + var err error + payloadFile, err = os.Create(writePayloadsTo) + if err != nil { + fmt.Println("Error opening file: ", err) + payloadFile = nil + } + } - consumerCallback := func(msg *kafka.Message) { - if printmessage { - msg.Print() - } - if payloadFile != nil { - payloadFile.Write([]byte("Message at: " + strconv.Uitoa64(msg.Offset()) + "\n")) - payloadFile.Write(msg.Payload()) - payloadFile.Write([]byte("\n-------------------------------\n")) - } - } + consumerCallback := func(msg *kafka.Message) { + if printmessage { + msg.Print() + } + if payloadFile != nil { + payloadFile.Write([]byte("Message at: " + strconv.FormatUint(msg.Offset(), 10) + "\n")) + payloadFile.Write(msg.Payload()) + payloadFile.Write([]byte("\n-------------------------------\n")) + } + } - if consumerForever { - quit := make(chan bool, 1) - go func() { - for { - sig := <-signal.Incoming - if sig.(os.UnixSignal) == syscall.SIGINT { - quit <- true - } - } - }() + if consumerForever { + quit := 
make(chan bool, 1) + go func() { + sigIn := make(chan os.Signal, 1) + signal.Notify(sigIn) + for { - msgChan := make(chan *kafka.Message) - go broker.ConsumeOnChannel(msgChan, 10, quit) - for msg := range msgChan { - if msg != nil { - consumerCallback(msg) - } else { - break - } - } - } else { - broker.Consume(consumerCallback) - } + select { + case sig := <-sigIn: + if sig.(os.Signal) == syscall.SIGINT { + quit <- true + } else { + fmt.Println(sig) + } + } + } + }() - if payloadFile != nil { - payloadFile.Close() - } + msgChan := make(chan *kafka.Message) + go broker.ConsumeOnChannel(msgChan, 10, quit) + for msg := range msgChan { + if msg != nil { + consumerCallback(msg) + } else { + break + } + } + } else { + broker.Consume(consumerCallback) + } + if payloadFile != nil { + payloadFile.Close() + } + } Index: clients/go/tools/consumer/Makefile =================================================================== --- clients/go/tools/consumer/Makefile (revision 1298531) +++ clients/go/tools/consumer/Makefile (working copy) @@ -1,7 +0,0 @@ -include $(GOROOT)/src/Make.inc - -TARG=consumer -GOFILES=\ - consumer.go\ - -include $(GOROOT)/src/Make.cmd Index: clients/go/tools/publisher/publisher.go =================================================================== --- clients/go/tools/publisher/publisher.go (revision 1298531) +++ clients/go/tools/publisher/publisher.go (working copy) @@ -23,10 +23,10 @@ package main import ( - "kafka" - "flag" - "fmt" - "os" + "flag" + "fmt" + kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src" + "os" ) var hostname string @@ -37,53 +37,53 @@ var compress bool func init() { - flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server") - flag.StringVar(&topic, "topic", "test", "topic to publish to") - flag.IntVar(&partition, "partition", 0, "partition to publish to") - flag.StringVar(&message, "message", "", "message to publish") - flag.StringVar(&messageFile, "messagefile", "", "read 
message from this file") - flag.BoolVar(&compress, "compress", false, "compress the messages published") + flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server") + flag.StringVar(&topic, "topic", "test", "topic to publish to") + flag.IntVar(&partition, "partition", 0, "partition to publish to") + flag.StringVar(&message, "message", "", "message to publish") + flag.StringVar(&messageFile, "messagefile", "", "read message from this file") + flag.BoolVar(&compress, "compress", false, "compress the messages published") } func main() { - flag.Parse() - fmt.Println("Publishing :", message) - fmt.Printf("To: %s, topic: %s, partition: %d\n", hostname, topic, partition) - fmt.Println(" ---------------------- ") - broker := kafka.NewBrokerPublisher(hostname, topic, partition) + flag.Parse() + fmt.Println("Publishing :", message) + fmt.Printf("To: %s, topic: %s, partition: %d\n", hostname, topic, partition) + fmt.Println(" ---------------------- ") + broker := kafka.NewBrokerPublisher(hostname, topic, partition) - if len(message) == 0 && len(messageFile) != 0 { - file, err := os.Open(messageFile) - if err != nil { - fmt.Println("Error: ", err) - return - } - stat, err := file.Stat() - if err != nil { - fmt.Println("Error: ", err) - return - } - payload := make([]byte, stat.Size) - file.Read(payload) - timing := kafka.StartTiming("Sending") + if len(message) == 0 && len(messageFile) != 0 { + file, err := os.Open(messageFile) + if err != nil { + fmt.Println("Error: ", err) + return + } + stat, err := file.Stat() + if err != nil { + fmt.Println("Error: ", err) + return + } + payload := make([]byte, stat.Size()) + file.Read(payload) + timing := kafka.StartTiming("Sending") - if compress { - broker.Publish(kafka.NewCompressedMessage(payload)) - } else { - broker.Publish(kafka.NewMessage(payload)) - } + if compress { + broker.Publish(kafka.NewCompressedMessage(payload)) + } else { + broker.Publish(kafka.NewMessage(payload)) + } - 
timing.Print() - file.Close() - } else { - timing := kafka.StartTiming("Sending") + timing.Print() + file.Close() + } else { + timing := kafka.StartTiming("Sending") - if compress { - broker.Publish(kafka.NewCompressedMessage([]byte(message))) - } else { - broker.Publish(kafka.NewMessage([]byte(message))) - } + if compress { + broker.Publish(kafka.NewCompressedMessage([]byte(message))) + } else { + broker.Publish(kafka.NewMessage([]byte(message))) + } - timing.Print() - } + timing.Print() + } } Index: clients/go/kafka_test.go =================================================================== --- clients/go/kafka_test.go (revision 1298531) +++ clients/go/kafka_test.go (working copy) @@ -1,277 +0,0 @@ -/* - * Copyright (c) 2011 NeuStar, Inc. - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * NeuStar, the Neustar logo and related names and logos are registered - * trademarks, service marks or tradenames of NeuStar, Inc. All other - * product names, company names, marks, logos and symbols may be trademarks - * of their respective owners. 
- */ - -package kafka - -import ( - "testing" - //"fmt" - "bytes" - "compress/gzip" -) - -func TestMessageCreation(t *testing.T) { - payload := []byte("testing") - msg := NewMessage(payload) - if msg.magic != 1 { - t.Errorf("magic incorrect") - t.Fail() - } - - // generated by kafka-rb: e8 f3 5a 06 - expected := []byte{0xe8, 0xf3, 0x5a, 0x06} - if !bytes.Equal(expected, msg.checksum[:]) { - t.Fail() - } -} - -func TestMagic0MessageEncoding(t *testing.T) { - // generated by kafka-rb: - // test the old message format - expected := []byte{0x00, 0x00, 0x00, 0x0c, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} - length, msgsDecoded := Decode(expected, DefaultCodecsMap) - - if length == 0 || msgsDecoded == nil { - t.Fail() - } - msgDecoded := msgsDecoded[0] - - payload := []byte("testing") - if !bytes.Equal(payload, msgDecoded.payload) { - t.Fatal("bytes not equal") - } - chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} - if !bytes.Equal(chksum, msgDecoded.checksum[:]) { - t.Fatal("checksums do not match") - } - if msgDecoded.magic != 0 { - t.Fatal("magic incorrect") - } -} - -func TestMessageEncoding(t *testing.T) { - - payload := []byte("testing") - msg := NewMessage(payload) - - // generated by kafka-rb: - expected := []byte{0x00, 0x00, 0x00, 0x0d, 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} - if !bytes.Equal(expected, msg.Encode()) { - t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode()) - } - - // verify round trip - length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode()) - - if length == 0 || msgsDecoded == nil { - t.Fatal("message is nil") - } - msgDecoded := msgsDecoded[0] - - if !bytes.Equal(msgDecoded.payload, payload) { - t.Fatal("bytes not equal") - } - chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} - if !bytes.Equal(chksum, msgDecoded.checksum[:]) { - t.Fatal("checksums do not match") - } - if msgDecoded.magic != 1 { - t.Fatal("magic incorrect") - } -} - -func 
TestCompressedMessageEncodingCompare(t *testing.T) { - payload := []byte("testing") - uncompressedMsgBytes := NewMessage(payload).Encode() - - msgGzipBytes := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]).Encode() - msgDefaultBytes := NewCompressedMessage(payload).Encode() - if !bytes.Equal(msgDefaultBytes, msgGzipBytes) { - t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", msgDefaultBytes, msgGzipBytes) - } -} - -func TestCompressedMessageEncoding(t *testing.T) { - payload := []byte("testing") - uncompressedMsgBytes := NewMessage(payload).Encode() - - msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]) - - expectedPayload := []byte{0x1F, 0x8B, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, - 0xFF, 0x62, 0x60, 0x60, 0xE0, 0x65, 0x64, 0x78, 0xF1, 0x39, 0x8A, - 0xAD, 0x24, 0xB5, 0xB8, 0x24, 0x33, 0x2F, 0x1D, 0x10, 0x00, 0x00, - 0xFF, 0xFF, 0x0C, 0x6A, 0x82, 0x91, 0x11, 0x00, 0x00, 0x00} - - expectedHeader := []byte{0x00, 0x00, 0x00, 0x2F, 0x01, 0x01, 0x07, 0xFD, 0xC3, 0x76} - - expected := make([]byte, len(expectedHeader)+len(expectedPayload)) - n := copy(expected, expectedHeader) - copy(expected[n:], expectedPayload) - - if msg.compression != 1 { - t.Fatalf("expected compression: 1 but got: %b", msg.compression) - } - - zipper, _ := gzip.NewReader(bytes.NewBuffer(msg.payload)) - uncompressed := make([]byte, 100) - n, _ = zipper.Read(uncompressed) - uncompressed = uncompressed[:n] - zipper.Close() - - if !bytes.Equal(uncompressed, uncompressedMsgBytes) { - t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", uncompressed, uncompressedMsgBytes) - } - - if !bytes.Equal(expected, msg.Encode()) { - t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode()) - } - - // verify round trip - length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap) - - if length == 0 || msgsDecoded == nil { - t.Fatal("message is nil") - } - msgDecoded := msgsDecoded[0] - - if 
!bytes.Equal(msgDecoded.payload, payload) { - t.Fatal("bytes not equal") - } - chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} - if !bytes.Equal(chksum, msgDecoded.checksum[:]) { - t.Fatalf("checksums do not match, expected: % X but was: % X", - chksum, msgDecoded.checksum[:]) - } - if msgDecoded.magic != 1 { - t.Fatal("magic incorrect") - } -} - -func TestLongCompressedMessageRoundTrip(t *testing.T) { - payloadBuf := bytes.NewBuffer([]byte{}) - // make the test bigger than buffer allocated in the Decode - for i := 0; i < 15; i++ { - payloadBuf.Write([]byte("testing123 ")) - } - - uncompressedMsgBytes := NewMessage(payloadBuf.Bytes()).Encode() - msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]) - - zipper, _ := gzip.NewReader(bytes.NewBuffer(msg.payload)) - uncompressed := make([]byte, 200) - n, _ := zipper.Read(uncompressed) - uncompressed = uncompressed[:n] - zipper.Close() - - if !bytes.Equal(uncompressed, uncompressedMsgBytes) { - t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", - uncompressed, uncompressedMsgBytes) - } - - // verify round trip - length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap) - - if length == 0 || msgsDecoded == nil { - t.Fatal("message is nil") - } - msgDecoded := msgsDecoded[0] - - if !bytes.Equal(msgDecoded.payload, payloadBuf.Bytes()) { - t.Fatal("bytes not equal") - } - if msgDecoded.magic != 1 { - t.Fatal("magic incorrect") - } -} - -func TestMultipleCompressedMessages(t *testing.T) { - msgs := []*Message{NewMessage([]byte("testing")), - NewMessage([]byte("multiple")), - NewMessage([]byte("messages")), - } - msg := NewCompressedMessages(msgs...) 
- - length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode()) - if length == 0 || msgsDecoded == nil { - t.Fatal("msgsDecoded is nil") - } - - // make sure the decompressed messages match what was put in - for index, decodedMsg := range msgsDecoded { - if !bytes.Equal(msgs[index].payload, decodedMsg.payload) { - t.Fatalf("Payload doesn't match, expected: % X but was: % X\n", - msgs[index].payload, decodedMsg.payload) - } - } -} - -func TestRequestHeaderEncoding(t *testing.T) { - broker := newBroker("localhost:9092", "test", 0) - request := broker.EncodeRequestHeader(REQUEST_PRODUCE) - - // generated by kafka-rb: - expected := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, - 0x00, 0x00, 0x00, 0x00} - - if !bytes.Equal(expected, request.Bytes()) { - t.Errorf("expected length: %d but got: %d", len(expected), len(request.Bytes())) - t.Errorf("expected: %X\n but got: %X", expected, request) - t.Fail() - } -} - -func TestPublishRequestEncoding(t *testing.T) { - payload := []byte("testing") - msg := NewMessage(payload) - - pubBroker := NewBrokerPublisher("localhost:9092", "test", 0) - request := pubBroker.broker.EncodePublishRequest(msg) - - // generated by kafka-rb: - expected := []byte{0x00, 0x00, 0x00, 0x21, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0x0d, - /* magic comp ...... chksum .... .. payload .. 
*/ - 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} - - if !bytes.Equal(expected, request) { - t.Errorf("expected length: %d but got: %d", len(expected), len(request)) - t.Errorf("expected: % X\n but got: % X", expected, request) - t.Fail() - } -} - -func TestConsumeRequestEncoding(t *testing.T) { - - pubBroker := NewBrokerPublisher("localhost:9092", "test", 0) - request := pubBroker.broker.EncodeConsumeRequest(0, 1048576) - - // generated by kafka-rb, encode_request_size + encode_request - expected := []byte{0x00, 0x00, 0x00, 0x18, 0x00, 0x01, 0x00, 0x04, 0x74, - 0x65, 0x73, 0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00} - - if !bytes.Equal(expected, request) { - t.Errorf("expected length: %d but got: %d", len(expected), len(request)) - t.Errorf("expected: % X\n but got: % X", expected, request) - t.Fail() - } -} Index: clients/go/src/consumer.go =================================================================== --- clients/go/src/consumer.go (revision 1298586) +++ clients/go/src/consumer.go (working copy) @@ -23,18 +23,19 @@ package kafka import ( - "log" - "os" - "net" - "time" - "encoding/binary" + "encoding/binary" + "errors" + "io" + "log" + "net" + "time" ) type BrokerConsumer struct { - broker *Broker - offset uint64 - maxSize uint32 - codecs map[byte]PayloadCodec + broker *Broker + offset uint64 + maxSize uint32 + codecs map[byte]PayloadCodec } // Create a new broker consumer @@ -44,10 +45,10 @@ // offset to start consuming from // maxSize (in bytes) of the message to consume (this should be at least as big as the biggest message to be published) func NewBrokerConsumer(hostname string, topic string, partition int, offset uint64, maxSize uint32) *BrokerConsumer { - return &BrokerConsumer{broker: newBroker(hostname, topic, partition), - offset: offset, - maxSize: maxSize, - codecs: DefaultCodecsMap} + return &BrokerConsumer{broker: newBroker(hostname, topic, partition), 
+ offset: offset, + maxSize: maxSize, + codecs: DefaultCodecsMap} } // Simplified consumer that defaults the offset and maxSize to 0. @@ -55,145 +56,145 @@ // topic to consume // partition to consume from func NewBrokerOffsetConsumer(hostname string, topic string, partition int) *BrokerConsumer { - return &BrokerConsumer{broker: newBroker(hostname, topic, partition), - offset: 0, - maxSize: 0, - codecs: DefaultCodecsMap} + return &BrokerConsumer{broker: newBroker(hostname, topic, partition), + offset: 0, + maxSize: 0, + codecs: DefaultCodecsMap} } // Add Custom Payload Codecs for Consumer Decoding // payloadCodecs - an array of PayloadCodec implementations func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec) { - // merge to the default map, so one 'could' override the default codecs.. - for k, v := range codecsMap(payloadCodecs) { - consumer.codecs[k] = v, true - } + // merge to the default map, so one 'could' override the default codecs.. + for k, v := range codecsMap(payloadCodecs) { + consumer.codecs[k] = v + } } -func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, os.Error) { - conn, err := consumer.broker.connect() - if err != nil { - return -1, err - } +func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, error) { + conn, err := consumer.broker.connect() + if err != nil { + return -1, err + } - num := 0 - done := make(chan bool, 1) - go func() { - for { - _, err := consumer.consumeWithConn(conn, func(msg *Message) { - msgChan <- msg - num += 1 - }) + num := 0 + done := make(chan bool, 1) + go func() { + for { + _, err := consumer.consumeWithConn(conn, func(msg *Message) { + msgChan <- msg + num += 1 + }) - if err != nil { - if err != os.EOF { - log.Println("Fatal Error: ", err) - panic(err) - } - quit <- true // force quit - break - } - time.Sleep(pollTimeoutMs * 1000000) - } - done <- true - }() - // wait to be told 
to stop.. - <-quit - conn.Close() - close(msgChan) - <-done - return num, err + if err != nil { + if err != io.EOF { + log.Println("Fatal Error: ", err) + panic(err) + } + quit <- true // force quit + break + } + time.Sleep(time.Duration(pollTimeoutMs) * time.Millisecond) + } + done <- true + }() + // wait to be told to stop.. + <-quit + conn.Close() + close(msgChan) + <-done + return num, err } type MessageHandlerFunc func(msg *Message) -func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, os.Error) { - conn, err := consumer.broker.connect() - if err != nil { - return -1, err - } - defer conn.Close() +func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, error) { + conn, err := consumer.broker.connect() + if err != nil { + return -1, err + } + defer conn.Close() - num, err := consumer.consumeWithConn(conn, handlerFunc) + num, err := consumer.consumeWithConn(conn, handlerFunc) - if err != nil { - log.Println("Fatal Error: ", err) - } + if err != nil { + log.Println("Fatal Error: ", err) + } - return num, err + return num, err } -func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, os.Error) { - _, err := conn.Write(consumer.broker.EncodeConsumeRequest(consumer.offset, consumer.maxSize)) - if err != nil { - return -1, err - } +func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, error) { + _, err := conn.Write(consumer.broker.EncodeConsumeRequest(consumer.offset, consumer.maxSize)) + if err != nil { + return -1, err + } - length, payload, err := consumer.broker.readResponse(conn) + length, payload, err := consumer.broker.readResponse(conn) - if err != nil { - return -1, err - } + if err != nil { + return -1, err + } - num := 0 - if length > 2 { - // parse out the messages - var currentOffset uint64 = 0 - for currentOffset <= uint64(length-4) { - totalLength, msgs := Decode(payload[currentOffset:], 
consumer.codecs) - if msgs == nil { - return num, os.NewError("Error Decoding Message") - } - msgOffset := consumer.offset + currentOffset - for _, msg := range msgs { - // update all of the messages offset - // multiple messages can be at the same offset (compressed for example) - msg.offset = msgOffset - handlerFunc(&msg) - num += 1 - } - currentOffset += uint64(4 + totalLength) - } - // update the broker's offset for next consumption - consumer.offset += currentOffset - } + num := 0 + if length > 2 { + // parse out the messages + var currentOffset uint64 = 0 + for currentOffset <= uint64(length-4) { + totalLength, msgs := Decode(payload[currentOffset:], consumer.codecs) + if msgs == nil { + return num, errors.New("Error Decoding Message") + } + msgOffset := consumer.offset + currentOffset + for _, msg := range msgs { + // update all of the messages offset + // multiple messages can be at the same offset (compressed for example) + msg.offset = msgOffset + handlerFunc(&msg) + num += 1 + } + currentOffset += uint64(4 + totalLength) + } + // update the broker's offset for next consumption + consumer.offset += currentOffset + } - return num, err + return num, err } // Get a list of valid offsets (up to maxNumOffsets) before the given time, where // time is in milliseconds (-1, from the latest offset available, -2 from the smallest offset available) // The result is a list of offsets, in descending order. 
-func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, os.Error) { - offsets := make([]uint64, 0) +func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, error) { + offsets := make([]uint64, 0) - conn, err := consumer.broker.connect() - if err != nil { - return offsets, err - } + conn, err := consumer.broker.connect() + if err != nil { + return offsets, err + } - defer conn.Close() + defer conn.Close() - _, err = conn.Write(consumer.broker.EncodeOffsetRequest(time, maxNumOffsets)) - if err != nil { - return offsets, err - } + _, err = conn.Write(consumer.broker.EncodeOffsetRequest(time, maxNumOffsets)) + if err != nil { + return offsets, err + } - length, payload, err := consumer.broker.readResponse(conn) - if err != nil { - return offsets, err - } + length, payload, err := consumer.broker.readResponse(conn) + if err != nil { + return offsets, err + } - if length > 4 { - // get the number of offsets - numOffsets := binary.BigEndian.Uint32(payload[0:]) - var currentOffset uint64 = 4 - for currentOffset < uint64(length-4) && uint32(len(offsets)) < numOffsets { - offset := binary.BigEndian.Uint64(payload[currentOffset:]) - offsets = append(offsets, offset) - currentOffset += 8 // offset size - } - } + if length > 4 { + // get the number of offsets + numOffsets := binary.BigEndian.Uint32(payload[0:]) + var currentOffset uint64 = 4 + for currentOffset < uint64(length-4) && uint32(len(offsets)) < numOffsets { + offset := binary.BigEndian.Uint64(payload[currentOffset:]) + offsets = append(offsets, offset) + currentOffset += 8 // offset size + } + } - return offsets, err + return offsets, err } Index: clients/go/src/kafka_test.go =================================================================== --- clients/go/src/kafka_test.go (revision 1298531) +++ clients/go/src/kafka_test.go (working copy) @@ -23,255 +23,255 @@ package kafka import ( - "testing" - //"fmt" - "bytes" - "compress/gzip" + "testing" + 
//"fmt" + "bytes" + "compress/gzip" ) func TestMessageCreation(t *testing.T) { - payload := []byte("testing") - msg := NewMessage(payload) - if msg.magic != 1 { - t.Errorf("magic incorrect") - t.Fail() - } + payload := []byte("testing") + msg := NewMessage(payload) + if msg.magic != 1 { + t.Errorf("magic incorrect") + t.Fail() + } - // generated by kafka-rb: e8 f3 5a 06 - expected := []byte{0xe8, 0xf3, 0x5a, 0x06} - if !bytes.Equal(expected, msg.checksum[:]) { - t.Fail() - } + // generated by kafka-rb: e8 f3 5a 06 + expected := []byte{0xe8, 0xf3, 0x5a, 0x06} + if !bytes.Equal(expected, msg.checksum[:]) { + t.Fail() + } } func TestMagic0MessageEncoding(t *testing.T) { - // generated by kafka-rb: - // test the old message format - expected := []byte{0x00, 0x00, 0x00, 0x0c, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} - length, msgsDecoded := Decode(expected, DefaultCodecsMap) + // generated by kafka-rb: + // test the old message format + expected := []byte{0x00, 0x00, 0x00, 0x0c, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} + length, msgsDecoded := Decode(expected, DefaultCodecsMap) - if length == 0 || msgsDecoded == nil { - t.Fail() - } - msgDecoded := msgsDecoded[0] + if length == 0 || msgsDecoded == nil { + t.Fail() + } + msgDecoded := msgsDecoded[0] - payload := []byte("testing") - if !bytes.Equal(payload, msgDecoded.payload) { - t.Fatal("bytes not equal") - } - chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} - if !bytes.Equal(chksum, msgDecoded.checksum[:]) { - t.Fatal("checksums do not match") - } - if msgDecoded.magic != 0 { - t.Fatal("magic incorrect") - } + payload := []byte("testing") + if !bytes.Equal(payload, msgDecoded.payload) { + t.Fatal("bytes not equal") + } + chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} + if !bytes.Equal(chksum, msgDecoded.checksum[:]) { + t.Fatal("checksums do not match") + } + if msgDecoded.magic != 0 { + t.Fatal("magic incorrect") + } } func TestMessageEncoding(t *testing.T) { - payload 
:= []byte("testing") - msg := NewMessage(payload) + payload := []byte("testing") + msg := NewMessage(payload) - // generated by kafka-rb: - expected := []byte{0x00, 0x00, 0x00, 0x0d, 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} - if !bytes.Equal(expected, msg.Encode()) { - t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode()) - } + // generated by kafka-rb: + expected := []byte{0x00, 0x00, 0x00, 0x0d, 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} + if !bytes.Equal(expected, msg.Encode()) { + t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode()) + } - // verify round trip - length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode()) + // verify round trip + length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode()) - if length == 0 || msgsDecoded == nil { - t.Fatal("message is nil") - } - msgDecoded := msgsDecoded[0] + if length == 0 || msgsDecoded == nil { + t.Fatal("message is nil") + } + msgDecoded := msgsDecoded[0] - if !bytes.Equal(msgDecoded.payload, payload) { - t.Fatal("bytes not equal") - } - chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} - if !bytes.Equal(chksum, msgDecoded.checksum[:]) { - t.Fatal("checksums do not match") - } - if msgDecoded.magic != 1 { - t.Fatal("magic incorrect") - } + if !bytes.Equal(msgDecoded.payload, payload) { + t.Fatal("bytes not equal") + } + chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} + if !bytes.Equal(chksum, msgDecoded.checksum[:]) { + t.Fatal("checksums do not match") + } + if msgDecoded.magic != 1 { + t.Fatal("magic incorrect") + } } func TestCompressedMessageEncodingCompare(t *testing.T) { - payload := []byte("testing") - uncompressedMsgBytes := NewMessage(payload).Encode() - - msgGzipBytes := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]).Encode() - msgDefaultBytes := NewCompressedMessage(payload).Encode() - if !bytes.Equal(msgDefaultBytes, msgGzipBytes) { - t.Fatalf("uncompressed: % X \npayload: % X bytes 
not equal", msgDefaultBytes, msgGzipBytes) - } + payload := []byte("testing") + uncompressedMsgBytes := NewMessage(payload).Encode() + + msgGzipBytes := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]).Encode() + msgDefaultBytes := NewCompressedMessage(payload).Encode() + if !bytes.Equal(msgDefaultBytes, msgGzipBytes) { + t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", msgDefaultBytes, msgGzipBytes) + } } func TestCompressedMessageEncoding(t *testing.T) { - payload := []byte("testing") - uncompressedMsgBytes := NewMessage(payload).Encode() - - msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]) + payload := []byte("testing") + uncompressedMsgBytes := NewMessage(payload).Encode() - expectedPayload := []byte{0x1F, 0x8B, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, - 0xFF, 0x62, 0x60, 0x60, 0xE0, 0x65, 0x64, 0x78, 0xF1, 0x39, 0x8A, - 0xAD, 0x24, 0xB5, 0xB8, 0x24, 0x33, 0x2F, 0x1D, 0x10, 0x00, 0x00, - 0xFF, 0xFF, 0x0C, 0x6A, 0x82, 0x91, 0x11, 0x00, 0x00, 0x00} + msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]) - expectedHeader := []byte{0x00, 0x00, 0x00, 0x2F, 0x01, 0x01, 0x07, 0xFD, 0xC3, 0x76} + expectedPayload := []byte{0x1F, 0x8B, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, + 0xFF, 0x62, 0x60, 0x60, 0xE0, 0x65, 0x64, 0x78, 0xF1, 0x39, 0x8A, + 0xAD, 0x24, 0xB5, 0xB8, 0x24, 0x33, 0x2F, 0x1D, 0x10, 0x00, 0x00, + 0xFF, 0xFF, 0x0C, 0x6A, 0x82, 0x91, 0x11, 0x00, 0x00, 0x00} - expected := make([]byte, len(expectedHeader)+len(expectedPayload)) - n := copy(expected, expectedHeader) - copy(expected[n:], expectedPayload) + expectedHeader := []byte{0x00, 0x00, 0x00, 0x2F, 0x01, 0x01, 0x07, 0xFD, 0xC3, 0x76} - if msg.compression != 1 { - t.Fatalf("expected compression: 1 but got: %b", msg.compression) - } + expected := make([]byte, len(expectedHeader)+len(expectedPayload)) + n := copy(expected, expectedHeader) + copy(expected[n:], expectedPayload) - zipper, _ := 
gzip.NewReader(bytes.NewBuffer(msg.payload)) - uncompressed := make([]byte, 100) - n, _ = zipper.Read(uncompressed) - uncompressed = uncompressed[:n] - zipper.Close() + if msg.compression != 1 { + t.Fatalf("expected compression: 1 but got: %b", msg.compression) + } - if !bytes.Equal(uncompressed, uncompressedMsgBytes) { - t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", uncompressed, uncompressedMsgBytes) - } + zipper, _ := gzip.NewReader(bytes.NewBuffer(msg.payload)) + uncompressed := make([]byte, 100) + n, _ = zipper.Read(uncompressed) + uncompressed = uncompressed[:n] + zipper.Close() - if !bytes.Equal(expected, msg.Encode()) { - t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode()) - } + if !bytes.Equal(uncompressed, uncompressedMsgBytes) { + t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", uncompressed, uncompressedMsgBytes) + } - // verify round trip - length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap) + if !bytes.Equal(expected, msg.Encode()) { + t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode()) + } - if length == 0 || msgsDecoded == nil { - t.Fatal("message is nil") - } - msgDecoded := msgsDecoded[0] + // verify round trip + length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap) - if !bytes.Equal(msgDecoded.payload, payload) { - t.Fatal("bytes not equal") - } - chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} - if !bytes.Equal(chksum, msgDecoded.checksum[:]) { - t.Fatalf("checksums do not match, expected: % X but was: % X", - chksum, msgDecoded.checksum[:]) - } - if msgDecoded.magic != 1 { - t.Fatal("magic incorrect") - } + if length == 0 || msgsDecoded == nil { + t.Fatal("message is nil") + } + msgDecoded := msgsDecoded[0] + + if !bytes.Equal(msgDecoded.payload, payload) { + t.Fatal("bytes not equal") + } + chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} + if !bytes.Equal(chksum, msgDecoded.checksum[:]) { + t.Fatalf("checksums do not match, expected: % X but was: % X", + chksum, msgDecoded.checksum[:]) + 
} + if msgDecoded.magic != 1 { + t.Fatal("magic incorrect") + } } func TestLongCompressedMessageRoundTrip(t *testing.T) { - payloadBuf := bytes.NewBuffer([]byte{}) - // make the test bigger than buffer allocated in the Decode - for i := 0; i < 15; i++ { - payloadBuf.Write([]byte("testing123 ")) - } + payloadBuf := bytes.NewBuffer([]byte{}) + // make the test bigger than buffer allocated in the Decode + for i := 0; i < 15; i++ { + payloadBuf.Write([]byte("testing123 ")) + } - uncompressedMsgBytes := NewMessage(payloadBuf.Bytes()).Encode() - msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]) - - zipper, _ := gzip.NewReader(bytes.NewBuffer(msg.payload)) - uncompressed := make([]byte, 200) - n, _ := zipper.Read(uncompressed) - uncompressed = uncompressed[:n] - zipper.Close() + uncompressedMsgBytes := NewMessage(payloadBuf.Bytes()).Encode() + msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]) - if !bytes.Equal(uncompressed, uncompressedMsgBytes) { - t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", - uncompressed, uncompressedMsgBytes) - } + zipper, _ := gzip.NewReader(bytes.NewBuffer(msg.payload)) + uncompressed := make([]byte, 200) + n, _ := zipper.Read(uncompressed) + uncompressed = uncompressed[:n] + zipper.Close() - // verify round trip - length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap) + if !bytes.Equal(uncompressed, uncompressedMsgBytes) { + t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", + uncompressed, uncompressedMsgBytes) + } - if length == 0 || msgsDecoded == nil { - t.Fatal("message is nil") - } - msgDecoded := msgsDecoded[0] + // verify round trip + length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap) - if !bytes.Equal(msgDecoded.payload, payloadBuf.Bytes()) { - t.Fatal("bytes not equal") - } - if msgDecoded.magic != 1 { - t.Fatal("magic incorrect") - } + if length == 0 || msgsDecoded == nil { + t.Fatal("message is nil") + } + msgDecoded 
:= msgsDecoded[0] + + if !bytes.Equal(msgDecoded.payload, payloadBuf.Bytes()) { + t.Fatal("bytes not equal") + } + if msgDecoded.magic != 1 { + t.Fatal("magic incorrect") + } } func TestMultipleCompressedMessages(t *testing.T) { - msgs := []*Message{NewMessage([]byte("testing")), - NewMessage([]byte("multiple")), - NewMessage([]byte("messages")), - } - msg := NewCompressedMessages(msgs...) - - length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode()) - if length == 0 || msgsDecoded == nil { - t.Fatal("msgsDecoded is nil") - } - - // make sure the decompressed messages match what was put in - for index, decodedMsg := range msgsDecoded { - if !bytes.Equal(msgs[index].payload, decodedMsg.payload) { - t.Fatalf("Payload doesn't match, expected: % X but was: % X\n", - msgs[index].payload, decodedMsg.payload) - } - } + msgs := []*Message{NewMessage([]byte("testing")), + NewMessage([]byte("multiple")), + NewMessage([]byte("messages")), + } + msg := NewCompressedMessages(msgs...) + + length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode()) + if length == 0 || msgsDecoded == nil { + t.Fatal("msgsDecoded is nil") + } + + // make sure the decompressed messages match what was put in + for index, decodedMsg := range msgsDecoded { + if !bytes.Equal(msgs[index].payload, decodedMsg.payload) { + t.Fatalf("Payload doesn't match, expected: % X but was: % X\n", + msgs[index].payload, decodedMsg.payload) + } + } } func TestRequestHeaderEncoding(t *testing.T) { - broker := newBroker("localhost:9092", "test", 0) - request := broker.EncodeRequestHeader(REQUEST_PRODUCE) + broker := newBroker("localhost:9092", "test", 0) + request := broker.EncodeRequestHeader(REQUEST_PRODUCE) - // generated by kafka-rb: - expected := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, - 0x00, 0x00, 0x00, 0x00} + // generated by kafka-rb: + expected := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, + 0x00, 0x00, 0x00, 0x00} - if 
!bytes.Equal(expected, request.Bytes()) { - t.Errorf("expected length: %d but got: %d", len(expected), len(request.Bytes())) - t.Errorf("expected: %X\n but got: %X", expected, request) - t.Fail() - } + if !bytes.Equal(expected, request.Bytes()) { + t.Errorf("expected length: %d but got: %d", len(expected), len(request.Bytes())) + t.Errorf("expected: %X\n but got: %X", expected, request) + t.Fail() + } } func TestPublishRequestEncoding(t *testing.T) { - payload := []byte("testing") - msg := NewMessage(payload) + payload := []byte("testing") + msg := NewMessage(payload) - pubBroker := NewBrokerPublisher("localhost:9092", "test", 0) - request := pubBroker.broker.EncodePublishRequest(msg) + pubBroker := NewBrokerPublisher("localhost:9092", "test", 0) + request := pubBroker.broker.EncodePublishRequest(msg) - // generated by kafka-rb: - expected := []byte{0x00, 0x00, 0x00, 0x21, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0x0d, - /* magic comp ...... chksum .... .. payload .. */ - 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} + // generated by kafka-rb: + expected := []byte{0x00, 0x00, 0x00, 0x21, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0x0d, + /* magic comp ...... chksum .... .. payload .. 
*/ + 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} - if !bytes.Equal(expected, request) { - t.Errorf("expected length: %d but got: %d", len(expected), len(request)) - t.Errorf("expected: % X\n but got: % X", expected, request) - t.Fail() - } + if !bytes.Equal(expected, request) { + t.Errorf("expected length: %d but got: %d", len(expected), len(request)) + t.Errorf("expected: % X\n but got: % X", expected, request) + t.Fail() + } } func TestConsumeRequestEncoding(t *testing.T) { - pubBroker := NewBrokerPublisher("localhost:9092", "test", 0) - request := pubBroker.broker.EncodeConsumeRequest(0, 1048576) + pubBroker := NewBrokerPublisher("localhost:9092", "test", 0) + request := pubBroker.broker.EncodeConsumeRequest(0, 1048576) - // generated by kafka-rb, encode_request_size + encode_request - expected := []byte{0x00, 0x00, 0x00, 0x18, 0x00, 0x01, 0x00, 0x04, 0x74, - 0x65, 0x73, 0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00} + // generated by kafka-rb, encode_request_size + encode_request + expected := []byte{0x00, 0x00, 0x00, 0x18, 0x00, 0x01, 0x00, 0x04, 0x74, + 0x65, 0x73, 0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00} - if !bytes.Equal(expected, request) { - t.Errorf("expected length: %d but got: %d", len(expected), len(request)) - t.Errorf("expected: % X\n but got: % X", expected, request) - t.Fail() - } + if !bytes.Equal(expected, request) { + t.Errorf("expected length: %d but got: %d", len(expected), len(request)) + t.Errorf("expected: % X\n but got: % X", expected, request) + t.Fail() + } } Index: clients/go/src/kafka.go =================================================================== --- clients/go/src/kafka.go (revision 1298586) +++ clients/go/src/kafka.go (working copy) @@ -23,73 +23,73 @@ package kafka import ( - "log" - "net" - "os" - "fmt" - "encoding/binary" - "io" - "bufio" + "bufio" + "encoding/binary" + 
"errors" + "fmt" + "io" + "log" + "net" ) const ( - NETWORK = "tcp" + NETWORK = "tcp" ) type Broker struct { - topic string - partition int - hostname string + topic string + partition int + hostname string } func newBroker(hostname string, topic string, partition int) *Broker { - return &Broker{topic: topic, - partition: partition, - hostname: hostname} + return &Broker{topic: topic, + partition: partition, + hostname: hostname} } -func (b *Broker) connect() (conn *net.TCPConn, error os.Error) { - raddr, err := net.ResolveTCPAddr(NETWORK, b.hostname) - if err != nil { - log.Println("Fatal Error: ", err) - return nil, err - } - conn, err = net.DialTCP(NETWORK, nil, raddr) - if err != nil { - log.Println("Fatal Error: ", err) - return nil, err - } - return conn, error +func (b *Broker) connect() (conn *net.TCPConn, error error) { + raddr, err := net.ResolveTCPAddr(NETWORK, b.hostname) + if err != nil { + log.Println("Fatal Error: ", err) + return nil, err + } + conn, err = net.DialTCP(NETWORK, nil, raddr) + if err != nil { + log.Println("Fatal Error: ", err) + return nil, err + } + return conn, error } // returns length of response & payload & err -func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, os.Error) { - reader := bufio.NewReader(conn) - length := make([]byte, 4) - lenRead, err := io.ReadFull(reader, length) - if err != nil { - return 0, []byte{}, err - } - if lenRead != 4 || lenRead < 0 { - return 0, []byte{}, os.NewError("invalid length of the packet length field") - } +func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, error) { + reader := bufio.NewReader(conn) + length := make([]byte, 4) + lenRead, err := io.ReadFull(reader, length) + if err != nil { + return 0, []byte{}, err + } + if lenRead != 4 || lenRead < 0 { + return 0, []byte{}, errors.New("invalid length of the packet length field") + } - expectedLength := binary.BigEndian.Uint32(length) - messages := make([]byte, expectedLength) - lenRead, err = io.ReadFull(reader, 
messages) - if err != nil { - return 0, []byte{}, err - } + expectedLength := binary.BigEndian.Uint32(length) + messages := make([]byte, expectedLength) + lenRead, err = io.ReadFull(reader, messages) + if err != nil { + return 0, []byte{}, err + } - if uint32(lenRead) != expectedLength { - return 0, []byte{}, os.NewError(fmt.Sprintf("Fatal Error: Unexpected Length: %d expected: %d", lenRead, expectedLength)) - } + if uint32(lenRead) != expectedLength { + return 0, []byte{}, errors.New(fmt.Sprintf("Fatal Error: Unexpected Length: %d expected: %d", lenRead, expectedLength)) + } - errorCode := binary.BigEndian.Uint16(messages[0:2]) - if errorCode != 0 { - log.Println("errorCode: ", errorCode) - return 0, []byte{}, os.NewError( - fmt.Sprintf("Broker Response Error: %d", errorCode)) - } - return expectedLength, messages[2:], nil + errorCode := binary.BigEndian.Uint16(messages[0:2]) + if errorCode != 0 { + log.Println("errorCode: ", errorCode) + return 0, []byte{}, errors.New( + fmt.Sprintf("Broker Response Error: %d", errorCode)) + } + return expectedLength, messages[2:], nil } Index: clients/go/src/publisher.go =================================================================== --- clients/go/src/publisher.go (revision 1298586) +++ clients/go/src/publisher.go (working copy) @@ -22,34 +22,30 @@ package kafka -import ( - "os" -) - type BrokerPublisher struct { - broker *Broker + broker *Broker } func NewBrokerPublisher(hostname string, topic string, partition int) *BrokerPublisher { - return &BrokerPublisher{broker: newBroker(hostname, topic, partition)} + return &BrokerPublisher{broker: newBroker(hostname, topic, partition)} } -func (b *BrokerPublisher) Publish(message *Message) (int, os.Error) { - return b.BatchPublish(message) +func (b *BrokerPublisher) Publish(message *Message) (int, error) { + return b.BatchPublish(message) } -func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, os.Error) { - conn, err := b.broker.connect() - if err != nil { - return 
-1, err - } - defer conn.Close() - // TODO: MULTIPRODUCE - request := b.broker.EncodePublishRequest(messages...) - num, err := conn.Write(request) - if err != nil { - return -1, err - } +func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, error) { + conn, err := b.broker.connect() + if err != nil { + return -1, err + } + defer conn.Close() + // TODO: MULTIPRODUCE + request := b.broker.EncodePublishRequest(messages...) + num, err := conn.Write(request) + if err != nil { + return -1, err + } - return num, err + return num, err } Index: clients/go/src/converts.go =================================================================== --- clients/go/src/converts.go (revision 1298586) +++ clients/go/src/converts.go (working copy) @@ -23,29 +23,29 @@ package kafka import ( - "encoding/binary" + "encoding/binary" ) func uint16bytes(value int) []byte { - result := make([]byte, 2) - binary.BigEndian.PutUint16(result, uint16(value)) - return result + result := make([]byte, 2) + binary.BigEndian.PutUint16(result, uint16(value)) + return result } func uint32bytes(value int) []byte { - result := make([]byte, 4) - binary.BigEndian.PutUint32(result, uint32(value)) - return result + result := make([]byte, 4) + binary.BigEndian.PutUint32(result, uint32(value)) + return result } func uint32toUint32bytes(value uint32) []byte { - result := make([]byte, 4) - binary.BigEndian.PutUint32(result, value) - return result + result := make([]byte, 4) + binary.BigEndian.PutUint32(result, value) + return result } func uint64ToUint64bytes(value uint64) []byte { - result := make([]byte, 8) - binary.BigEndian.PutUint64(result, value) - return result + result := make([]byte, 8) + binary.BigEndian.PutUint64(result, value) + return result } Index: clients/go/src/message.go =================================================================== --- clients/go/src/message.go (revision 1298586) +++ clients/go/src/message.go (working copy) @@ -23,160 +23,160 @@ package kafka import ( - "hash/crc32" 
- "encoding/binary" - "bytes" - "log" + "bytes" + "encoding/binary" + "hash/crc32" + "log" ) const ( - // Compression Support uses '1' - https://cwiki.apache.org/confluence/display/KAFKA/Compression - MAGIC_DEFAULT = 1 - // magic + compression + chksum - NO_LEN_HEADER_SIZE = 1 + 1 + 4 + // Compression Support uses '1' - https://cwiki.apache.org/confluence/display/KAFKA/Compression + MAGIC_DEFAULT = 1 + // magic + compression + chksum + NO_LEN_HEADER_SIZE = 1 + 1 + 4 ) type Message struct { - magic byte - compression byte - checksum [4]byte - payload []byte - offset uint64 // only used after decoding - totalLength uint32 // total length of the raw message (from decoding) + magic byte + compression byte + checksum [4]byte + payload []byte + offset uint64 // only used after decoding + totalLength uint32 // total length of the raw message (from decoding) } func (m *Message) Offset() uint64 { - return m.offset + return m.offset } func (m *Message) Payload() []byte { - return m.payload + return m.payload } func (m *Message) PayloadString() string { - return string(m.payload) + return string(m.payload) } func NewMessageWithCodec(payload []byte, codec PayloadCodec) *Message { - message := &Message{} - message.magic = byte(MAGIC_DEFAULT) - message.compression = codec.Id() - message.payload = codec.Encode(payload) - binary.BigEndian.PutUint32(message.checksum[0:], crc32.ChecksumIEEE(message.payload)) - return message + message := &Message{} + message.magic = byte(MAGIC_DEFAULT) + message.compression = codec.Id() + message.payload = codec.Encode(payload) + binary.BigEndian.PutUint32(message.checksum[0:], crc32.ChecksumIEEE(message.payload)) + return message } // Default is is create a message with no compression func NewMessage(payload []byte) *Message { - return NewMessageWithCodec(payload, DefaultCodecsMap[NO_COMPRESSION_ID]) + return NewMessageWithCodec(payload, DefaultCodecsMap[NO_COMPRESSION_ID]) } // Create a Message using the default compression method (gzip) func 
NewCompressedMessage(payload []byte) *Message { - return NewCompressedMessages(NewMessage(payload)) + return NewCompressedMessages(NewMessage(payload)) } func NewCompressedMessages(messages ...*Message) *Message { - buf := bytes.NewBuffer([]byte{}) - for _, message := range messages { - buf.Write(message.Encode()) - } - return NewMessageWithCodec(buf.Bytes(), DefaultCodecsMap[GZIP_COMPRESSION_ID]) + buf := bytes.NewBuffer([]byte{}) + for _, message := range messages { + buf.Write(message.Encode()) + } + return NewMessageWithCodec(buf.Bytes(), DefaultCodecsMap[GZIP_COMPRESSION_ID]) } // MESSAGE SET: func (m *Message) Encode() []byte { - msgLen := NO_LEN_HEADER_SIZE + len(m.payload) - msg := make([]byte, 4+msgLen) - binary.BigEndian.PutUint32(msg[0:], uint32(msgLen)) - msg[4] = m.magic - msg[5] = m.compression + msgLen := NO_LEN_HEADER_SIZE + len(m.payload) + msg := make([]byte, 4+msgLen) + binary.BigEndian.PutUint32(msg[0:], uint32(msgLen)) + msg[4] = m.magic + msg[5] = m.compression - copy(msg[6:], m.checksum[0:]) - copy(msg[10:], m.payload) + copy(msg[6:], m.checksum[0:]) + copy(msg[10:], m.payload) - return msg + return msg } func DecodeWithDefaultCodecs(packet []byte) (uint32, []Message) { - return Decode(packet, DefaultCodecsMap) + return Decode(packet, DefaultCodecsMap) } func Decode(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, []Message) { - messages := []Message{} + messages := []Message{} - length, message := decodeMessage(packet, payloadCodecsMap) + length, message := decodeMessage(packet, payloadCodecsMap) - if length > 0 && message != nil { - if message.compression != NO_COMPRESSION_ID { - // wonky special case for compressed messages having embedded messages - payloadLen := uint32(len(message.payload)) - messageLenLeft := payloadLen - for messageLenLeft > 0 { - start := payloadLen - messageLenLeft - innerLen, innerMsg := decodeMessage(message.payload[start:], payloadCodecsMap) - messageLenLeft = messageLenLeft - innerLen - 4 // 
message length uint32 - messages = append(messages, *innerMsg) - } - } else { - messages = append(messages, *message) - } - } + if length > 0 && message != nil { + if message.compression != NO_COMPRESSION_ID { + // wonky special case for compressed messages having embedded messages + payloadLen := uint32(len(message.payload)) + messageLenLeft := payloadLen + for messageLenLeft > 0 { + start := payloadLen - messageLenLeft + innerLen, innerMsg := decodeMessage(message.payload[start:], payloadCodecsMap) + messageLenLeft = messageLenLeft - innerLen - 4 // message length uint32 + messages = append(messages, *innerMsg) + } + } else { + messages = append(messages, *message) + } + } - return length, messages + return length, messages } func decodeMessage(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, *Message) { - length := binary.BigEndian.Uint32(packet[0:]) - if length > uint32(len(packet[4:])) { - log.Printf("length mismatch, expected at least: %X, was: %X\n", length, len(packet[4:])) - return 0, nil - } - msg := Message{} - msg.totalLength = length - msg.magic = packet[4] + length := binary.BigEndian.Uint32(packet[0:]) + if length > uint32(len(packet[4:])) { + log.Printf("length mismatch, expected at least: %X, was: %X\n", length, len(packet[4:])) + return 0, nil + } + msg := Message{} + msg.totalLength = length + msg.magic = packet[4] - rawPayload := []byte{} - if msg.magic == 0 { - msg.compression = byte(0) - copy(msg.checksum[:], packet[5:9]) - payloadLength := length - 1 - 4 - rawPayload = packet[9 : 9+payloadLength] - } else if msg.magic == MAGIC_DEFAULT { - msg.compression = packet[5] - copy(msg.checksum[:], packet[6:10]) - payloadLength := length - NO_LEN_HEADER_SIZE - rawPayload = packet[10 : 10+payloadLength] - } else { - log.Printf("incorrect magic, expected: %X was: %X\n", MAGIC_DEFAULT, msg.magic) - return 0, nil - } + rawPayload := []byte{} + if msg.magic == 0 { + msg.compression = byte(0) + copy(msg.checksum[:], packet[5:9]) + 
payloadLength := length - 1 - 4 + rawPayload = packet[9 : 9+payloadLength] + } else if msg.magic == MAGIC_DEFAULT { + msg.compression = packet[5] + copy(msg.checksum[:], packet[6:10]) + payloadLength := length - NO_LEN_HEADER_SIZE + rawPayload = packet[10 : 10+payloadLength] + } else { + log.Printf("incorrect magic, expected: %X was: %X\n", MAGIC_DEFAULT, msg.magic) + return 0, nil + } - payloadChecksum := make([]byte, 4) - binary.BigEndian.PutUint32(payloadChecksum, crc32.ChecksumIEEE(rawPayload)) - if !bytes.Equal(payloadChecksum, msg.checksum[:]) { - msg.Print() - log.Printf("checksum mismatch, expected: % X was: % X\n", payloadChecksum, msg.checksum[:]) - return 0, nil - } - msg.payload = payloadCodecsMap[msg.compression].Decode(rawPayload) + payloadChecksum := make([]byte, 4) + binary.BigEndian.PutUint32(payloadChecksum, crc32.ChecksumIEEE(rawPayload)) + if !bytes.Equal(payloadChecksum, msg.checksum[:]) { + msg.Print() + log.Printf("checksum mismatch, expected: % X was: % X\n", payloadChecksum, msg.checksum[:]) + return 0, nil + } + msg.payload = payloadCodecsMap[msg.compression].Decode(rawPayload) - return length, &msg + return length, &msg } func (msg *Message) Print() { - log.Println("----- Begin Message ------") - log.Printf("magic: %X\n", msg.magic) - log.Printf("compression: %X\n", msg.compression) - log.Printf("checksum: %X\n", msg.checksum) - if len(msg.payload) < 1048576 { // 1 MB - log.Printf("payload: % X\n", msg.payload) - log.Printf("payload(string): %s\n", msg.PayloadString()) - } else { - log.Printf("long payload, length: %d\n", len(msg.payload)) - } - log.Printf("length: %d\n", msg.totalLength) - log.Printf("offset: %d\n", msg.offset) - log.Println("----- End Message ------") + log.Println("----- Begin Message ------") + log.Printf("magic: %X\n", msg.magic) + log.Printf("compression: %X\n", msg.compression) + log.Printf("checksum: %X\n", msg.checksum) + if len(msg.payload) < 1048576 { // 1 MB + log.Printf("payload: % X\n", msg.payload) + 
log.Printf("payload(string): %s\n", msg.PayloadString()) + } else { + log.Printf("long payload, length: %d\n", len(msg.payload)) + } + log.Printf("length: %d\n", msg.totalLength) + log.Printf("offset: %d\n", msg.offset) + log.Println("----- End Message ------") } Index: clients/go/src/payload_codec.go =================================================================== --- clients/go/src/payload_codec.go (revision 1298586) +++ clients/go/src/payload_codec.go (working copy) @@ -23,94 +23,92 @@ package kafka import ( - "bytes" - "compress/gzip" - // "log" + "bytes" + "compress/gzip" + // "log" ) const ( - NO_COMPRESSION_ID = 0 - GZIP_COMPRESSION_ID = 1 + NO_COMPRESSION_ID = 0 + GZIP_COMPRESSION_ID = 1 ) type PayloadCodec interface { - // the 1 byte id of the codec - Id() byte + // the 1 byte id of the codec + Id() byte - // encoder interface for compression implementation - Encode(data []byte) []byte + // encoder interface for compression implementation + Encode(data []byte) []byte - // decoder interface for decompression implementation - Decode(data []byte) []byte + // decoder interface for decompression implementation + Decode(data []byte) []byte } // Default Codecs var DefaultCodecs = []PayloadCodec{ - new(NoCompressionPayloadCodec), - new(GzipPayloadCodec), + new(NoCompressionPayloadCodec), + new(GzipPayloadCodec), } var DefaultCodecsMap = codecsMap(DefaultCodecs) func codecsMap(payloadCodecs []PayloadCodec) map[byte]PayloadCodec { - payloadCodecsMap := make(map[byte]PayloadCodec, len(payloadCodecs)) - for _, c := range payloadCodecs { - payloadCodecsMap[c.Id()] = c, true - } - return payloadCodecsMap + payloadCodecsMap := make(map[byte]PayloadCodec, len(payloadCodecs)) + for _, c := range payloadCodecs { + payloadCodecsMap[c.Id()] = c + } + return payloadCodecsMap } // No compression codec, noop type NoCompressionPayloadCodec struct { - } func (codec *NoCompressionPayloadCodec) Id() byte { - return NO_COMPRESSION_ID + return NO_COMPRESSION_ID } func (codec 
*NoCompressionPayloadCodec) Encode(data []byte) []byte { - return data + return data } func (codec *NoCompressionPayloadCodec) Decode(data []byte) []byte { - return data + return data } // Gzip Codec type GzipPayloadCodec struct { - } func (codec *GzipPayloadCodec) Id() byte { - return GZIP_COMPRESSION_ID + return GZIP_COMPRESSION_ID } func (codec *GzipPayloadCodec) Encode(data []byte) []byte { - buf := bytes.NewBuffer([]byte{}) - zipper, _ := gzip.NewWriterLevel(buf, gzip.BestSpeed) - zipper.Write(data) - zipper.Close() - return buf.Bytes() + buf := bytes.NewBuffer([]byte{}) + zipper, _ := gzip.NewWriterLevel(buf, gzip.BestSpeed) + zipper.Write(data) + zipper.Close() + return buf.Bytes() } func (codec *GzipPayloadCodec) Decode(data []byte) []byte { - buf := bytes.NewBuffer([]byte{}) - zipper, _ := gzip.NewReader(bytes.NewBuffer(data)) - unzipped := make([]byte, 100) - for { - n, err := zipper.Read(unzipped) - if n > 0 && err == nil { - buf.Write(unzipped[0:n]) - } else { - break - } - } + buf := bytes.NewBuffer([]byte{}) + zipper, _ := gzip.NewReader(bytes.NewBuffer(data)) + unzipped := make([]byte, 100) + for { + n, err := zipper.Read(unzipped) + if n > 0 && err == nil { + buf.Write(unzipped[0:n]) + } else { + break + } + } - zipper.Close() - return buf.Bytes() + zipper.Close() + return buf.Bytes() } Index: clients/go/src/timing.go =================================================================== --- clients/go/src/timing.go (revision 1298586) +++ clients/go/src/timing.go (working copy) @@ -23,27 +23,27 @@ package kafka import ( - "log" - "time" + "log" + "time" ) type Timing struct { - label string - start int64 - stop int64 + label string + start int64 + stop int64 } func StartTiming(label string) *Timing { - return &Timing{label: label, start: time.Nanoseconds(), stop: 0} + return &Timing{label: label, start: time.Now().UnixNano()} } func (t *Timing) Stop() { - t.stop = time.Nanoseconds() + t.stop = time.Now().UnixNano() } func (t *Timing) Print() { - if 
t.stop == 0 { - t.Stop() - } - log.Printf("%s took: %f ms\n", t.label, float64((time.Nanoseconds()-t.start))/1000000) + if t.stop == 0 { + t.Stop() + } + log.Printf("%s took: %f ms\n", t.label, float64((t.stop-t.start)/1000000)) } Index: clients/go/src/request.go =================================================================== --- clients/go/src/request.go (revision 1298586) +++ clients/go/src/request.go (working copy) @@ -23,79 +23,79 @@ package kafka import ( - "encoding/binary" - "bytes" + "bytes" + "encoding/binary" ) type RequestType uint16 // Request Types const ( - REQUEST_PRODUCE RequestType = 0 - REQUEST_FETCH = 1 - REQUEST_MULTIFETCH = 2 - REQUEST_MULTIPRODUCE = 3 - REQUEST_OFFSETS = 4 + REQUEST_PRODUCE RequestType = 0 + REQUEST_FETCH = 1 + REQUEST_MULTIFETCH = 2 + REQUEST_MULTIPRODUCE = 3 + REQUEST_OFFSETS = 4 ) // Request Header: func (b *Broker) EncodeRequestHeader(requestType RequestType) *bytes.Buffer { - request := bytes.NewBuffer([]byte{}) - request.Write(uint32bytes(0)) // placeholder for request size - request.Write(uint16bytes(int(requestType))) - request.Write(uint16bytes(len(b.topic))) - request.WriteString(b.topic) - request.Write(uint32bytes(b.partition)) + request := bytes.NewBuffer([]byte{}) + request.Write(uint32bytes(0)) // placeholder for request size + request.Write(uint16bytes(int(requestType))) + request.Write(uint16bytes(len(b.topic))) + request.WriteString(b.topic) + request.Write(uint32bytes(b.partition)) - return request + return request } // after writing to the buffer is complete, encode the size of the request in the request. 
func encodeRequestSize(request *bytes.Buffer) { - binary.BigEndian.PutUint32(request.Bytes()[0:], uint32(request.Len()-4)) + binary.BigEndian.PutUint32(request.Bytes()[0:], uint32(request.Len()-4)) } // func (b *Broker) EncodeOffsetRequest(time int64, maxNumOffsets uint32) []byte { - request := b.EncodeRequestHeader(REQUEST_OFFSETS) - // specific to offset request - request.Write(uint64ToUint64bytes(uint64(time))) - request.Write(uint32toUint32bytes(maxNumOffsets)) + request := b.EncodeRequestHeader(REQUEST_OFFSETS) + // specific to offset request + request.Write(uint64ToUint64bytes(uint64(time))) + request.Write(uint32toUint32bytes(maxNumOffsets)) - encodeRequestSize(request) + encodeRequestSize(request) - return request.Bytes() + return request.Bytes() } // func (b *Broker) EncodeConsumeRequest(offset uint64, maxSize uint32) []byte { - request := b.EncodeRequestHeader(REQUEST_FETCH) - // specific to consume request - request.Write(uint64ToUint64bytes(offset)) - request.Write(uint32toUint32bytes(maxSize)) + request := b.EncodeRequestHeader(REQUEST_FETCH) + // specific to consume request + request.Write(uint64ToUint64bytes(offset)) + request.Write(uint32toUint32bytes(maxSize)) - encodeRequestSize(request) + encodeRequestSize(request) - return request.Bytes() + return request.Bytes() } // func (b *Broker) EncodePublishRequest(messages ...*Message) []byte { - // 4 + 2 + 2 + topicLength + 4 + 4 - request := b.EncodeRequestHeader(REQUEST_PRODUCE) + // 4 + 2 + 2 + topicLength + 4 + 4 + request := b.EncodeRequestHeader(REQUEST_PRODUCE) - messageSetSizePos := request.Len() - request.Write(uint32bytes(0)) // placeholder message len + messageSetSizePos := request.Len() + request.Write(uint32bytes(0)) // placeholder message len - written := 0 - for _, message := range messages { - wrote, _ := request.Write(message.Encode()) - written += wrote - } + written := 0 + for _, message := range messages { + wrote, _ := request.Write(message.Encode()) + written += wrote + } - // now 
add the accumulated size of that the message set was - binary.BigEndian.PutUint32(request.Bytes()[messageSetSizePos:], uint32(written)) - // now add the size of the whole to the first uint32 - encodeRequestSize(request) - return request.Bytes() + // now add the accumulated size of that the message set was + binary.BigEndian.PutUint32(request.Bytes()[messageSetSizePos:], uint32(written)) + // now add the size of the whole to the first uint32 + encodeRequestSize(request) + return request.Bytes() } Index: clients/go/README.md =================================================================== --- clients/go/README.md (revision 1298531) +++ clients/go/README.md (working copy) @@ -9,13 +9,13 @@ Install go:
For more info see: http://golang.org/doc/install.html#install -Make sure to set your GOROOT properly (http://golang.org/doc/install.html#environment). Install kafka.go package:
-make install +go get svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src
Make the tools (publisher & consumer)
-make tools +go get svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/tools/consumer +go get svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/tools/publisher
Start zookeeper, Kafka server
For more info on Kafka, see: http://incubator.apache.org/kafka/quickstart.html @@ -87,7 +87,16 @@ offsets, err := broker.GetOffsets(-1, 1) +### Developing the Client ### +Check out the code within the src directory of a $GOPATH root: + mkdir -p svn.apache.org/repos/asf/incubator/kafka.svn + cd svn.apache.org/repos/asf/incubator/kafka.svn + svn co http://svn.apache.org/repos/asf/incubator/kafka/trunk trunk + cd trunk/clients/go/src + go install + + ### Contact ### jeffreydamick (at) gmail (dot) com