From 4e05ac92a9c885a777a64f5a34cb3ca6113d0caa Mon Sep 17 00:00:00 2001 From: Aaron Raddon Date: Tue, 13 Mar 2012 18:49:01 -0700 Subject: [PATCH] kafka297 --- clients/go/Makefile | 2 +- clients/go/src/publisher.go | 99 ++++++++++++++++++++++ clients/go/tools/publisher/publisher.go | 136 ++++++++++++++++++++++++------- 3 files changed, 205 insertions(+), 32 deletions(-) diff --git a/clients/go/Makefile b/clients/go/Makefile index 3d7da05..b7da4f3 100644 --- a/clients/go/Makefile +++ b/clients/go/Makefile @@ -19,7 +19,7 @@ tools: force make -C tools/offsets clean all format: - gofmt -w -tabwidth=2 -tabindent=false src/*.go tools/consumer/*.go tools/publisher/*.go kafka_test.go + gofmt -w -tabwidth=2 -tabs=false src/*.go tools/consumer/*.go tools/publisher/*.go kafka_test.go full: format clean install tools diff --git a/clients/go/src/publisher.go b/clients/go/src/publisher.go index 0766d1a..fab6c43 100644 --- a/clients/go/src/publisher.go +++ b/clients/go/src/publisher.go @@ -22,6 +22,15 @@ package kafka +import ( + "log" + "net" + "sync" + "time" +) + +type MessageSender func(*Message) + type BrokerPublisher struct { broker *Broker } @@ -49,3 +58,93 @@ func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, error) { return num, err } + +// opens a channel for publishing, blocking call +func (b *BrokerPublisher) PublishOnChannel(msgChan chan *Message, bufferMaxMs int64, bufferMaxSize int, quit chan bool) error { + + sender, conn, err := NewBufferedSender(b.broker, bufferMaxMs, bufferMaxSize) + + if err == nil { + + // wait for stop signal + go func() { + <-quit + sender(nil) // flush + conn.Close() + close(msgChan) + }() + + for msg := range msgChan { + if msg != nil { + sender(msg) + } + } + + } + return err + +} + +// Buffered Sender, buffers messages for max time, and max size +func NewBufferedSender(broker *Broker, bufferMaxMs int64, bufferMaxSize int) (MessageSender, *net.TCPConn, error) { + + conn, err := broker.connect() + if err != nil { + return nil, nil, err + } + + msgBuffer := make([]*Message, 0) + + var hasSent bool + + msgMu := new(sync.Mutex) + timer := time.NewTicker(time.Duration(bufferMaxMs) * time.Millisecond) + + doSend := func() { + msgMu.Lock() + var msgBufCopy []*Message + msgBufCopy = msgBuffer + msgBuffer = make([]*Message, 0) + msgMu.Unlock() + request := broker.EncodePublishRequest(msgBufCopy...) + _, err := conn.Write(request) + if err != nil { + // ? panic? + log.Println("potentially fatal error?") + } + } + + log.Println("start buffered sender heartbeat = ", bufferMaxMs, " max queue ", bufferMaxSize) + go func() { + + for _ = range timer.C { + msgMu.Lock() + if len(msgBuffer) > 0 && !hasSent { + hasSent = false + msgMu.Unlock() + doSend() + } else { + msgMu.Unlock() + } + + } + + }() + + return func(msg *Message) { + if msg == nil { + doSend() + return + } + msgMu.Lock() + msgBuffer = append(msgBuffer, msg) + if len(msgBuffer) >= bufferMaxSize { + hasSent = true + msgMu.Unlock() + go doSend() + } else { + msgMu.Unlock() + } + + }, conn, nil +} diff --git a/clients/go/tools/publisher/publisher.go b/clients/go/tools/publisher/publisher.go index f98c9b2..2ccd22d 100644 --- a/clients/go/tools/publisher/publisher.go +++ b/clients/go/tools/publisher/publisher.go @@ -23,15 +23,34 @@ package main import ( + "bufio" "flag" "fmt" kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src" "os" ) +/* + This publisher tool has 4 send modes: + 1. Pass message: + ./publisher -message="good stuff bob" -hostname=192.168.1.15:9092 + + 2. Pass Msg, SendCT: Send the samge message SendCt # of times + ./publisher -sendct=100 -message="good stuff bob" + + 3. MessageFile: pass a message file and it will read + ./publisher -messagefile=/tmp/msgs.log + + 4. Stdin: if message, message file empty it accepts + messages from Console (message end at new line) + ./publisher -topic=atopic -partition=0 + >my message here + +*/ var hostname string var topic string var partition int +var sendCt int var message string var messageFile string var compress bool @@ -41,49 +60,104 @@ func init() { flag.StringVar(&topic, "topic", "test", "topic to publish to") flag.IntVar(&partition, "partition", 0, "partition to publish to") flag.StringVar(&message, "message", "", "message to publish") + flag.IntVar(&sendCt, "sendct", 0, "to do a pseudo load test, set sendct & pass a message ") flag.StringVar(&messageFile, "messagefile", "", "read message from this file") flag.BoolVar(&compress, "compress", false, "compress the messages published") } +// sends +func SendFile(msgFile string) { + broker := kafka.NewBrokerPublisher(hostname, topic, partition) + fmt.Println("Publishing File:", msgFile) + file, err := os.Open(msgFile) + if err != nil { + fmt.Println("Error: ", err) + return + } + stat, err := file.Stat() + if err != nil { + fmt.Println("Error: ", err) + return + } + payload := make([]byte, stat.Size()) + file.Read(payload) + timing := kafka.StartTiming("Sending") + + if compress { + broker.Publish(kafka.NewCompressedMessage(payload)) + } else { + broker.Publish(kafka.NewMessage(payload)) + } + + timing.Print() + file.Close() +} + +func SendMessage() { + + broker := kafka.NewBrokerPublisher(hostname, topic, partition) + + fmt.Println("Publishing :", message) + if compress { + broker.Publish(kafka.NewCompressedMessage([]byte(message))) + } else { + broker.Publish(kafka.NewMessage([]byte(message))) + } +} + +func SendManyMessages() { + + broker := kafka.NewBrokerPublisher(hostname, topic, partition) + timing := kafka.StartTiming("Sending") + + fmt.Println("Publishing :", message, ": Will send ", sendCt, " times") + done := make(chan bool) + msgChan := make(chan *kafka.Message, 1000) + + go broker.PublishOnChannel(msgChan, 100, 100, done) + for i := 0; i < sendCt; i++ { + msgChan <- kafka.NewMessage([]byte(message)) + } + done <- true // force flush + + timing.Print() +} + func main() { flag.Parse() - fmt.Println("Publishing :", message) - fmt.Printf("To: %s, topic: %s, partition: %d\n", hostname, topic, partition) + fmt.Printf("Kafka: %s, topic: %s, partition: %d\n", hostname, topic, partition) fmt.Println(" ---------------------- ") - broker := kafka.NewBrokerPublisher(hostname, topic, partition) if len(message) == 0 && len(messageFile) != 0 { - file, err := os.Open(messageFile) - if err != nil { - fmt.Println("Error: ", err) - return - } - stat, err := file.Stat() - if err != nil { - fmt.Println("Error: ", err) - return - } - payload := make([]byte, stat.Size()) - file.Read(payload) - timing := kafka.StartTiming("Sending") - - if compress { - broker.Publish(kafka.NewCompressedMessage(payload)) - } else { - broker.Publish(kafka.NewMessage(payload)) - } - timing.Print() - file.Close() + SendFile(messageFile) + + } else if len(message) > 0 && sendCt == 0 { + + SendMessage() + + } else if len(message) > 0 && sendCt > 0 { + + SendManyMessages() + } else { - timing := kafka.StartTiming("Sending") - if compress { - broker.Publish(kafka.NewCompressedMessage([]byte(message))) - } else { - broker.Publish(kafka.NewMessage([]byte(message))) - } + // console publisher + broker := kafka.NewBrokerPublisher(hostname, topic, partition) + b := bufio.NewReader(os.Stdin) + done := make(chan bool) + msgChan := make(chan *kafka.Message, 1000) + + go broker.PublishOnChannel(msgChan, 2000, 200, done) + fmt.Println("reading from stdin") + for { + if s, e := b.ReadString('\n'); e == nil { + + fmt.Println("sending ---", s) - timing.Print() + msgChan <- kafka.NewMessage([]byte(s)) + + } + } } } -- 1.7.4.4