Index: clients/go/src/publisher.go =================================================================== --- clients/go/src/publisher.go (revision 1298586) +++ clients/go/src/publisher.go (working copy) @@ -22,6 +22,15 @@ package kafka +import ( + "log" + "net" + "sync" + "time" +) + +type MessageSender func(*Message) + type BrokerPublisher struct { broker *Broker } @@ -49,3 +58,93 @@ func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, error) { return num, err } + +// opens a channel for publishing, blocking call +func (b *BrokerPublisher) PublishOnChannel(msgChan chan *Message, bufferMaxMs int64, bufferMaxSize int, quit chan bool) error { + + sender, conn, err := NewBufferedSender(b.broker, bufferMaxMs, bufferMaxSize) + + if err == nil { + + // wait for stop signal + go func() { + <-quit + sender(nil) // flush + conn.Close() + close(msgChan) + }() + + for msg := range msgChan { + if msg != nil { + sender(msg) + } + } + + } + return err + +} + +// Buffered Sender, buffers messages for max time, and max size +func NewBufferedSender(broker *Broker, bufferMaxMs int64, bufferMaxSize int) (MessageSender, *net.TCPConn, error) { + + conn, err := broker.connect() + if err != nil { + return nil, nil, err + } + + msgBuffer := make([]*Message, 0) + + var hasSent bool + + msgMu := new(sync.Mutex) + timer := time.NewTicker(time.Duration(bufferMaxMs) * time.Millisecond) + + doSend := func() { + msgMu.Lock() + var msgBufCopy []*Message + msgBufCopy = msgBuffer + msgBuffer = make([]*Message, 0) + msgMu.Unlock() + log.Println("sending msgs, ", len(msgBufCopy)) + request := broker.EncodePublishRequest(msgBufCopy...) + _, err := conn.Write(request) + if err != nil { + // ? panic? + log.Println("potentially fatal error?") + } + } + + log.Println("start buffered sender heartbeat = ", bufferMaxMs, " max queue ", bufferMaxSize) + go func() { + + for _ = range timer.C { + msgMu.Lock() + if len(msgBuffer) > 0 && !hasSent { + msgMu.Unlock() + doSend() + } else { + msgMu.Unlock() + } + hasSent = false + } + + }() + + return func(msg *Message) { + if msg == nil { + doSend() + return + } + msgMu.Lock() + msgBuffer = append(msgBuffer, msg) + if len(msgBuffer) >= bufferMaxSize { + hasSent = true + msgMu.Unlock() + go doSend() + } else { + msgMu.Unlock() + } + + }, conn, nil +}