Index: clients/go/src/consumer.go =================================================================== --- clients/go/src/consumer.go (revision 1298531) +++ clients/go/src/consumer.go (working copy) @@ -24,7 +24,6 @@ package kafka import ( "encoding/binary" - "errors" "io" "log" "net" @@ -127,6 +126,7 @@ func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, er func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, error) { _, err := conn.Write(consumer.broker.EncodeConsumeRequest(consumer.offset, consumer.maxSize)) if err != nil { + log.Println("Fatal Error: ", err) return -1, err } @@ -141,19 +141,25 @@ func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc M // parse out the messages var currentOffset uint64 = 0 for currentOffset <= uint64(length-4) { - totalLength, msgs := Decode(payload[currentOffset:], consumer.codecs) - if msgs == nil { - return num, errors.New("Error Decoding Message") + + payloadConsumed, msgs := Decode(payload[currentOffset:], consumer.codecs) + if msgs == nil || len(msgs) == 0 { + // this isn't invalid as large messages might contain partial messages + consumer.offset += currentOffset + return num, err } msgOffset := consumer.offset + currentOffset + for _, msg := range msgs { // update all of the messages offset // multiple messages can be at the same offset (compressed for example) + msg.offset = msgOffset + msgOffset += 4 + uint64(msg.totalLength) handlerFunc(&msg) num += 1 } - currentOffset += uint64(4 + totalLength) + currentOffset += uint64(payloadConsumed) } // update the broker's offset for next consumption consumer.offset += currentOffset diff --git a/clients/go/src/kafka.go b/clients/go/src/kafka.go index 2692293..bc9472c 100644 --- a/clients/go/src/kafka.go +++ b/clients/go/src/kafka.go @@ -48,7 +48,7 @@ func newBroker(hostname string, topic string, partition int) *Broker { hostname: hostname} } -func (b *Broker) connect() (conn 
*net.TCPConn, error error) { +func (b *Broker) connect() (conn *net.TCPConn, er error) { raddr, err := net.ResolveTCPAddr(NETWORK, b.hostname) if err != nil { log.Println("Fatal Error: ", err) @@ -59,7 +59,7 @@ func (b *Broker) connect() (conn *net.TCPConn, error error) { log.Println("Fatal Error: ", err) return nil, err } - return conn, error + return conn, er } // returns length of response & payload & err @@ -68,6 +68,7 @@ func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, error) { length := make([]byte, 4) lenRead, err := io.ReadFull(reader, length) if err != nil { + log.Println("invalid socket read ", err) return 0, []byte{}, err } if lenRead != 4 || lenRead < 0 { diff --git a/clients/go/src/message.go b/clients/go/src/message.go index 4e76059..6efa1ba 100644 --- a/clients/go/src/message.go +++ b/clients/go/src/message.go @@ -50,6 +50,12 @@ func (m *Message) Offset() uint64 { return m.offset } +// the length of payload, overhead (header) + 4 bytes len +// this value + offset should be start of next message +func (m *Message) TotalLen() uint64 { + return uint64(m.totalLength) + 4 +} + func (m *Message) Payload() []byte { return m.payload } @@ -104,34 +110,59 @@ func DecodeWithDefaultCodecs(packet []byte) (uint32, []Message) { } func Decode(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, []Message) { + messages := []Message{} + packetLen := uint32(len(packet)) + + if packet == nil || packetLen < 9 { + //log.Printf("malformed message? 
%d \n", len(packet)) + return 0, nil + } + // the package can contain n number of messages + var msgStart uint32 = 0 - length, message := decodeMessage(packet, payloadCodecsMap) - - if length > 0 && message != nil { - if message.compression != NO_COMPRESSION_ID { - // wonky special case for compressed messages having embedded messages - payloadLen := uint32(len(message.payload)) - messageLenLeft := payloadLen - for messageLenLeft > 0 { - start := payloadLen - messageLenLeft - innerLen, innerMsg := decodeMessage(message.payload[start:], payloadCodecsMap) - messageLenLeft = messageLenLeft - innerLen - 4 // message length uint32 - messages = append(messages, *innerMsg) + for msgStart <= packetLen { + + if msgStart+4 > packetLen { + return msgStart, messages + } + + length := binary.BigEndian.Uint32(packet[msgStart : msgStart+4]) + + if msgStart+4+length > packetLen { + // a packet doesn't have to end with a complete message + // so return the amount consumed + return msgStart, messages + } + message := decodeMessage(packet[msgStart:msgStart+4+length], length, payloadCodecsMap) + msgStart = length + msgStart + 4 + + if length > 0 && message != nil { + if message.compression != NO_COMPRESSION_ID { + // wonky special case for compressed messages having embedded messages + payloadLen := uint32(len(message.payload)) + messageLenLeft := payloadLen + for messageLenLeft > 0 { + start := payloadLen - messageLenLeft + length = binary.BigEndian.Uint32(message.payload[start:]) + innerMsg := decodeMessage(message.payload[start:], length, payloadCodecsMap) + messageLenLeft = messageLenLeft - length - 4 // message length uint32 + messages = append(messages, *innerMsg) + } + } else { + messages = append(messages, *message) } - } else { - messages = append(messages, *message) } } - return length, messages + return uint32(len(packet)), messages } -func decodeMessage(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, *Message) { - length := binary.BigEndian.Uint32(packet[0:]) - 
if length > uint32(len(packet[4:])) { - log.Printf("length mismatch, expected at least: %X, was: %X\n", length, len(packet[4:])) - return 0, nil +func decodeMessage(packet []byte, length uint32, payloadCodecsMap map[byte]PayloadCodec) *Message { + + if length != uint32(len(packet[4:])) { + log.Printf("length mismatch, expected at least : %d, was: %d\n", length, len(packet[4:])) + return nil } msg := Message{} msg.totalLength = length @@ -150,7 +181,7 @@ func decodeMessage(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint3 rawPayload = packet[10 : 10+payloadLength] } else { log.Printf("incorrect magic, expected: %X was: %X\n", MAGIC_DEFAULT, msg.magic) - return 0, nil + return nil } payloadChecksum := make([]byte, 4) @@ -158,11 +189,11 @@ func decodeMessage(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint3 if !bytes.Equal(payloadChecksum, msg.checksum[:]) { msg.Print() log.Printf("checksum mismatch, expected: % X was: % X\n", payloadChecksum, msg.checksum[:]) - return 0, nil + return nil } msg.payload = payloadCodecsMap[msg.compression].Decode(rawPayload) - return length, &msg + return &msg } func (msg *Message) Print() { @@ -177,6 +208,7 @@ func (msg *Message) Print() { log.Printf("long payload, length: %d\n", len(msg.payload)) } log.Printf("length: %d\n", msg.totalLength) - log.Printf("offset: %d\n", msg.offset) + log.Printf("start offset: %d\n", msg.offset) + log.Printf("end offset: %d\n", msg.offset+msg.TotalLen()) log.Println("----- End Message ------") } diff --git a/clients/go/src/request.go b/clients/go/src/request.go index 0a7df26..8ac25f7 100644 --- a/clients/go/src/request.go +++ b/clients/go/src/request.go @@ -79,7 +79,23 @@ func (b *Broker) EncodeConsumeRequest(offset uint64, maxSize uint32) []byte { return request.Bytes() } +// +func (b *Broker) EncodeConsumeRequestMultiFetch(offset uint64, maxSize uint32) []byte { + request := b.EncodeRequestHeader(REQUEST_MULTIFETCH) + // specific to consume request + 
request.Write(uint64ToUint64bytes(offset)) + request.Write(uint32toUint32bytes(maxSize)) + + encodeRequestSize(request) + + return request.Bytes() +} + // +/* +[0 0 0 24 0 1 0 4 116 101 115 116 0 0 0 0 0 0 0 0 0 0 0 0 0 16 0 0] +[0 0 0 24 0 2 0 4 116 101 115 116 0 0 0 0 0 0 0 0 0 0 0 0 0 16 0 0] +*/ func (b *Broker) EncodePublishRequest(messages ...*Message) []byte { // 4 + 2 + 2 + topicLength + 4 + 4 request := b.EncodeRequestHeader(REQUEST_PRODUCE) diff --git a/clients/go/src/timing.go b/clients/go/src/timing.go index abaf4aa..88fa446 100644 --- a/clients/go/src/timing.go +++ b/clients/go/src/timing.go @@ -34,7 +34,7 @@ type Timing struct { } func StartTiming(label string) *Timing { - return &Timing{label: label, start: time.Now().UnixNano()} + return &Timing{label: label, start: time.Now().UnixNano(), stop: 0} } func (t *Timing) Stop() { @@ -45,5 +45,5 @@ func (t *Timing) Print() { if t.stop == 0 { t.Stop() } - log.Printf("%s took: %f ms\n", t.label, float64((t.stop-t.start)/1000000)) + log.Printf("%s took: %f ms\n", t.label, float64((time.Now().UnixNano()-t.start))/1000000) } diff --git a/clients/go/tools/consumer/consumer.go b/clients/go/tools/consumer/consumer.go index a12624e..bb9bb29 100644 --- a/clients/go/tools/consumer/consumer.go +++ b/clients/go/tools/consumer/consumer.go @@ -25,10 +25,10 @@ package main import ( "flag" "fmt" + kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src" "os" "os/signal" "strconv" - kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src" "syscall" ) @@ -46,7 +46,7 @@ func init() { flag.StringVar(&topic, "topic", "test", "topic to publish to") flag.IntVar(&partition, "partition", 0, "partition to publish to") flag.Uint64Var(&offset, "offset", 0, "offset to start consuming from") - flag.UintVar(&maxSize, "maxsize", 1048576, "offset to start consuming from") + flag.UintVar(&maxSize, "maxsize", 1048576, "max size in bytes to consume a message set") flag.StringVar(&writePayloadsTo, "writeto", 
"", "write payloads to this file") flag.BoolVar(&consumerForever, "consumeforever", false, "loop forever consuming") flag.BoolVar(&printmessage, "printmessage", true, "print the message details to stdout") @@ -60,6 +60,7 @@ func main() { broker := kafka.NewBrokerConsumer(hostname, topic, partition, offset, uint32(maxSize)) var payloadFile *os.File = nil + var msgCt int if len(writePayloadsTo) > 0 { var err error payloadFile, err = os.Create(writePayloadsTo) @@ -70,8 +71,12 @@ func main() { } consumerCallback := func(msg *kafka.Message) { + msgCt++ if printmessage { msg.Print() + } else if msgCt == 1000 { + fmt.Printf("Cur Offset: %d\n", msg.Offset()+msg.TotalLen()) + msgCt = 0 } if payloadFile != nil { payloadFile.Write([]byte("Message at: " + strconv.FormatUint(msg.Offset(), 10) + "\n")) @@ -82,6 +87,7 @@ func main() { if consumerForever { quit := make(chan bool, 1) + go func() { var sigIn chan os.Signal signal.Notify(sigIn) diff --git a/clients/go/tools/publisher/publisher.go b/clients/go/tools/publisher/publisher.go index 4000ca4..baafa21 100644 --- a/clients/go/tools/publisher/publisher.go +++ b/clients/go/tools/publisher/publisher.go @@ -23,15 +23,34 @@ package main import ( + "bufio" "flag" "fmt" kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src" "os" ) +/* + This publisher tool has 4 send modes: + 1. Pass message: + ./publisher -message="good stuff bob" -hostname=192.168.1.15:9092 + + 2. Pass Msg, SendCT: Send the samge message SendCt # of times + ./publisher -sendct=100 -message="good stuff bob" + + 3. MessageFile: pass a message file and it will read + ./publisher -messagefile=/tmp/msgs.log + + 4. 
Stdin: if message, message file empty it accepts + messages from Console (message end at new line) + ./publisher -topic=atopic -partition=0 + >my message here + +*/ var hostname string var topic string var partition int +var sendCt int var message string var messageFile string var compress bool @@ -41,49 +60,104 @@ func init() { flag.StringVar(&topic, "topic", "test", "topic to publish to") flag.IntVar(&partition, "partition", 0, "partition to publish to") flag.StringVar(&message, "message", "", "message to publish") + flag.IntVar(&sendCt, "sendct", 0, "to do a pseudo load test, set sendct & pass a message ") flag.StringVar(&messageFile, "messagefile", "", "read message from this file") flag.BoolVar(&compress, "compress", false, "compress the messages published") } +// sends +func SendFile(msgFile string) { + broker := kafka.NewBrokerPublisher(hostname, topic, partition) + fmt.Println("Publishing File:", msgFile) + file, err := os.Open(msgFile) + if err != nil { + fmt.Println("Error: ", err) + return + } + stat, err := file.Stat() + if err != nil { + fmt.Println("Error: ", err) + return + } + payload := make([]byte, stat.Size()) + file.Read(payload) + timing := kafka.StartTiming("Sending") + + if compress { + broker.Publish(kafka.NewCompressedMessage(payload)) + } else { + broker.Publish(kafka.NewMessage(payload)) + } + + timing.Print() + file.Close() +} + +func SendMessage() { + + broker := kafka.NewBrokerPublisher(hostname, topic, partition) + + fmt.Println("Publishing :", message) + if compress { + broker.Publish(kafka.NewCompressedMessage([]byte(message))) + } else { + broker.Publish(kafka.NewMessage([]byte(message))) + } +} + +func SendManyMessages() { + + broker := kafka.NewBrokerPublisher(hostname, topic, partition) + timing := kafka.StartTiming("Sending") + + fmt.Println("Publishing :", message, ": Will send ", sendCt, " times") + done := make(chan bool) + msgChan := make(chan *kafka.Message, 1000) + + go broker.PublishOnChannel(msgChan, 100, 100, done) + 
for i := 0; i < sendCt; i++ { + msgChan <- kafka.NewMessage([]byte(message)) + } + done <- true // force flush + + timing.Print() +} + func main() { flag.Parse() - fmt.Println("Publishing :", message) - fmt.Printf("To: %s, topic: %s, partition: %d\n", hostname, topic, partition) + fmt.Printf("Kafka: %s, topic: %s, partition: %d\n", hostname, topic, partition) fmt.Println(" ---------------------- ") - broker := kafka.NewBrokerPublisher(hostname, topic, partition) if len(message) == 0 && len(messageFile) != 0 { - file, err := os.Open(messageFile) - if err != nil { - fmt.Println("Error: ", err) - return - } - stat, err := file.Stat() - if err != nil { - fmt.Println("Error: ", err) - return - } - payload := make([]byte, stat.Size()) - file.Read(payload) - timing := kafka.StartTiming("Sending") - - if compress { - broker.Publish(kafka.NewCompressedMessage(payload)) - } else { - broker.Publish(kafka.NewMessage(payload)) - } - timing.Print() - file.Close() + SendFile(messageFile) + + } else if len(message) > 0 && sendCt == 0 { + + SendMessage() + + } else if len(message) > 0 && sendCt > 0 { + + SendManyMessages() + } else { - timing := kafka.StartTiming("Sending") - if compress { - broker.Publish(kafka.NewCompressedMessage([]byte(message))) - } else { - broker.Publish(kafka.NewMessage([]byte(message))) - } + // console publisher + broker := kafka.NewBrokerPublisher(hostname, topic, partition) + b := bufio.NewReader(os.Stdin) + done := make(chan bool) + msgChan := make(chan *kafka.Message, 1000) + + go broker.PublishOnChannel(msgChan, 2000, 200, done) + fmt.Println("reading from stdin") + for { + if s, e := b.ReadString('\n'); e == nil { + + fmt.Println("sending ---", s) - timing.Print() + msgChan <- kafka.NewMessage([]byte(s)) + + } + } } }