Index: clients/go/tools/offsets/Makefile =================================================================== --- clients/go/tools/offsets/Makefile (revision 1299800) +++ clients/go/tools/offsets/Makefile (working copy) @@ -1,7 +0,0 @@ -include $(GOROOT)/src/Make.inc - -TARG=offsets -GOFILES=\ - offsets.go\ - -include $(GOROOT)/src/Make.cmd Index: clients/go/tools/offsets/offsets.go =================================================================== --- clients/go/tools/offsets/offsets.go (revision 1299800) +++ clients/go/tools/offsets/offsets.go (working copy) @@ -20,13 +20,12 @@ * of their respective owners. */ - package main import ( - "kafka" "flag" "fmt" + kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src" ) var hostname string @@ -43,7 +42,6 @@ flag.Int64Var(&time, "time", -1, "timestamp of the offsets before that: time(ms)/-1(latest)/-2(earliest)") } - func main() { flag.Parse() fmt.Println("Offsets :") @@ -56,7 +54,7 @@ fmt.Println("Error: ", err) } fmt.Printf("Offsets found: %d\n", len(offsets)) - for i := 0 ; i < len(offsets); i++ { + for i := 0; i < len(offsets); i++ { fmt.Printf("Offset[%d] = %d\n", i, offsets[i]) } } Index: clients/go/tools/consumer/consumer.go =================================================================== --- clients/go/tools/consumer/consumer.go (revision 1299800) +++ clients/go/tools/consumer/consumer.go (working copy) @@ -23,12 +23,12 @@ package main import ( - "kafka" "flag" "fmt" "os" + "os/signal" "strconv" - "os/signal" + kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src" "syscall" ) @@ -46,7 +46,7 @@ flag.StringVar(&topic, "topic", "test", "topic to publish to") flag.IntVar(&partition, "partition", 0, "partition to publish to") flag.Uint64Var(&offset, "offset", 0, "offset to start consuming from") - flag.UintVar(&maxSize, "maxsize", 1048576, "offset to start consuming from") + flag.UintVar(&maxSize, "maxsize", 1048576, "max size in bytes of message set to request") flag.StringVar(&writePayloadsTo, "writeto", "", "write payloads to this file") flag.BoolVar(&consumerForever, "consumeforever", false, "loop forever consuming") flag.BoolVar(&printmessage, "printmessage", true, "print the message details to stdout") @@ -61,7 +61,7 @@ var payloadFile *os.File = nil if len(writePayloadsTo) > 0 { - var err os.Error + var err error payloadFile, err = os.Create(writePayloadsTo) if err != nil { fmt.Println("Error opening file: ", err) @@ -74,7 +74,7 @@ msg.Print() } if payloadFile != nil { - payloadFile.Write([]byte("Message at: " + strconv.Uitoa64(msg.Offset()) + "\n")) + payloadFile.Write([]byte("Message at: " + strconv.FormatUint(msg.Offset(), 10) + "\n")) payloadFile.Write(msg.Payload()) payloadFile.Write([]byte("\n-------------------------------\n")) } @@ -83,10 +83,17 @@ if consumerForever { quit := make(chan bool, 1) go func() { + var sigIn chan os.Signal + signal.Notify(sigIn) for { - sig := <-signal.Incoming - if sig.(os.UnixSignal) == syscall.SIGINT { - quit <- true + + select { + case sig := <-sigIn: + if sig.(os.Signal) == syscall.SIGINT { + quit <- true + } else { + fmt.Println(sig) + } } } }() Index: clients/go/tools/consumer/Makefile =================================================================== --- clients/go/tools/consumer/Makefile (revision 1299800) +++ clients/go/tools/consumer/Makefile (working copy) @@ -1,7 +0,0 @@ -include $(GOROOT)/src/Make.inc - -TARG=consumer -GOFILES=\ - consumer.go\ - -include $(GOROOT)/src/Make.cmd Index: clients/go/tools/publisher/publisher.go =================================================================== --- clients/go/tools/publisher/publisher.go (revision 1299800) +++ clients/go/tools/publisher/publisher.go (working copy) @@ -23,9 +23,9 @@ package main import ( - "kafka" "flag" "fmt" + kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src" "os" ) @@ -63,7 +63,7 @@ fmt.Println("Error: ", err) return } - payload := make([]byte, stat.Size) + payload := make([]byte, stat.Size()) file.Read(payload) timing := kafka.StartTiming("Sending") Index: clients/go/tools/publisher/Makefile =================================================================== --- clients/go/tools/publisher/Makefile (revision 1299800) +++ clients/go/tools/publisher/Makefile (working copy) @@ -1,7 +0,0 @@ -include $(GOROOT)/src/Make.inc - -TARG=publisher -GOFILES=\ - publisher.go\ - -include $(GOROOT)/src/Make.cmd Index: clients/go/src/consumer.go =================================================================== --- clients/go/src/consumer.go (revision 1299800) +++ clients/go/src/consumer.go (working copy) @@ -23,11 +23,12 @@ package kafka import ( + "encoding/binary" + "errors" + "io" "log" - "os" "net" "time" - "encoding/binary" ) type BrokerConsumer struct { @@ -66,11 +67,11 @@ func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec) { // merge to the default map, so one 'could' override the default codecs.. for k, v := range codecsMap(payloadCodecs) { - consumer.codecs[k] = v, true + consumer.codecs[k] = v } } -func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, os.Error) { +func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, error) { conn, err := consumer.broker.connect() if err != nil { return -1, err @@ -86,14 +87,14 @@ }) if err != nil { - if err != os.EOF { + if err != io.EOF { log.Println("Fatal Error: ", err) panic(err) } quit <- true // force quit break } - time.Sleep(pollTimeoutMs * 1000000) + time.Sleep(time.Millisecond * time.Duration(pollTimeoutMs) * 1000000) } done <- true }() @@ -107,7 +108,7 @@ type MessageHandlerFunc func(msg *Message) -func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, os.Error) { +func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, error) { conn, err := consumer.broker.connect() if err != nil { return -1, err @@ -123,7 +124,7 @@ return num, err } -func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, os.Error) { +func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, error) { _, err := conn.Write(consumer.broker.EncodeConsumeRequest(consumer.offset, consumer.maxSize)) if err != nil { return -1, err @@ -142,7 +143,7 @@ for currentOffset <= uint64(length-4) { totalLength, msgs := Decode(payload[currentOffset:], consumer.codecs) if msgs == nil { - return num, os.NewError("Error Decoding Message") + return num, errors.New("Error Decoding Message") } msgOffset := consumer.offset + currentOffset for _, msg := range msgs { @@ -164,7 +165,7 @@ // Get a list of valid offsets (up to maxNumOffsets) before the given time, where // time is in milliseconds (-1, from the latest offset available, -2 from the smallest offset available) // The result is a list of offsets, in descending order. -func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, os.Error) { +func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, error) { offsets := make([]uint64, 0) conn, err := consumer.broker.connect() Index: clients/go/src/kafka.go =================================================================== --- clients/go/src/kafka.go (revision 1299800) +++ clients/go/src/kafka.go (working copy) @@ -23,13 +23,13 @@ package kafka import ( + "bufio" + "encoding/binary" + "errors" + "fmt" + "io" "log" "net" - "os" - "fmt" - "encoding/binary" - "io" - "bufio" ) const ( @@ -48,7 +48,7 @@ hostname: hostname} } -func (b *Broker) connect() (conn *net.TCPConn, error os.Error) { +func (b *Broker) connect() (conn *net.TCPConn, error error) { raddr, err := net.ResolveTCPAddr(NETWORK, b.hostname) if err != nil { log.Println("Fatal Error: ", err) @@ -63,7 +63,7 @@ } // returns length of response & payload & err -func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, os.Error) { +func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, error) { reader := bufio.NewReader(conn) length := make([]byte, 4) lenRead, err := io.ReadFull(reader, length) @@ -71,7 +71,7 @@ return 0, []byte{}, err } if lenRead != 4 || lenRead < 0 { - return 0, []byte{}, os.NewError("invalid length of the packet length field") + return 0, []byte{}, errors.New("invalid length of the packet length field") } expectedLength := binary.BigEndian.Uint32(length) @@ -82,13 +82,13 @@ } if uint32(lenRead) != expectedLength { - return 0, []byte{}, os.NewError(fmt.Sprintf("Fatal Error: Unexpected Length: %d expected: %d", lenRead, expectedLength)) + return 0, []byte{}, errors.New(fmt.Sprintf("Fatal Error: Unexpected Length: %d expected: %d", lenRead, expectedLength)) } errorCode := binary.BigEndian.Uint16(messages[0:2]) if errorCode != 0 { log.Println("errorCode: ", errorCode) - return 0, []byte{}, os.NewError( + return 0, []byte{}, errors.New( fmt.Sprintf("Broker Response Error: %d", errorCode)) } return expectedLength, messages[2:], nil Index: clients/go/src/publisher.go =================================================================== --- clients/go/src/publisher.go (revision 1299800) +++ clients/go/src/publisher.go (working copy) @@ -22,10 +22,6 @@ package kafka -import ( - "os" -) - type BrokerPublisher struct { broker *Broker } @@ -34,11 +30,11 @@ return &BrokerPublisher{broker: newBroker(hostname, topic, partition)} } -func (b *BrokerPublisher) Publish(message *Message) (int, os.Error) { +func (b *BrokerPublisher) Publish(message *Message) (int, error) { return b.BatchPublish(message) } -func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, os.Error) { +func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, error) { conn, err := b.broker.connect() if err != nil { return -1, err Index: clients/go/src/message.go =================================================================== --- clients/go/src/message.go (revision 1299800) +++ clients/go/src/message.go (working copy) @@ -23,9 +23,9 @@ package kafka import ( + "bytes" + "encoding/binary" "hash/crc32" - "encoding/binary" - "bytes" "log" ) Index: clients/go/src/payload_codec.go =================================================================== --- clients/go/src/payload_codec.go (revision 1299800) +++ clients/go/src/payload_codec.go (working copy) @@ -57,7 +57,7 @@ func codecsMap(payloadCodecs []PayloadCodec) map[byte]PayloadCodec { payloadCodecsMap := make(map[byte]PayloadCodec, len(payloadCodecs)) for _, c := range payloadCodecs { - payloadCodecsMap[c.Id()] = c, true + payloadCodecsMap[c.Id()] = c } return payloadCodecsMap } @@ -65,7 +65,6 @@ // No compression codec, noop type NoCompressionPayloadCodec struct { - } func (codec *NoCompressionPayloadCodec) Id() byte { @@ -83,7 +82,6 @@ // Gzip Codec type GzipPayloadCodec struct { - } func (codec *GzipPayloadCodec) Id() byte { Index: clients/go/src/timing.go =================================================================== --- clients/go/src/timing.go (revision 1299800) +++ clients/go/src/timing.go (working copy) @@ -34,16 +34,16 @@ } func StartTiming(label string) *Timing { - return &Timing{label: label, start: time.Nanoseconds(), stop: 0} + return &Timing{label: label, start: time.Now().UnixNano()} } func (t *Timing) Stop() { - t.stop = time.Nanoseconds() + t.stop = time.Now().UnixNano() } func (t *Timing) Print() { if t.stop == 0 { t.Stop() } - log.Printf("%s took: %f ms\n", t.label, float64((time.Nanoseconds()-t.start))/1000000) + log.Printf("%s took: %f ms\n", t.label, float64(t.stop - t.start)/1000000) } Index: clients/go/src/request.go =================================================================== --- clients/go/src/request.go (revision 1299800) +++ clients/go/src/request.go (working copy) @@ -23,8 +23,8 @@ package kafka import ( + "bytes" "encoding/binary" - "bytes" ) type RequestType uint16 Index: clients/go/README.md =================================================================== --- clients/go/README.md (revision 1299800) +++ clients/go/README.md (working copy) @@ -12,10 +12,18 @@ Make sure to set your GOROOT properly (http://golang.org/doc/install.html#environment). Install kafka.go package:
-make install +go get http://svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src
Make the tools (publisher & consumer)
-make tools +go get svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/tools/consumer +go get svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/tools/publisher + +Install kafka from source, in a goroot/src (see go documentation):
+mkdir svn.apache.org/repos/asf/incubator/kafka.svn +cd svn.apache.org/repos/asf/incubator/kafka.svn +svn co http://svn.apache.org/repos/asf/incubator/kafka/trunk trunk + +
Start zookeeper, Kafka server
For more info on Kafka, see: http://incubator.apache.org/kafka/quickstart.html Index: clients/go/Makefile =================================================================== --- clients/go/Makefile (revision 1299800) +++ clients/go/Makefile (working copy) @@ -19,7 +19,7 @@ make -C tools/offsets clean all format: - gofmt -w -tabwidth=2 -tabindent=false src/*.go tools/consumer/*.go tools/publisher/*.go kafka_test.go + gofmt -w -tabwidth=2 -tabs=false src/*.go tools/consumer/*.go tools/publisher/*.go kafka_test.go full: format clean install tools