From 5c2921ff2dd7e054a1812e810abd3776d9419560 Mon Sep 17 00:00:00 2001 From: Aaron Raddon Date: Tue, 13 Mar 2012 18:54:46 -0700 Subject: [PATCH] multiproduce and multi-fetch --- clients/go/src/bytebuffer.go | 196 +++++++++++++++++++++++++++++++ clients/go/src/consumer.go | 166 +++++++++++++++++++++----- clients/go/src/converts.go | 14 ++- clients/go/src/kafka.go | 125 ++++++++++++++------ clients/go/src/kafka_test.go | 166 ++++++++++++++++++++++++-- clients/go/src/message.go | 25 +++- clients/go/src/publisher.go | 86 ++++++++++---- clients/go/src/request.go | 182 +++++++++++++++++++++++++---- clients/go/tools/consumer/consumer.go | 47 ++++++-- clients/go/tools/publisher/publisher.go | 104 +++++++++++++---- 10 files changed, 942 insertions(+), 169 deletions(-) create mode 100644 clients/go/src/bytebuffer.go diff --git a/clients/go/src/bytebuffer.go b/clients/go/src/bytebuffer.go new file mode 100644 index 0000000..5015813 --- /dev/null +++ b/clients/go/src/bytebuffer.go @@ -0,0 +1,196 @@ +/* + * Copyright (c) 2011 NeuStar, Inc. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * NeuStar, the Neustar logo and related names and logos are registered + * trademarks, service marks or tradenames of NeuStar, Inc. All other + * product names, company names, marks, logos and symbols may be trademarks + * of their respective owners. + */ + +package kafka + +import ( + "bufio" + "encoding/binary" + "errors" + "fmt" + "io" + "log" +) +/* +Format of a Multi-Fetch RESPONSE + +[0 0 9 168 0 0 0 0 6 119 0 0 0 0 0 41 1 1 106 31 14 40 31 139 8 0 0 0 0 0] + +0 0 9 168 +0 0 + -- repeat + 0 0 6 119 + 0 0 + - repeat + 0 0 0 41 + 1 + 1 + 106 31 14 40 + 31 139 8 0 0 .... +*/ + + +// a Byte buffer is a wrapper over bufio.Reader that allows us to forward only read +// while understanding the kafka format for bytes/lenghts, etc +type ByteBuffer struct { + ct int + reader *bufio.Reader + Size uint32 + //msgs []*Message +} +func NewByteBuffer(ct int, buf *bufio.Reader) *ByteBuffer { + b := ByteBuffer{ct: ct, reader:buf} + return &b +} + +func (b *ByteBuffer) Len() int { + return b.ct +} + +// a pattern is uint32, followed by uint16 +func (b *ByteBuffer) firstRead() (uint32, uint16, error) { + + var err error + + length := make([]byte, 4) + lenRead, err := io.ReadFull(b.reader, length) + if err != nil { + log.Println("invalid socket read ", err) + return 0, 0, err + } + if lenRead != 4 || lenRead < 0 { + return 0, 0, errors.New("invalid length of the packet length field") + } + + expectedLength := binary.BigEndian.Uint32(length) + + shortBytes := make([]byte, 2) + lenRead, err = io.ReadFull(b.reader, shortBytes) + if err != nil { + return 0, 0, err + } + if lenRead != 2 || lenRead < 0 { + return 0, 0, errors.New("invalid length of the short int field") + } + + shortInt := binary.BigEndian.Uint16(shortBytes) + log.Println(length, shortBytes, expectedLength, shortInt) + + return expectedLength, shortInt, nil + +} + +// initial read of a multi-set response, this will read total len +// (first 4 bytes) and # of sets +func (b *ByteBuffer) ReadHeader() error { + + //log.Println(b.reader.Peek(30)) + + size, errorCode, err := b.firstRead() + if err != nil { + log.Println("invalid socket read ", err) + return err + } + + b.Size = size + if errorCode != 0 { + log.Println("errorCode: ", errorCode) + return errors.New(fmt.Sprintf("Broker Response Error: %d", errorCode)) + } + return nil +} + +// Read the length and error for this set (message/offset) +func (b *ByteBuffer) ReadSet() (int, error) { + + //log.Println(b.reader.Peek(30)) + + size, errorCode, err := b.firstRead() + if errorCode != 0 || err != nil { + log.Println("errorCode: ", errorCode) + return 0, errors.New(fmt.Sprintf("Broker Response Error: %d", errorCode)) + } + return int(size), nil + +} + + +func (b *ByteBuffer) NextMsg(payloadCodecsMap map[byte]PayloadCodec) (int, []*Message, error) { + + //log.Println(b.reader.Peek(20)) + length, err := b.reader.Peek(4) + if err != nil { + log.Println("invalid socket read ", err) + return 0,nil, err + } + if len(length) != 4 { + return 0,nil, errors.New("invalid length of the packet length field") + } + + expectedLength := binary.BigEndian.Uint32(length) + payload := make([]byte, expectedLength + 4) + lenRead, err := io.ReadFull(b.reader, payload) + if err != nil { + return 0,nil, err + } + if uint32(lenRead) != expectedLength + 4 { + // this is actually an expected condition, the last message in a message + // set can be a partial if maxsize was exceeded + return 0, nil, nil + } + log.Println(payload, expectedLength) + payloadConsumed, msgs := Decode(payload, payloadCodecsMap) + if msgs == nil || len(msgs) == 0 { + // this isn't invalid as large messages might contain partial messages + return 0, []*Message{}, err + } + + return int(payloadConsumed) , msgs, err + +} + +func (b *ByteBuffer) Payload() ([]byte, error) { + + var err error + + length := make([]byte, 4) + lenRead, err := io.ReadFull(b.reader, length) + if err != nil { + log.Println("invalid socket read ", err) + return []byte{}, err + } + if lenRead != 4 || lenRead < 0 { + return []byte{}, errors.New("invalid length of the packet length field") + } + + expectedLength := binary.BigEndian.Uint32(length) + payload := make([]byte, expectedLength) + lenRead, err = io.ReadFull(b.reader, payload) + if err != nil { + return []byte{}, err + } + if lenRead != int(expectedLength) { + return []byte{}, errors.New("invalid length of the packet length field") + } + return payload, err + +} \ No newline at end of file diff --git a/clients/go/src/consumer.go b/clients/go/src/consumer.go index 35ff75e..cc076c7 100644 --- a/clients/go/src/consumer.go +++ b/clients/go/src/consumer.go @@ -30,11 +30,13 @@ import ( "time" ) +type MessageHandlerFunc func(string, int, *Message) + + type BrokerConsumer struct { broker *Broker - offset uint64 - maxSize uint32 codecs map[byte]PayloadCodec + Handler MessageHandlerFunc } // Create a new broker consumer @@ -44,10 +46,14 @@ type BrokerConsumer struct { // offset to start consuming from // maxSize (in bytes) of the message to consume (this should be at least as big as the biggest message to be published) func NewBrokerConsumer(hostname string, topic string, partition int, offset uint64, maxSize uint32) *BrokerConsumer { - return &BrokerConsumer{broker: newBroker(hostname, topic, partition), - offset: offset, - maxSize: maxSize, - codecs: DefaultCodecsMap} + tp := TopicPartition{Topic:topic,Partition:partition,Offset:offset,MaxSize:maxSize} + return &BrokerConsumer{broker: newBroker(hostname, &tp),codecs: DefaultCodecsMap} +} + +// Multiple Topic/Partition consumer +func NewMultiConsumer(hostname string, tplist []*TopicPartition) *BrokerConsumer { + //[]*TopicPartition{tp} + return &BrokerConsumer{broker: newMultiBroker(hostname, tplist), codecs: DefaultCodecsMap} } // Simplified consumer that defaults the offset and maxSize to 0. @@ -55,12 +61,11 @@ func NewBrokerConsumer(hostname string, topic string, partition int, offset uint // topic to consume // partition to consume from func NewBrokerOffsetConsumer(hostname string, topic string, partition int) *BrokerConsumer { - return &BrokerConsumer{broker: newBroker(hostname, topic, partition), - offset: 0, - maxSize: 0, - codecs: DefaultCodecsMap} + tp := TopicPartition{Topic:topic,Partition:partition,Offset:0,MaxSize:0} + return &BrokerConsumer{broker: newBroker(hostname, &tp), codecs: DefaultCodecsMap} } + // Add Custom Payload Codecs for Consumer Decoding // payloadCodecs - an array of PayloadCodec implementations func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec) { @@ -73,6 +78,7 @@ func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec) { func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, error) { conn, err := consumer.broker.connect() if err != nil { + quit <- true return -1, err } @@ -80,11 +86,11 @@ func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTime done := make(chan bool, 1) go func() { for { - _, err := consumer.consumeWithConn(conn, func(msg *Message) { + _, err := consumer.consumeWithConn(conn, func(topic string, partition int, msg *Message) { msgChan <- msg num += 1 }) - + if err != nil { if err != io.EOF { log.Println("Fatal Error: ", err) @@ -93,7 +99,8 @@ func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTime quit <- true // force quit break } - time.Sleep(time.Duration(pollTimeoutMs) * time.Millisecond * 1000000) + log.Println("consume loop") + time.Sleep(time.Duration(pollTimeoutMs) * time.Millisecond) } done <- true }() @@ -105,8 +112,6 @@ func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTime return num, err } -type MessageHandlerFunc func(msg *Message) - func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, error) { conn, err := consumer.broker.connect() if err != nil { @@ -123,51 +128,145 @@ func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, er return num, err } -func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, error) { - _, err := conn.Write(consumer.broker.EncodeConsumeRequest(consumer.offset, consumer.maxSize)) +func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (num int, err error) { + + var msgs []*Message + var payloadConsumed int + if len(consumer.broker.topics) > 1 { + return consumer.consumeMultiWithConn(conn,handlerFunc) + } + + tp := consumer.broker.topics[0] + request := consumer.broker.EncodeConsumeRequest() + log.Println(request, " \n\t", string(request)) + _, err = conn.Write(request) + if err != nil { log.Println("Fatal Error: ", err) return -1, err } - length, payload, err := consumer.broker.readResponse(conn) + reader := consumer.broker.readResponse(conn) - if err != nil { + err = reader.ReadHeader() + if err != nil || reader == nil { return -1, err } - num := 0 - if length > 2 { + if reader.Size > 2 { // parse out the messages var currentOffset uint64 = 0 - for currentOffset <= uint64(length-4) { + for { + payloadConsumed, msgs, err = reader.NextMsg(consumer.codecs) - payloadConsumed, msgs := Decode(payload[currentOffset:], consumer.codecs) + if err != nil { + log.Println("ERROR< ", err) + } if msgs == nil || len(msgs) == 0 { // this isn't invalid as large messages might contain partial messages - consumer.offset += currentOffset + tp.Offset += currentOffset return num, err } - msgOffset := consumer.offset + currentOffset + msgOffset := tp.Offset + currentOffset for _, msg := range msgs { // update all of the messages offset // multiple messages can be at the same offset (compressed for example) - msg.offset = msgOffset - msgOffset += 4 + uint64(msg.totalLength) - handlerFunc(&msg) + //msgOffset += 4 + uint64(msg.totalLength) + msgOffset += msg.TotalLen() + handlerFunc(tp.Topic, tp.Partition, msg) num += 1 } + currentOffset += uint64(payloadConsumed) } - // update the broker's offset for next consumption - consumer.offset += currentOffset + // update the topic/partition segment offset for next consumption + tp.Offset += currentOffset } return num, err } +func (consumer *BrokerConsumer) consumeMultiWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (num int, err error) { + + _, err = conn.Write(consumer.broker.EncodeConsumeRequestMultiFetch()) + + if err != nil { + log.Println("Fatal Error: ", err) + return -1, err + } + log.Println("about to call read multi response") + reader := consumer.broker.readMultiResponse(conn) + log.Println("after call") + err = reader.ReadHeader() + log.Println("after read header", err) + if err != nil { + log.Println("Fatal Error: ", err) + return -1, err + } + + var tp *TopicPartition + var currentOffset uint64 + var msgs []*Message + var payloadConsumed int + + log.Println("len ", reader.Len()) + for tpi := 0; tpi < reader.Len() ; tpi ++ { + //log.Println("new loop ", tpi) + // do we not know the topic/partition? or assume it stayed ordered? + tp = consumer.broker.topics[tpi] + + length, err := reader.ReadSet() + log.Println("size of this set", length) + if err != nil || reader == nil { + log.Println("ERROR, err in read", err) + return -1, err + } + currentOffset = 0 + msgsetloop: for { + payloadConsumed, msgs, err = reader.NextMsg(consumer.codecs) + log.Println("consumed", payloadConsumed, currentOffset) + if err != nil { + log.Println("ERROR< ", err) + break + } + if msgs == nil || len(msgs) == 0 { + // this isn't invalid as large messages might contain partial messages + tp.Offset += currentOffset + log.Println("no messages? ") + return num, err + } + msgOffset := tp.Offset + currentOffset + + for _, msg := range msgs { + // update all of the messages offset + // multiple messages can be at the same offset (compressed for example) + msg.offset = msgOffset + //msgOffset += 4 + uint64(msg.totalLength) + msgOffset += msg.TotalLen() + handlerFunc(tp.Topic, tp.Partition, msg) + num += 1 + } + + currentOffset += uint64(payloadConsumed) + log.Println(currentOffset, length) + if currentOffset + 2 >= uint64(length) { + break msgsetloop + } + } + // update the topic/partition segment offset for next consumption + if currentOffset > 2 { + //currentOffset +=2 + tp.Offset += currentOffset + } + + log.Println("tp.offset ", tp.Offset, tp.Partition) + } + return num, err +} + + // Get a list of valid offsets (up to maxNumOffsets) before the given time, where // time is in milliseconds (-1, from the latest offset available, -2 from the smallest offset available) // The result is a list of offsets, in descending order. @@ -186,16 +285,17 @@ func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([] return offsets, err } - length, payload, err := consumer.broker.readResponse(conn) + reader := consumer.broker.readResponse(conn) + payload, err := reader.Payload() if err != nil { return offsets, err } - if length > 4 { + if len(payload) > 4 { // get the number of offsets numOffsets := binary.BigEndian.Uint32(payload[0:]) var currentOffset uint64 = 4 - for currentOffset < uint64(length-4) && uint32(len(offsets)) < numOffsets { + for currentOffset < uint64(len(payload)-4) && uint32(len(offsets)) < numOffsets { offset := binary.BigEndian.Uint64(payload[currentOffset:]) offsets = append(offsets, offset) currentOffset += 8 // offset size diff --git a/clients/go/src/converts.go b/clients/go/src/converts.go index cb7fc90..0163914 100644 --- a/clients/go/src/converts.go +++ b/clients/go/src/converts.go @@ -32,9 +32,9 @@ func uint16bytes(value int) []byte { return result } -func uint32bytes(value int) []byte { +func uint32bytes(value uint32) []byte { result := make([]byte, 4) - binary.BigEndian.PutUint32(result, uint32(value)) + binary.BigEndian.PutUint32(result, value) return result } @@ -49,3 +49,13 @@ func uint64ToUint64bytes(value uint64) []byte { binary.BigEndian.PutUint64(result, value) return result } + +func intfrom2bytes(b []byte) int { + return int(binary.BigEndian.Uint16(b)) +} +func intfrom4bytes(b []byte) int { + return int(binary.BigEndian.Uint32(b)) +} +func uint32from4bytes(b []byte) uint32 { + return binary.BigEndian.Uint32(b) +} \ No newline at end of file diff --git a/clients/go/src/kafka.go b/clients/go/src/kafka.go index 1ead8c3..1f43807 100644 --- a/clients/go/src/kafka.go +++ b/clients/go/src/kafka.go @@ -24,29 +24,90 @@ package kafka import ( "bufio" - "encoding/binary" - "errors" - "fmt" - "io" "log" + "math/rand" "net" + "strings" + "strconv" + "time" ) const ( NETWORK = "tcp" ) +type TopicPartition struct { + Offset uint64 + MaxSize uint32 + Topic string + Partition int +} + +// creates a list of Topic Partitions for a single topic +func NewTopicPartitions(topic, partstr string, offset uint64, maxSize uint32) []*TopicPartition{ + parts := strings.Split(partstr,",") + partitions := make([]*TopicPartition,0) + for _, part := range parts { + partition, err := strconv.Atoi(part) + if err == nil { + tp := TopicPartition{Topic:topic,Partition:partition,Offset:offset,MaxSize:maxSize} + partitions = append(partitions, &tp) + } + } + return partitions +} + + type Broker struct { - topic string - partition int - partitions []int + topics []*TopicPartition hostname string + Partitioner Partitioner } -func newBroker(hostname string, topic string, partition int) *Broker { - return &Broker{topic: topic, - partition: partition, - hostname: hostname} +func newBroker(hostname string, tp *TopicPartition) *Broker { + + b := Broker{topics:[]*TopicPartition{tp}, hostname: hostname} + + b.Partitioner = func(b *Broker) int { + return tp.Partition + } + return &b + +} + + +func newMultiBroker(hostname string, tplist []*TopicPartition) *Broker { + + b := Broker{topics:tplist, hostname: hostname} + partitions := make([]int,len(tplist)) + for tpct, tp := range tplist { + partitions[tpct] = tp.Partition + } + + b.Partitioner = MakeRandomPartitioner(partitions) + return &b + +} + +// creates a broker that uses random paritioner, for a single topic but many partitions +func NewRandomPartitionedBroker(hostname string, topic string, partitions []int) *Broker { + tplist := make([]*TopicPartition,0) + for _, partition := range partitions { + tp := TopicPartition{Topic:topic,Partition:partition} + tplist = append(tplist, &tp) + } + b := Broker{hostname: hostname, topics: tplist} + b.Partitioner = MakeRandomPartitioner(partitions) + return &b +} + +// Create a Random Partitioner Func +func MakeRandomPartitioner(partitions []int) Partitioner { + rp := rand.New(rand.NewSource(time.Now().UnixNano())) + partitionSize := len(partitions) + return func(b *Broker) int { + return partitions[rp.Intn(partitionSize)] + } } func (b *Broker) connect() (conn *net.TCPConn, er error) { @@ -63,35 +124,21 @@ func (b *Broker) connect() (conn *net.TCPConn, er error) { return conn, er } -// returns length of response & payload & err -func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, error) { +// returns buffer reader for single requests +func (b *Broker) readResponse(conn *net.TCPConn) (*ByteBuffer) { reader := bufio.NewReader(conn) - length := make([]byte, 4) - lenRead, err := io.ReadFull(reader, length) - if err != nil { - log.Println("invalid socket read ", err) - return 0, []byte{}, err - } - if lenRead != 4 || lenRead < 0 { - return 0, []byte{}, errors.New("invalid length of the packet length field") - } - - expectedLength := binary.BigEndian.Uint32(length) - messages := make([]byte, expectedLength) - lenRead, err = io.ReadFull(reader, messages) - if err != nil { - return 0, []byte{}, err - } + br := NewByteBuffer(1, reader) + return br - if uint32(lenRead) != expectedLength { - return 0, []byte{}, errors.New(fmt.Sprintf("Fatal Error: Unexpected Length: %d expected: %d", lenRead, expectedLength)) - } +} - errorCode := binary.BigEndian.Uint16(messages[0:2]) - if errorCode != 0 { - log.Println("errorCode: ", errorCode) - return 0, []byte{}, errors.New( - fmt.Sprintf("Broker Response Error: %d", errorCode)) - } - return expectedLength, messages[2:], nil +// returns buffer reader for multiple fetch requests (offsets/fetchmsgs) +func (b *Broker) readMultiResponse(conn *net.TCPConn) (*ByteBuffer) { + reader := bufio.NewReader(conn) + br := NewByteBuffer(len(b.topics), reader) + return br } + + + + diff --git a/clients/go/src/kafka_test.go b/clients/go/src/kafka_test.go index 49c54fa..0b4da23 100644 --- a/clients/go/src/kafka_test.go +++ b/clients/go/src/kafka_test.go @@ -29,10 +29,39 @@ import ( "testing" ) +type MessageMatch struct { + PayloadIn string + Magic byte + Compression byte + Checksum [4]byte + Payload []byte + TotalLength uint64 // total length of the raw message (from decoding) +} + +type MessageTestDef struct { + Name string + Compress byte + Topic string + Partition int + MessageMatch +} +func (td *MessageTestDef) TopicPartition() *TopicPartition { + return &TopicPartition{Topic:td.Topic, Partition:td.Partition} +} +var testData map[string]MessageTestDef +var topics []*TopicPartition = make([]*TopicPartition,0) + func init() { log.SetFlags(log.Ltime|log.Lshortfile) + testData = make(map[string]MessageTestDef) + testData["testing"] = MessageTestDef{"testing",0,"test",0,MessageMatch{"testing",1,0,[4]byte{232, 243, 90, 6},[]byte{116,101,115,116,105,110,103},17}} + testData["testingpartition1"] = MessageTestDef{"testing",0,"test",1,MessageMatch{"testing",1,0,[4]byte{232, 243, 90, 6},[]byte{116,101,115,116,105,110,103},17}} + for _, td := range testData { + topics = append(topics, td.TopicPartition()) + } } + func TestMessageCreation(t *testing.T) { payload := []byte("testing") msg := NewMessage(payload) @@ -120,10 +149,10 @@ func TestCompressedMessageEncoding(t *testing.T) { msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]) - /* NOTE: I could not get these tests to pass from apache trunk, i redid the values, - the tests passed, and i sent the message from go producer -> scala consumer and it worked? - not sure where these values for expected came from - expectedPayload := []byte{0x1F, 0x8B, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, + // NOTE: I could not get these tests to pass from apach trunk, i redid the values, + // the tests passed, and i sent the message from go producer -> scala consumer and it worked? + // not sure where these values for expected came from + /*expectedPayload := []byte{0x1F, 0x8B, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0xFF, 0x62, 0x60, 0x60, 0xE0, 0x65, 0x64, 0x78, 0xF1, 0x39, 0x8A, 0xAD, 0x24, 0xB5, 0xB8, 0x24, 0x33, 0x2F, 0x1D, 0x10, 0x00, 0x00, 0xFF, 0xFF, 0x0C, 0x6A, 0x82, 0x91, 0x11, 0x00, 0x00, 0x00} */ @@ -132,6 +161,10 @@ func TestCompressedMessageEncoding(t *testing.T) { 0xAD, 0x24, 0xB5, 0xB8, 0x24, 0x33, 0x2F, 0x1D, 0x10, 0x00, 0x00, 0xFF, 0xFF, 0x0C, 0x6A, 0x82, 0x91, 0x11, 0x00, 0x00, 0x00} + // here is the difference [ ] + //expected: 1F 8B 08 00 00 00 00 00 04 FF 62 60 60 E0 65 64 78 F1 39 8A AD 24 + // but got: 1F 8B 08 00 00 09 6E 88 04 FF 62 60 60 E0 65 64 78 F1 39 8A AD 24 + //expectedHeader := []byte{0x00, 0x00, 0x00, 0x2F, 0x01, 0x01, 0x07, 0xFD, 0xC3, 0x76} expectedHeader := []byte{0x00, 0x00, 0x00, 0x2F, 0x01, 0x01, 0x96, 0x71, 0xA6, 0xE8} @@ -237,8 +270,10 @@ func TestMultipleCompressedMessages(t *testing.T) { } func TestRequestHeaderEncoding(t *testing.T) { - broker := newBroker("localhost:9092", "test", 0) - request := broker.EncodeRequestHeader(REQUEST_PRODUCE) + broker := newBroker("localhost:9092", &TopicPartition{Topic:"test", Partition:0}) + request := bytes.NewBuffer([]byte{}) + broker.EncodeRequestHeader(request, REQUEST_PRODUCE) + EncodeTopicHeader(request, broker.topics[0].Topic, 0) // generated by kafka-rb: expected := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, @@ -256,7 +291,7 @@ func TestPublishRequestEncoding(t *testing.T) { msg := NewMessage(payload) pubBroker := NewBrokerPublisher("localhost:9092", "test", 0) - request := pubBroker.broker.EncodePublishRequest(msg) + request := pubBroker.broker.EncodeProduceRequest(msg) // generated by kafka-rb: expected := []byte{0x00, 0x00, 0x00, 0x21, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, @@ -272,9 +307,9 @@ func TestPublishRequestEncoding(t *testing.T) { } func TestConsumeRequestEncoding(t *testing.T) { - - pubBroker := NewBrokerPublisher("localhost:9092", "test", 0) - request := pubBroker.broker.EncodeConsumeRequest(0, 1048576) + tp := TopicPartition{Topic:"test", Partition:0, Offset:0, MaxSize: 1048576} + pubBroker := NewProducer("localhost:9092", []*TopicPartition{&tp}) + request := pubBroker.broker.EncodeConsumeRequest() // generated by kafka-rb, encode_request_size + encode_request expected := []byte{0x00, 0x00, 0x00, 0x18, 0x00, 0x01, 0x00, 0x04, 0x74, @@ -287,3 +322,114 @@ func TestConsumeRequestEncoding(t *testing.T) { t.Fail() } } + +// re-usable routine for verifying messages +func testMessage(t *testing.T, msg *Message, tm *MessageTestDef, request []byte) { + + //TODO, add compression check + + if !bytes.Equal([]byte(tm.PayloadIn), msg.Payload()) { + t.Fatalf("payload in: % X \npayload: % X bytes not equal, %s", + []byte(tm.PayloadIn), msg.Payload(), string(msg.Payload())) + } + + if tm.TotalLength != msg.TotalLen() { + t.Fatalf("msg len expected: %d but got: %d for %s", tm.TotalLength, msg.TotalLen(), tm.Name) + } + + if !bytes.Equal(tm.Checksum[:], msg.checksum[:]) { + t.Fatalf("checksums do not match for %s\n", tm.Name) + } + if msg.magic != tm.Magic { + t.Fatalf("magic incorrect expected %v but was %v", tm.Magic, msg.magic) + } + if msg.compression != tm.Compress || tm.Compression != msg.compression { + t.Fatalf("compression incorrect expected %v and %v but was %v", tm.Compress, tm.Compression, msg.compression) + } +} + +// test multi-produce header/message sets +func testMultiProduceHeaders(t *testing.T, mds []MessageTestDef, pr ProduceRequest, request []byte) { + + msgs_len := 0 + request_header := 4 + 2 + 2 + topic_partition := 0 // 2 + len("test") + 4 + 4 + for _, tm := range mds { + topic_partition += 2 + len(tm.Topic) + 4 + 4 + } + + for _, msgs := range pr { + for _, msg := range msgs { + msgs_len += int(msg.TotalLen()) + } + } + + total_len := request_header + topic_partition + msgs_len + if len(request) != total_len { + t.Fatalf("request len should be %d but was %d", total_len, len(request)) + } +} + +func TestMultiProduceEncoding(t *testing.T) { + tm := testData["testing"] + tm2 := testData["testingpartition1"] + tm.Name = "multiproduce encode testing" + msg := NewMessage([]byte(tm.PayloadIn)) + msg2 := NewMessage([]byte(tm2.PayloadIn)) + _ = msg.Encode() + _ = msg2.Encode() + + producer := NewPartitionedProducer("localhost:9092", tm.Topic, []int{0, 1}) + msgs := make(ProduceRequest) + msgs[0] = []*Message{msg} + msgs[1] = []*Message{msg2} + request := producer.broker.EncodeMultiProduceRequest(&msgs) + + /* + 0 0 0 66 - len of collection of messages + 0 3 - request type (multi-produce) + 0 2 - num of topic-partition combos + - repeat once per topic/partition + 0 4 len of topic (bytes) + 116 101 115 116 topic (test) + 0 0 0 0 partition 0 + 0 0 0 17 - len of message set(could be n # of messages) + msg + - 2nd topic/partition combo + + 0 0 0 66 0 3 0 2 0 4 116 101 115 116 0 0 0 0 0 0 0 17 0 0 0 13 1 0 232 243 90 6 116 101 115 116 105 110 103 + 0 4 116 101 115 116 0 0 0 1 0 0 0 17 0 0 0 13 1 0 232 243 90 6 116 101 115 116 105 110 103 + */ + testMessage(t,msg,&tm,request) + testMessage(t,msg2,&tm2,request) + + // remember go gives is no order of iteration, so we don't know which is first + if !bytes.Equal(request[:14], []byte{0,0,0,66,0,3,0,2,0,4,116,101,115,116}){ + t.Fatalf("payload in: % X \npayload: % X bytes not equal", + request[:14], []byte{0,0,0,66,0,3,0,2,0,4,116,101,115,116}) + } + + testMultiProduceHeaders(t, []MessageTestDef{tm,tm2}, msgs, request) + +} + +func TestMultiProduceTopics(t *testing.T) { + // implement tests for multiple topics, multiple partitions + +} + +func TestMultiFetchEncoding(t *testing.T) { + /* + [0 0 0 48 0 2 0 2 + 0 4 116 101 115 116 0 0 0 0 0 0 0 0 0 0 0 0 0 16 0 0 + 0 4 116 101 115 116 0 0 0 1 0 0 0 0 0 0 0 0 0 16 0 0] + */ + con := NewMultiConsumer("localhost:9092", topics) + request := con.broker.EncodeConsumeRequestMultiFetch() + // remember go gives is no order of iteration, so we don't know which is first + if !bytes.Equal(request[:14], []byte{0,0,0,48,0,2,0,2,0,4,116,101,115,116}){ + t.Fatalf("request: % X \request: % X bytes not equal", + request[:14], []byte{0,0,0,48,0,2,0,2,0,4,116,101,115,116}) + } + +} \ No newline at end of file diff --git a/clients/go/src/message.go b/clients/go/src/message.go index 30eea7b..0980709 100644 --- a/clients/go/src/message.go +++ b/clients/go/src/message.go @@ -43,9 +43,15 @@ type Message struct { payload []byte offset uint64 // only used after decoding totalLength uint32 // total length of the raw message (from decoding) +} +// Used for sending messages +type MessageTopic struct { + Topic string + Message } + func (m *Message) Offset() uint64 { return m.offset } @@ -78,6 +84,13 @@ func NewMessage(payload []byte) *Message { return NewMessageWithCodec(payload, DefaultCodecsMap[NO_COMPRESSION_ID]) } +// a message for a specific topic +func NewMessageTopic(topic string, payload []byte) *MessageTopic { + m := NewMessageWithCodec(payload, DefaultCodecsMap[NO_COMPRESSION_ID]) + + return &MessageTopic{Topic:topic,Message:*m} +} + // Create a Message using the default compression method (gzip) func NewCompressedMessage(payload []byte) *Message { return NewCompressedMessages(NewMessage(payload)) @@ -98,20 +111,20 @@ func (m *Message) Encode() []byte { binary.BigEndian.PutUint32(msg[0:], uint32(msgLen)) msg[4] = m.magic msg[5] = m.compression - + m.totalLength = uint32(msgLen) copy(msg[6:], m.checksum[0:]) copy(msg[10:], m.payload) return msg } -func DecodeWithDefaultCodecs(packet []byte) (uint32, []Message) { +func DecodeWithDefaultCodecs(packet []byte) (uint32, []*Message) { return Decode(packet, DefaultCodecsMap) } -func Decode(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, []Message) { +func Decode(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, []*Message) { - messages := []Message{} + messages := []*Message{} packetLen := uint32(len(packet)) if packet == nil || packetLen < 9 { @@ -147,10 +160,10 @@ func Decode(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, []Me length = binary.BigEndian.Uint32(message.payload[start:]) innerMsg := decodeMessage(message.payload[start:start+length+4], length, payloadCodecsMap) messageLenLeft = messageLenLeft - length - 4 // message length uint32 - messages = append(messages, *innerMsg) + messages = append(messages, innerMsg) } } else { - messages = append(messages, *message) + messages = append(messages, message) } } } diff --git a/clients/go/src/publisher.go b/clients/go/src/publisher.go index 44aab07..b9d820b 100644 --- a/clients/go/src/publisher.go +++ b/clients/go/src/publisher.go @@ -29,16 +29,36 @@ import ( "time" ) -type MessageSender func(*Message) +type MessageSender func(msg *Message) + +// an interface for a partitioner that chooses from available partitions +type Partitioner func(*Broker) int + +// a produce request with multiple partitions +type ProduceRequest map[int][]*Message + +//type MessageSender func(topic string, partition int, *Message) type BrokerPublisher struct { broker *Broker } - func NewBrokerPublisher(hostname string, topic string, partition int) *BrokerPublisher { - return &BrokerPublisher{broker: newBroker(hostname, topic, partition)} + tp := TopicPartition{Topic:topic,Partition:partition} + b := newBroker(hostname, &tp) + return &BrokerPublisher{broker: b} +} +func NewProducer(hostname string, tplist []*TopicPartition) *BrokerPublisher { + b := newMultiBroker(hostname, tplist) + return &BrokerPublisher{broker: b} } +// create a new Producer, that uses multi-produce, and random partitioner +func NewPartitionedProducer(hostname string, topic string, partitions []int) *BrokerPublisher { + b := NewRandomPartitionedBroker(hostname, topic, partitions ) + return &BrokerPublisher{broker: b} +} + + func (b *BrokerPublisher) Publish(message *Message) (int, error) { return b.BatchPublish(message) } @@ -49,8 +69,8 @@ func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, error) { return -1, err } defer conn.Close() - // TODO: MULTIPRODUCE - request := b.broker.EncodePublishRequest(messages...) + + request := b.broker.EncodeProduceRequest(messages...) num, err := conn.Write(request) if err != nil { return -1, err @@ -86,6 +106,7 @@ func (b *BrokerPublisher) PublishOnChannel(msgChan chan *Message, bufferMaxMs in } // Buffered Sender, buffers messages for max time, and max size +// uses a partitioner to choose partition func NewBufferedSender(broker *Broker, bufferMaxMs int64, bufferMaxSize int) (MessageSender, *net.TCPConn, error) { conn, err := broker.connect() @@ -93,24 +114,34 @@ func NewBufferedSender(broker *Broker, bufferMaxMs int64, bufferMaxSize int) (Me return nil, nil, err } - msgBuffer := make([]*Message, 0) - + msgBuffer := make(ProduceRequest) var hasSent bool - + var msgCt int msgMu := new(sync.Mutex) timer := time.NewTicker(time.Duration(bufferMaxMs) * time.Millisecond) - doSend := func() { + doSend := func(msgBufCopy ProduceRequest) { + msgMu.Lock() - var msgBufCopy []*Message - msgBufCopy = msgBuffer - msgBuffer = make([]*Message, 0) + msgCt = 0 + msgBuffer = make(ProduceRequest) msgMu.Unlock() - request := broker.EncodePublishRequest(msgBufCopy...) - _, err := conn.Write(request) + + var err error + if len(msgBufCopy) > 1 { + request := broker.EncodeMultiProduceRequest(&msgBufCopy) + _, err = conn.Write(request) + log.Println(err) + } else { + for _, msgs := range msgBufCopy { + request := broker.EncodeProduceRequest(msgs...) + _, err = conn.Write(request) + } + } + if err != nil { - // ? panic? - log.Println("potentially fatal error?") + // TODO, reconnect? + log.Println("potentially fatal error?", err) } } @@ -119,28 +150,37 @@ func NewBufferedSender(broker *Broker, bufferMaxMs int64, bufferMaxSize int) (Me for _ = range timer.C { msgMu.Lock() - if len(msgBuffer) > 0 && !hasSent { + if msgCt > 0 && !hasSent { + hasSent = false msgMu.Unlock() - doSend() + doSend(msgBuffer) } else { msgMu.Unlock() } - hasSent = false + } }() return func(msg *Message) { if msg == nil { - doSend() + doSend(msgBuffer) return } msgMu.Lock() - msgBuffer = append(msgBuffer, msg) - if len(msgBuffer) >= bufferMaxSize { + msgCt++ + // get a random partition + partId := broker.Partitioner(broker) + log.Println("partid = ", partId) + if msgs, ok := msgBuffer[partId]; !ok { + msgBuffer[partId] = []*Message{msg} + } else { + msgBuffer[partId] = append(msgs, msg) + } + if msgCt > bufferMaxSize { hasSent = true msgMu.Unlock() - go doSend() + go doSend(msgBuffer) } else { msgMu.Unlock() } diff --git a/clients/go/src/request.go b/clients/go/src/request.go index 8fbe4d0..7d42f40 100644 --- a/clients/go/src/request.go +++ b/clients/go/src/request.go @@ -25,6 +25,7 @@ package kafka import ( "bytes" "encoding/binary" + "log" ) type RequestType uint16 @@ -38,16 +39,25 @@ const ( REQUEST_OFFSETS = 4 ) -// Request Header: -func (b *Broker) EncodeRequestHeader(requestType RequestType) *bytes.Buffer { - request := bytes.NewBuffer([]byte{}) +// Request Header: +func (b *Broker) EncodeRequestHeader(request *bytes.Buffer, requestType RequestType) { request.Write(uint32bytes(0)) // placeholder for request size request.Write(uint16bytes(int(requestType))) - request.Write(uint16bytes(len(b.topic))) - request.WriteString(b.topic) - request.Write(uint32bytes(b.partition)) +} - return request +// Topic/Partition Header: +func EncodeTopicHeader(request *bytes.Buffer, topic string, partition int) { + request.Write(uint16bytes(len(topic))) + request.WriteString(topic) + request.Write(uint32bytes(uint32(partition))) +} + +// Topic Fetch Request: +func EncodeTopicFetch(request *bytes.Buffer, tp *TopicPartition) { + EncodeTopicHeader(request, tp.Topic, tp.Partition) + log.Println("topic ", tp.Topic, tp.Partition, tp.Offset) + request.Write(uint64ToUint64bytes(tp.Offset)) + request.Write(uint32bytes(tp.MaxSize)) } // after writing to the buffer is complete, encode the size of the request in the request. @@ -55,34 +65,99 @@ func encodeRequestSize(request *bytes.Buffer) { binary.BigEndian.PutUint32(request.Bytes()[0:], uint32(request.Len()-4)) } -// +// func (b *Broker) EncodeOffsetRequest(time int64, maxNumOffsets uint32) []byte { - request := b.EncodeRequestHeader(REQUEST_OFFSETS) - // specific to offset request - request.Write(uint64ToUint64bytes(uint64(time))) - request.Write(uint32toUint32bytes(maxNumOffsets)) + request := bytes.NewBuffer([]byte{}) + b.EncodeRequestHeader(request, REQUEST_OFFSETS) + + if len(b.topics) > 1 { + // TODO: more than one offset if > 1 partition + + } else { + EncodeTopicHeader(request, b.topics[0].Topic, b.Partitioner(b)) + // specific to offset request + request.Write(uint64ToUint64bytes(uint64(time))) + request.Write(uint32toUint32bytes(maxNumOffsets)) + } encodeRequestSize(request) return request.Bytes() } -// -func (b *Broker) EncodeConsumeRequest(offset uint64, maxSize uint32) []byte { - request := b.EncodeRequestHeader(REQUEST_FETCH) - // specific to consume request - request.Write(uint64ToUint64bytes(offset)) - request.Write(uint32toUint32bytes(maxSize)) +// +// +func (b *Broker) EncodeConsumeRequest() []byte { + request := bytes.NewBuffer([]byte{}) + + //if len(b.topics) != 1 { + // return request.Bytes() + //} + tp := b.topics[0] + + b.EncodeRequestHeader(request, REQUEST_FETCH) + + EncodeTopicFetch(request, tp) encodeRequestSize(request) return request.Bytes() } -// -func (b *Broker) EncodePublishRequest(messages ...*Message) []byte { +// +// +// +// +func (b *Broker) EncodeConsumeRequestMultiFetch() []byte { + + /* + + [0 0 0 48 0 2 0 2 + 0 4 116 101 115 116 0 0 0 0 0 0 0 0 0 0 0 0 0 16 0 0 + 0 4 116 101 115 116 0 0 0 1 0 0 0 0 0 0 0 0 0 16 0 0] + + [0 0 0 48 0 2 0 2 + 0 4 116 101 115 116 0 0 0 0 0 0 0 0 0 0 6 137 0 16 0 0 + 0 4 116 101 115 116 0 0 0 1 0 0 0 0 0 0 3 97 0 16 0 0] + + 0 0 0 48 + 0 2 + 0 2 + -- repeat + 116 101 115 116 + 0 0 0 0 + 0 0 0 0 0 0 0 0 + 0 0 0 0 + */ + + request := bytes.NewBuffer([]byte{}) + b.EncodeRequestHeader(request, REQUEST_MULTIFETCH) + + request.Write(uint16bytes(len(b.topics))) + + for _, tp := range b.topics { + EncodeTopicFetch(request, tp) + } + + encodeRequestSize(request) + log.Println(request.Bytes()) + return request.Bytes() +} + +// +/* +[0 0 0 24 0 1 0 4 116 101 115 116 0 0 0 0 0 0 0 0 0 0 0 0 0 16 0 0] +[0 0 0 24 0 2 0 4 116 101 115 116 0 0 0 0 0 0 0 0 0 0 0 0 0 16 0 0] +*/ +func (b *Broker) EncodeProduceRequest(messages ...*Message) []byte { // 4 + 2 + 2 + topicLength + 4 + 4 - request := b.EncodeRequestHeader(REQUEST_PRODUCE) + request := bytes.NewBuffer([]byte{}) + if len(b.topics) != 1 { + return request.Bytes() + } + + b.EncodeRequestHeader(request, REQUEST_PRODUCE) + EncodeTopicHeader(request, b.topics[0].Topic, b.Partitioner(b)) messageSetSizePos := request.Len() request.Write(uint32bytes(0)) // placeholder message len @@ -95,7 +170,72 @@ func (b *Broker) EncodePublishRequest(messages ...*Message) []byte { // now add the accumulated size of that the message set was binary.BigEndian.PutUint32(request.Bytes()[messageSetSizePos:], uint32(written)) + // now add the size of the whole to the first uint32 encodeRequestSize(request) return request.Bytes() } + +// +// +// +// +// +// +// ..... +func (b *Broker) EncodeMultiProduceRequest(preq *ProduceRequest) []byte { + + /* + Format of a Multi-Produce Request + [0 0 0 92 0 3 0 2 + 0 4 116 101 115 116 0 0 0 1 0 0 0 15 + 0 0 0 11 1 0 254 240 190 143 97 97 114 111 110 + 0 4 116 101 115 116 0 0 0 0 0 0 0 45 + 0 0 0 11 1 0 254 240 190 143 97 97 114 111 110 + 0 0 0 11 1 0 254 240 190 143 97 97 114 111 110 + 0 0 0 11 1 0 254 240 190 143 97 97 114 111 110] + + 0 0 0 92 + 0 3 + 0 2 + -- repeat + 0 4 + 116 101 115 116 + 0 0 0 1 + 0 0 0 15 + - repeat + 0 0 0 11 + 1 + 0 + 254 240 190 143 + 97 97 114 111 110 + */ + request := bytes.NewBuffer([]byte{}) + + b.EncodeRequestHeader(request, REQUEST_MULTIPRODUCE) + request.Write(uint16bytes(len(*preq))) + + + for partition, messages := range *preq { + + // TODO, support multiple topics (right now, just multiple partitions for single topic) + EncodeTopicHeader(request, b.topics[0].Topic, partition) + + messageSetSizePos := request.Len() + request.Write(uint32bytes(0)) // placeholder message len + + written := 0 + for _, message := range messages { + wrote, _ := request.Write(message.Encode()) + written += wrote + } + + // now add the accumulated size of that the message set was + binary.BigEndian.PutUint32(request.Bytes()[messageSetSizePos:], uint32(written)) + + } + encodeRequestSize(request) + + return request.Bytes() +} + diff --git a/clients/go/tools/consumer/consumer.go b/clients/go/tools/consumer/consumer.go index 4cf134c..25372e2 100644 --- a/clients/go/tools/consumer/consumer.go +++ b/clients/go/tools/consumer/consumer.go @@ -25,8 +25,10 @@ package main import ( "flag" "fmt" + "log" "os" "os/signal" + "strings" "strconv" kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src" "syscall" @@ -34,7 +36,7 @@ import ( var hostname string var topic string -var partition int +var partitionstr string var offset uint64 var maxSize uint var writePayloadsTo string @@ -44,21 +46,34 @@ var printmessage bool func init() { flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server") flag.StringVar(&topic, "topic", "test", "topic to publish to") - flag.IntVar(&partition, "partition", 0, "partition to publish to") + flag.StringVar(&partitionstr, "partitions", "0", "partitions to publish to: comma delimited") flag.Uint64Var(&offset, "offset", 0, "offset to start consuming from") flag.UintVar(&maxSize, "maxsize", 1048576, "max size in bytes to consume a message set") flag.StringVar(&writePayloadsTo, "writeto", "", "write payloads to this file") - flag.BoolVar(&consumerForever, "consumeforever", false, "loop forever consuming") - flag.BoolVar(&printmessage, "printmessage", true, "print the message details to stdout") + flag.BoolVar(&consumerForever, "consumeforever", true, "loop forever consuming") + flag.BoolVar(&printmessage, "print", true, "print the message details to stdout") + log.SetOutput(os.Stdout) + log.SetFlags(log.Ltime|log.Lshortfile) } func main() { flag.Parse() fmt.Println("Consuming Messages :") - fmt.Printf("From: %s, topic: %s, partition: %d\n", hostname, topic, partition) + fmt.Printf("From: %s, topic: %s, partitions: %s\n", hostname, topic, partitionstr) + log.Println("pringing ?", printmessage) + //panic("wt") fmt.Println(" ---------------------- ") - broker := kafka.NewBrokerConsumer(hostname, topic, partition, offset, uint32(maxSize)) + var broker *kafka.BrokerConsumer + parts := strings.Split(partitionstr,",") + if len(parts) > 1 { + tps := kafka.NewTopicPartitions(topic, partitionstr, offset, uint32(maxSize)) + broker = kafka.NewMultiConsumer(hostname, tps) + } else { + partition, _ := strconv.Atoi(partitionstr) + broker = kafka.NewBrokerConsumer(hostname, topic, partition, offset, uint32(maxSize)) + } + var payloadFile *os.File = nil var msgCt int if len(writePayloadsTo) > 0 { @@ -69,8 +84,8 @@ func main() { payloadFile = nil } } - - consumerCallback := func(msg *kafka.Message) { + var consumerCallback kafka.MessageHandlerFunc + consumerCallback = func(topic string, partition int, msg *kafka.Message) { msgCt++ if printmessage { msg.Print() @@ -78,6 +93,9 @@ func main() { fmt.Printf("Cur Offset: %d\n", msg.Offset()+msg.TotalLen()) msgCt = 0 } + if msgCt == 10 { + //panic("out") + } if payloadFile != nil { payloadFile.Write([]byte("Message at: " + strconv.FormatUint(msg.Offset(), 10) + "\n")) payloadFile.Write(msg.Payload()) @@ -87,13 +105,18 @@ func main() { if consumerForever { quit := make(chan bool, 1) + msgChan := make(chan *kafka.Message) go func() { - sigIn := make(chan os.Signal) + var sigIn chan os.Signal = make(chan os.Signal,1) signal.Notify(sigIn) for { select { + case <-quit: + log.Println("got quit, shutting down") + close(msgChan) + return case sig := <-sigIn: if sig.(os.Signal) == syscall.SIGINT { quit <- true @@ -104,11 +127,11 @@ func main() { } }() - msgChan := make(chan *kafka.Message) - go broker.ConsumeOnChannel(msgChan, 10, quit) + + go broker.ConsumeOnChannel(msgChan, 200, quit) for msg := range msgChan { if msg != nil { - consumerCallback(msg) + consumerCallback(topic, 0, msg) } else { break } diff --git a/clients/go/tools/publisher/publisher.go b/clients/go/tools/publisher/publisher.go index 2ccd22d..4acd885 100644 --- a/clients/go/tools/publisher/publisher.go +++ b/clients/go/tools/publisher/publisher.go @@ -27,11 +27,12 @@ import ( "flag" "fmt" kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src" + "log" "os" ) /* - This publisher tool has 4 send modes: + This publisher tool has 5 send modes: 1. Pass message: ./publisher -message="good stuff bob" -hostname=192.168.1.15:9092 @@ -46,6 +47,11 @@ import ( ./publisher -topic=atopic -partition=0 >my message here + 5. MultiProduce Stdin: if message, message file empty it accepts + messages from Console (message end at new line) + ./publisher -topic=atopic -partition=0 -multi + >my message here + each message is sent 4 times, to get more than one partition */ var hostname string var topic string @@ -54,6 +60,7 @@ var sendCt int var message string var messageFile string var compress bool +var multi bool func init() { flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server") @@ -63,11 +70,16 @@ func init() { flag.IntVar(&sendCt, "sendct", 0, "to do a pseudo load test, set sendct & pass a message ") flag.StringVar(&messageFile, "messagefile", "", "read message from this file") flag.BoolVar(&compress, "compress", false, "compress the messages published") + flag.BoolVar(&multi, "multi", false, "send multiple messages (multiproduce)?") + log.SetOutput(os.Stdout) + log.SetFlags(log.Ldate|log.Ltime|log.Lshortfile) } -// sends +// sends file & exits func SendFile(msgFile string) { + broker := kafka.NewBrokerPublisher(hostname, topic, partition) + fmt.Println("Publishing File:", msgFile) file, err := os.Open(msgFile) if err != nil { @@ -93,18 +105,23 @@ func SendFile(msgFile string) { file.Close() } +func MakeMsg(message []byte) *kafka.Message { + if compress { + return kafka.NewCompressedMessage(message) + } + return kafka.NewMessage(message) +} + +// sends single message and exits func SendMessage() { broker := kafka.NewBrokerPublisher(hostname, topic, partition) fmt.Println("Publishing :", message) - if compress { - broker.Publish(kafka.NewCompressedMessage([]byte(message))) - } else { - broker.Publish(kafka.NewMessage([]byte(message))) - } + broker.Publish(MakeMsg([]byte(message))) } +// sends x copies of a message func SendManyMessages() { broker := kafka.NewBrokerPublisher(hostname, topic, partition) @@ -116,14 +133,66 @@ func SendManyMessages() { go broker.PublishOnChannel(msgChan, 100, 100, done) for i := 0; i < sendCt; i++ { - msgChan <- kafka.NewMessage([]byte(message)) + msgChan <- MakeMsg([]byte(message)) } done <- true // force flush timing.Print() } +// sends messages from stdin +func StdinPruducer() { + + broker := kafka.NewBrokerPublisher(hostname, topic, partition) + b := bufio.NewReader(os.Stdin) + done := make(chan bool) + msgChan := make(chan *kafka.Message, 1000) + + go broker.PublishOnChannel(msgChan, 2000, 200, done) + fmt.Println("reading from stdin") + for { + if s, e := b.ReadString('\n'); e == nil { + + fmt.Println("sending ---", s, []byte(s)) + + msgChan <- MakeMsg([]byte(s)[:len(s)-1]) + + } + } +} + +// console producer, this sends 4 messages to the producer, +// so that hopefully the random partition sends at least one to each partition +func StdinMultiProduce() { + + broker := kafka.NewPartitionedProducer(hostname, topic, []int{partition, partition + 1}) + b := bufio.NewReader(os.Stdin) + done := make(chan bool) + msgChan := make(chan *kafka.Message, 1000) + + go broker.PublishOnChannel(msgChan, 2000, 200, done) + + fmt.Println("reading from stdin") + + for { + + if s, e := b.ReadString('\n'); e == nil { + by := []byte(s)[:len(s)-1] + fmt.Println("sending ---", s, by) + // hope we get at least 1 of each random partition + msgChan <- MakeMsg(by) + msgChan <- MakeMsg(by) + msgChan <- MakeMsg(by) + msgChan <- MakeMsg(by) + + } + + } +} + + func main() { + flag.Parse() fmt.Printf("Kafka: %s, topic: %s, partition: %d\n", hostname, topic, partition) fmt.Println(" ---------------------- ") @@ -140,24 +209,13 @@ func main() { SendManyMessages() - } else { - - // console publisher - broker := kafka.NewBrokerPublisher(hostname, topic, partition) - b := bufio.NewReader(os.Stdin) - done := make(chan bool) - msgChan := make(chan *kafka.Message, 1000) + } else if multi { - go broker.PublishOnChannel(msgChan, 2000, 200, done) - fmt.Println("reading from stdin") - for { - if s, e := b.ReadString('\n'); e == nil { + StdinMultiProduce() - fmt.Println("sending ---", s) + } else { - msgChan <- kafka.NewMessage([]byte(s)) + StdinPruducer() - } - } } } -- 1.7.4.4