From 507424e7b62268e61dc43e7f7a6940d67e087d48 Mon Sep 17 00:00:00 2001 From: Aaron Raddon Date: Tue, 13 Mar 2012 18:51:09 -0700 Subject: [PATCH] kafka 298 --- clients/go/src/consumer.go | 16 +++++--- clients/go/src/kafka.go | 12 +++-- clients/go/src/message.go | 80 +++++++++++++++++++++++---------- clients/go/src/publisher.go | 5 +- clients/go/tools/consumer/consumer.go | 7 +++- 5 files changed, 82 insertions(+), 38 deletions(-) diff --git a/clients/go/src/consumer.go b/clients/go/src/consumer.go index 4420093..35ff75e 100644 --- a/clients/go/src/consumer.go +++ b/clients/go/src/consumer.go @@ -24,7 +24,6 @@ package kafka import ( "encoding/binary" - "errors" "io" "log" "net" @@ -127,6 +126,7 @@ func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, er func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, error) { _, err := conn.Write(consumer.broker.EncodeConsumeRequest(consumer.offset, consumer.maxSize)) if err != nil { + log.Println("Fatal Error: ", err) return -1, err } @@ -141,19 +141,25 @@ func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc M // parse out the messages var currentOffset uint64 = 0 for currentOffset <= uint64(length-4) { - totalLength, msgs := Decode(payload[currentOffset:], consumer.codecs) - if msgs == nil { - return num, errors.New("Error Decoding Message") + + payloadConsumed, msgs := Decode(payload[currentOffset:], consumer.codecs) + if msgs == nil || len(msgs) == 0 { + // this isn't invalid as large messages might contain partial messages + consumer.offset += currentOffset + return num, err } msgOffset := consumer.offset + currentOffset + for _, msg := range msgs { // update all of the messages offset // multiple messages can be at the same offset (compressed for example) + msg.offset = msgOffset + msgOffset += 4 + uint64(msg.totalLength) handlerFunc(&msg) num += 1 } - currentOffset += uint64(4 + totalLength) + currentOffset += uint64(payloadConsumed) } // update the broker's offset for next consumption consumer.offset += currentOffset diff --git a/clients/go/src/kafka.go b/clients/go/src/kafka.go index 96a1929..1ead8c3 100644 --- a/clients/go/src/kafka.go +++ b/clients/go/src/kafka.go @@ -37,9 +37,10 @@ const ( ) type Broker struct { - topic string - partition int - hostname string + topic string + partition int + partitions []int + hostname string } func newBroker(hostname string, topic string, partition int) *Broker { @@ -48,7 +49,7 @@ func newBroker(hostname string, topic string, partition int) *Broker { hostname: hostname} } -func (b *Broker) connect() (conn *net.TCPConn, error error) { +func (b *Broker) connect() (conn *net.TCPConn, er error) { raddr, err := net.ResolveTCPAddr(NETWORK, b.hostname) if err != nil { log.Println("Fatal Error: ", err) @@ -59,7 +60,7 @@ func (b *Broker) connect() (conn *net.TCPConn, error error) { log.Println("Fatal Error: ", err) return nil, err } - return conn, error + return conn, er } // returns length of response & payload & err @@ -68,6 +69,7 @@ func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, error) { length := make([]byte, 4) lenRead, err := io.ReadFull(reader, length) if err != nil { + log.Println("invalid socket read ", err) return 0, []byte{}, err } if lenRead != 4 || lenRead < 0 { diff --git a/clients/go/src/message.go b/clients/go/src/message.go index b214b08..30eea7b 100644 --- a/clients/go/src/message.go +++ b/clients/go/src/message.go @@ -50,6 +50,12 @@ func (m *Message) Offset() uint64 { return m.offset } +// the length of payload, overhead (header) + 4 bytes len +// this value + offset should be start of next message +func (m *Message) TotalLen() uint64 { + return uint64(m.totalLength) + 4 +} + func (m *Message) Payload() []byte { return m.payload } @@ -104,34 +110,59 @@ func DecodeWithDefaultCodecs(packet []byte) (uint32, []Message) { } func Decode(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, []Message) { + messages := []Message{} + packetLen := uint32(len(packet)) + + if packet == nil || packetLen < 9 { + //log.Printf("malformed message? %d \n", len(packet)) + return 0, nil + } + // the package can contain n number of messages + var msgStart uint32 = 0 - length, message := decodeMessage(packet, payloadCodecsMap) - - if length > 0 && message != nil { - if message.compression != NO_COMPRESSION_ID { - // wonky special case for compressed messages having embedded messages - payloadLen := uint32(len(message.payload)) - messageLenLeft := payloadLen - for messageLenLeft > 0 { - start := payloadLen - messageLenLeft - innerLen, innerMsg := decodeMessage(message.payload[start:], payloadCodecsMap) - messageLenLeft = messageLenLeft - innerLen - 4 // message length uint32 - messages = append(messages, *innerMsg) + for msgStart <= packetLen { + + if msgStart+4 > packetLen { + return msgStart, messages + } + + length := binary.BigEndian.Uint32(packet[msgStart : msgStart+4]) + + if msgStart+4+length > packetLen { + // messages don't have to have complete messages + // so return the ammount consumed + return msgStart, messages + } + message := decodeMessage(packet[msgStart:msgStart+4+length], length, payloadCodecsMap) + msgStart = length + msgStart + 4 + + if length > 0 && message != nil { + if message.compression != NO_COMPRESSION_ID { + // wonky special case for compressed messages having embedded messages + payloadLen := uint32(len(message.payload)) + messageLenLeft := payloadLen + for messageLenLeft > 0 { + start := payloadLen - messageLenLeft + length = binary.BigEndian.Uint32(message.payload[start:]) + innerMsg := decodeMessage(message.payload[start:start+length+4], length, payloadCodecsMap) + messageLenLeft = messageLenLeft - length - 4 // message length uint32 + messages = append(messages, *innerMsg) + } + } else { + messages = append(messages, *message) } - } else { - messages = append(messages, *message) } } - return length, messages + return uint32(len(packet)), messages } -func decodeMessage(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, *Message) { - length := binary.BigEndian.Uint32(packet[0:]) - if length > uint32(len(packet[4:])) { - log.Printf("length mismatch, expected at least: %X, was: %X\n", length, len(packet[4:])) - return 0, nil +func decodeMessage(packet []byte, length uint32, payloadCodecsMap map[byte]PayloadCodec) *Message { + + if length != uint32(len(packet[4:])) { + log.Printf("length mismatch, expected at least : %d, was: %d\n", length, len(packet[4:])) + return nil } msg := Message{} msg.totalLength = length @@ -150,7 +181,7 @@ func decodeMessage(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint3 rawPayload = packet[10 : 10+payloadLength] } else { log.Printf("incorrect magic, expected: %X was: %X\n", MAGIC_DEFAULT, msg.magic) - return 0, nil + return nil } payloadChecksum := make([]byte, 4) @@ -158,11 +189,11 @@ func decodeMessage(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint3 if !bytes.Equal(payloadChecksum, msg.checksum[:]) { msg.Print() log.Printf("checksum mismatch, expected: % X was: % X\n", payloadChecksum, msg.checksum[:]) - return 0, nil + return nil } msg.payload = payloadCodecsMap[msg.compression].Decode(rawPayload) - return length, &msg + return &msg } func (msg *Message) Print() { @@ -177,6 +208,7 @@ func (msg *Message) Print() { log.Printf("long payload, length: %d\n", len(msg.payload)) } log.Printf("length: %d\n", msg.totalLength) - log.Printf("offset: %d\n", msg.offset) + log.Printf("start offset: %d\n", msg.offset) + log.Printf("end offset: %d\n", msg.offset+msg.TotalLen()) log.Println("----- End Message ------") } diff --git a/clients/go/src/publisher.go b/clients/go/src/publisher.go index fab6c43..44aab07 100644 --- a/clients/go/src/publisher.go +++ b/clients/go/src/publisher.go @@ -103,7 +103,7 @@ func NewBufferedSender(broker *Broker, bufferMaxMs int64, bufferMaxSize int) (Me doSend := func() { msgMu.Lock() var msgBufCopy []*Message - msgBufCopy = msgBuffer + msgBufCopy = msgBuffer msgBuffer = make([]*Message, 0) msgMu.Unlock() request := broker.EncodePublishRequest(msgBufCopy...) @@ -120,13 +120,12 @@ func NewBufferedSender(broker *Broker, bufferMaxMs int64, bufferMaxSize int) (Me for _ = range timer.C { msgMu.Lock() if len(msgBuffer) > 0 && !hasSent { - hasSent = false msgMu.Unlock() doSend() } else { msgMu.Unlock() } - + hasSent = false } }() diff --git a/clients/go/tools/consumer/consumer.go b/clients/go/tools/consumer/consumer.go index be66f9c..4cf134c 100644 --- a/clients/go/tools/consumer/consumer.go +++ b/clients/go/tools/consumer/consumer.go @@ -46,7 +46,7 @@ func init() { flag.StringVar(&topic, "topic", "test", "topic to publish to") flag.IntVar(&partition, "partition", 0, "partition to publish to") flag.Uint64Var(&offset, "offset", 0, "offset to start consuming from") - flag.UintVar(&maxSize, "maxsize", 1048576, "max size in bytes of message set to request") + flag.UintVar(&maxSize, "maxsize", 1048576, "max size in bytes to consume a message set") flag.StringVar(&writePayloadsTo, "writeto", "", "write payloads to this file") flag.BoolVar(&consumerForever, "consumeforever", false, "loop forever consuming") flag.BoolVar(&printmessage, "printmessage", true, "print the message details to stdout") @@ -60,6 +60,7 @@ func main() { broker := kafka.NewBrokerConsumer(hostname, topic, partition, offset, uint32(maxSize)) var payloadFile *os.File = nil + var msgCt int if len(writePayloadsTo) > 0 { var err error payloadFile, err = os.Create(writePayloadsTo) @@ -70,8 +71,12 @@ func main() { } consumerCallback := func(msg *kafka.Message) { + msgCt++ if printmessage { msg.Print() + } else if msgCt == 1000 { + fmt.Printf("Cur Offset: %d\n", msg.Offset()+msg.TotalLen()) + msgCt = 0 } if payloadFile != nil { payloadFile.Write([]byte("Message at: " + strconv.FormatUint(msg.Offset(), 10) + "\n")) -- 1.7.4.4