Index: clients/go/tools/offsets/Makefile
===================================================================
--- clients/go/tools/offsets/Makefile (revision 1299800)
+++ clients/go/tools/offsets/Makefile (working copy)
@@ -1,7 +0,0 @@
-include $(GOROOT)/src/Make.inc
-
-TARG=offsets
-GOFILES=\
- offsets.go\
-
-include $(GOROOT)/src/Make.cmd
Index: clients/go/tools/offsets/offsets.go
===================================================================
--- clients/go/tools/offsets/offsets.go (revision 1299800)
+++ clients/go/tools/offsets/offsets.go (working copy)
@@ -20,13 +20,12 @@
* of their respective owners.
*/
-
package main
import (
- "kafka"
"flag"
"fmt"
+ kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src"
)
var hostname string
@@ -43,7 +42,6 @@
flag.Int64Var(&time, "time", -1, "timestamp of the offsets before that: time(ms)/-1(latest)/-2(earliest)")
}
-
func main() {
flag.Parse()
fmt.Println("Offsets :")
@@ -56,7 +54,7 @@
fmt.Println("Error: ", err)
}
fmt.Printf("Offsets found: %d\n", len(offsets))
- for i := 0 ; i < len(offsets); i++ {
+ for i := 0; i < len(offsets); i++ {
fmt.Printf("Offset[%d] = %d\n", i, offsets[i])
}
}
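
For reference, a sketch of driving offsets retrieval from client code under the new import path. Only the GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, error) signature is confirmed by this patch (see src/consumer.go below); NewBrokerOffsetConsumer is an assumed constructor name.

    package main

    import (
        "fmt"

        kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src"
    )

    func main() {
        // NewBrokerOffsetConsumer is assumed; only GetOffsets' signature
        // appears in this patch (src/consumer.go).
        consumer := kafka.NewBrokerOffsetConsumer("localhost:9092", "test", 0)
        offsets, err := consumer.GetOffsets(-1, 10) // -1 = latest, -2 = earliest
        if err != nil {
            fmt.Println("Error: ", err)
            return
        }
        for i := 0; i < len(offsets); i++ {
            fmt.Printf("Offset[%d] = %d\n", i, offsets[i])
        }
    }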
Index: clients/go/tools/consumer/consumer.go
===================================================================
--- clients/go/tools/consumer/consumer.go (revision 1299800)
+++ clients/go/tools/consumer/consumer.go (working copy)
@@ -23,12 +23,12 @@
package main
import (
- "kafka"
"flag"
"fmt"
"os"
+ "os/signal"
"strconv"
- "os/signal"
+ kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src"
"syscall"
)
@@ -46,7 +46,7 @@
flag.StringVar(&topic, "topic", "test", "topic to publish to")
flag.IntVar(&partition, "partition", 0, "partition to publish to")
flag.Uint64Var(&offset, "offset", 0, "offset to start consuming from")
- flag.UintVar(&maxSize, "maxsize", 1048576, "offset to start consuming from")
+ flag.UintVar(&maxSize, "maxsize", 1048576, "max size in bytes of message set to request")
flag.StringVar(&writePayloadsTo, "writeto", "", "write payloads to this file")
flag.BoolVar(&consumerForever, "consumeforever", false, "loop forever consuming")
flag.BoolVar(&printmessage, "printmessage", true, "print the message details to stdout")
@@ -61,7 +61,7 @@
var payloadFile *os.File = nil
if len(writePayloadsTo) > 0 {
- var err os.Error
+ var err error
payloadFile, err = os.Create(writePayloadsTo)
if err != nil {
fmt.Println("Error opening file: ", err)
@@ -74,7 +74,7 @@
msg.Print()
}
if payloadFile != nil {
- payloadFile.Write([]byte("Message at: " + strconv.Uitoa64(msg.Offset()) + "\n"))
+ payloadFile.Write([]byte("Message at: " + strconv.FormatUint(msg.Offset(), 10) + "\n"))
payloadFile.Write(msg.Payload())
payloadFile.Write([]byte("\n-------------------------------\n"))
}
@@ -83,10 +83,14 @@
if consumerForever {
quit := make(chan bool, 1)
go func() {
+			sigIn := make(chan os.Signal, 1)
+			signal.Notify(sigIn)
for {
- sig := <-signal.Incoming
- if sig.(os.UnixSignal) == syscall.SIGINT {
- quit <- true
+				sig := <-sigIn
+				if sig == syscall.SIGINT {
+					quit <- true
+				} else {
+					fmt.Println(sig)
}
}
}()
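
For context: Go 1 moved signal delivery from the removed signal.Incoming channel to caller-allocated channels registered via signal.Notify, and Notify panics on a nil channel, which is why the channel must be created before registering it. A minimal standalone sketch of the pattern used above (the wiring is illustrative, not part of this patch):

    package main

    import (
        "fmt"
        "os"
        "os/signal"
        "syscall"
    )

    func main() {
        quit := make(chan bool, 1)
        // The channel must be allocated (and ideally buffered) before Notify;
        // a nil channel makes signal.Notify panic.
        sigIn := make(chan os.Signal, 1)
        signal.Notify(sigIn) // no signals listed: relay all incoming signals
        go func() {
            for sig := range sigIn {
                if sig == syscall.SIGINT {
                    quit <- true
                } else {
                    fmt.Println(sig)
                }
            }
        }()
        <-quit
    }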
Index: clients/go/tools/consumer/Makefile
===================================================================
--- clients/go/tools/consumer/Makefile (revision 1299800)
+++ clients/go/tools/consumer/Makefile (working copy)
@@ -1,7 +0,0 @@
-include $(GOROOT)/src/Make.inc
-
-TARG=consumer
-GOFILES=\
- consumer.go\
-
-include $(GOROOT)/src/Make.cmd
Index: clients/go/tools/publisher/publisher.go
===================================================================
--- clients/go/tools/publisher/publisher.go (revision 1299800)
+++ clients/go/tools/publisher/publisher.go (working copy)
@@ -23,9 +23,9 @@
package main
import (
- "kafka"
"flag"
"fmt"
+ kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src"
"os"
)
@@ -63,7 +63,7 @@
fmt.Println("Error: ", err)
return
}
- payload := make([]byte, stat.Size)
+ payload := make([]byte, stat.Size())
file.Read(payload)
timing := kafka.StartTiming("Sending")
Index: clients/go/tools/publisher/Makefile
===================================================================
--- clients/go/tools/publisher/Makefile (revision 1299800)
+++ clients/go/tools/publisher/Makefile (working copy)
@@ -1,7 +0,0 @@
-include $(GOROOT)/src/Make.inc
-
-TARG=publisher
-GOFILES=\
- publisher.go\
-
-include $(GOROOT)/src/Make.cmd
Index: clients/go/src/consumer.go
===================================================================
--- clients/go/src/consumer.go (revision 1299800)
+++ clients/go/src/consumer.go (working copy)
@@ -23,11 +23,12 @@
package kafka
import (
+ "encoding/binary"
+ "errors"
+ "io"
"log"
- "os"
"net"
"time"
- "encoding/binary"
)
type BrokerConsumer struct {
@@ -66,11 +67,11 @@
func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec) {
// merge to the default map, so one 'could' override the default codecs..
for k, v := range codecsMap(payloadCodecs) {
- consumer.codecs[k] = v, true
+ consumer.codecs[k] = v
}
}
-func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, os.Error) {
+func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, error) {
conn, err := consumer.broker.connect()
if err != nil {
return -1, err
@@ -86,14 +87,14 @@
})
if err != nil {
- if err != os.EOF {
+ if err != io.EOF {
log.Println("Fatal Error: ", err)
panic(err)
}
quit <- true // force quit
break
}
- time.Sleep(pollTimeoutMs * 1000000)
+		time.Sleep(time.Duration(pollTimeoutMs) * time.Millisecond)
}
done <- true
}()
@@ -107,7 +108,7 @@
type MessageHandlerFunc func(msg *Message)
-func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, os.Error) {
+func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, error) {
conn, err := consumer.broker.connect()
if err != nil {
return -1, err
@@ -123,7 +124,7 @@
return num, err
}
-func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, os.Error) {
+func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, error) {
_, err := conn.Write(consumer.broker.EncodeConsumeRequest(consumer.offset, consumer.maxSize))
if err != nil {
return -1, err
@@ -142,7 +143,7 @@
for currentOffset <= uint64(length-4) {
totalLength, msgs := Decode(payload[currentOffset:], consumer.codecs)
if msgs == nil {
- return num, os.NewError("Error Decoding Message")
+ return num, errors.New("Error Decoding Message")
}
msgOffset := consumer.offset + currentOffset
for _, msg := range msgs {
@@ -164,7 +165,7 @@
// Get a list of valid offsets (up to maxNumOffsets) before the given time, where
// time is in milliseconds (-1, from the latest offset available, -2 from the smallest offset available)
// The result is a list of offsets, in descending order.
-func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, os.Error) {
+func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, error) {
offsets := make([]uint64, 0)
conn, err := consumer.broker.connect()
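
An aside on the sleep conversion above: pre-Go 1 time.Sleep took an int64 nanosecond count, while Go 1 takes a time.Duration, so a millisecond count must be converted exactly once; multiplying by time.Millisecond and by a raw 1000000 factor would over-sleep by six orders of magnitude. A minimal sketch:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        pollTimeoutMs := int64(250)
        // One conversion: milliseconds -> time.Duration (internally nanoseconds).
        d := time.Duration(pollTimeoutMs) * time.Millisecond
        fmt.Println(d) // prints "250ms"
        time.Sleep(d)
    }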
Index: clients/go/src/kafka.go
===================================================================
--- clients/go/src/kafka.go (revision 1299800)
+++ clients/go/src/kafka.go (working copy)
@@ -23,13 +23,13 @@
package kafka
import (
+ "bufio"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "io"
"log"
"net"
- "os"
- "fmt"
- "encoding/binary"
- "io"
- "bufio"
)
const (
@@ -48,7 +48,7 @@
hostname: hostname}
}
-func (b *Broker) connect() (conn *net.TCPConn, error os.Error) {
+func (b *Broker) connect() (conn *net.TCPConn, err error) {
raddr, err := net.ResolveTCPAddr(NETWORK, b.hostname)
if err != nil {
log.Println("Fatal Error: ", err)
@@ -63,7 +63,7 @@
}
// returns length of response & payload & err
-func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, os.Error) {
+func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, error) {
reader := bufio.NewReader(conn)
length := make([]byte, 4)
lenRead, err := io.ReadFull(reader, length)
@@ -71,7 +71,7 @@
return 0, []byte{}, err
}
if lenRead != 4 || lenRead < 0 {
- return 0, []byte{}, os.NewError("invalid length of the packet length field")
+ return 0, []byte{}, errors.New("invalid length of the packet length field")
}
expectedLength := binary.BigEndian.Uint32(length)
@@ -82,13 +82,13 @@
}
if uint32(lenRead) != expectedLength {
- return 0, []byte{}, os.NewError(fmt.Sprintf("Fatal Error: Unexpected Length: %d expected: %d", lenRead, expectedLength))
+ return 0, []byte{}, errors.New(fmt.Sprintf("Fatal Error: Unexpected Length: %d expected: %d", lenRead, expectedLength))
}
errorCode := binary.BigEndian.Uint16(messages[0:2])
if errorCode != 0 {
log.Println("errorCode: ", errorCode)
- return 0, []byte{}, os.NewError(
+ return 0, []byte{}, errors.New(
fmt.Sprintf("Broker Response Error: %d", errorCode))
}
return expectedLength, messages[2:], nil
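
readResponse above decodes the wire framing used throughout this client: a 4-byte big-endian length, then a 2-byte error code, then the payload. A self-contained sketch of the same framing against any io.Reader (field layout as decoded above; names are illustrative):

    package main

    import (
        "bytes"
        "encoding/binary"
        "fmt"
        "io"
    )

    // readFramed reads a 4-byte big-endian length, then that many bytes,
    // interprets the first 2 bytes as an error code, and returns the payload.
    func readFramed(r io.Reader) ([]byte, error) {
        lenBuf := make([]byte, 4)
        if _, err := io.ReadFull(r, lenBuf); err != nil {
            return nil, err
        }
        body := make([]byte, binary.BigEndian.Uint32(lenBuf))
        if _, err := io.ReadFull(r, body); err != nil {
            return nil, err
        }
        if code := binary.BigEndian.Uint16(body[0:2]); code != 0 {
            return nil, fmt.Errorf("Broker Response Error: %d", code)
        }
        return body[2:], nil
    }

    func main() {
        // Frame a successful (code 0) response carrying "ok".
        var buf bytes.Buffer
        binary.Write(&buf, binary.BigEndian, uint32(4)) // 2-byte code + 2-byte payload
        binary.Write(&buf, binary.BigEndian, uint16(0)) // 0 = no error
        buf.WriteString("ok")
        payload, _ := readFramed(&buf)
        fmt.Printf("%s\n", payload) // ok
    }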
Index: clients/go/src/publisher.go
===================================================================
--- clients/go/src/publisher.go (revision 1299800)
+++ clients/go/src/publisher.go (working copy)
@@ -22,10 +22,6 @@
package kafka
-import (
- "os"
-)
-
type BrokerPublisher struct {
broker *Broker
}
@@ -34,11 +30,11 @@
return &BrokerPublisher{broker: newBroker(hostname, topic, partition)}
}
-func (b *BrokerPublisher) Publish(message *Message) (int, os.Error) {
+func (b *BrokerPublisher) Publish(message *Message) (int, error) {
return b.BatchPublish(message)
}
-func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, os.Error) {
+func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, error) {
conn, err := b.broker.connect()
if err != nil {
return -1, err
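
With Publish and BatchPublish now returning the standard error type, call sites follow ordinary Go 1 error handling. A sketch; NewBrokerPublisher and NewMessage are assumed constructor names, as this hunk only shows the method signatures:

    package main

    import (
        "fmt"

        kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src"
    )

    func main() {
        // NewBrokerPublisher/NewMessage are assumed names; only Publish and
        // BatchPublish signatures appear in this patch.
        pub := kafka.NewBrokerPublisher("localhost:9092", "test", 0)
        n, err := pub.BatchPublish(
            kafka.NewMessage([]byte("one")),
            kafka.NewMessage([]byte("two")),
        )
        if err != nil {
            fmt.Println("Error: ", err)
            return
        }
        fmt.Println("BatchPublish returned:", n)
    }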
Index: clients/go/src/message.go
===================================================================
--- clients/go/src/message.go (revision 1299800)
+++ clients/go/src/message.go (working copy)
@@ -23,9 +23,9 @@
package kafka
import (
+ "bytes"
+ "encoding/binary"
"hash/crc32"
- "encoding/binary"
- "bytes"
"log"
)
Index: clients/go/src/payload_codec.go
===================================================================
--- clients/go/src/payload_codec.go (revision 1299800)
+++ clients/go/src/payload_codec.go (working copy)
@@ -57,7 +57,7 @@
func codecsMap(payloadCodecs []PayloadCodec) map[byte]PayloadCodec {
payloadCodecsMap := make(map[byte]PayloadCodec, len(payloadCodecs))
for _, c := range payloadCodecs {
- payloadCodecsMap[c.Id()] = c, true
+ payloadCodecsMap[c.Id()] = c
}
return payloadCodecsMap
}
@@ -65,7 +65,6 @@
// No compression codec, noop
type NoCompressionPayloadCodec struct {
-
}
func (codec *NoCompressionPayloadCodec) Id() byte {
@@ -83,7 +82,6 @@
// Gzip Codec
type GzipPayloadCodec struct {
-
}
func (codec *GzipPayloadCodec) Id() byte {
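
A note on the codecsMap change above: Go 1 removed the `m[k] = v, true` form; plain assignment inserts or overwrites, and the delete built-in replaces `m[k] = v, false`. A tiny sketch:

    package main

    import "fmt"

    func main() {
        codecs := make(map[byte]string)
        codecs[0] = "none" // Go 1: plain assignment replaces m[k] = v, true
        codecs[1] = "gzip"
        delete(codecs, 1) // Go 1: delete(m, k) replaces m[k] = v, false
        fmt.Println(codecs) // map[0:none]
    }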
Index: clients/go/src/timing.go
===================================================================
--- clients/go/src/timing.go (revision 1299800)
+++ clients/go/src/timing.go (working copy)
@@ -34,16 +34,16 @@
}
func StartTiming(label string) *Timing {
- return &Timing{label: label, start: time.Nanoseconds(), stop: 0}
+ return &Timing{label: label, start: time.Now().UnixNano()}
}
func (t *Timing) Stop() {
- t.stop = time.Nanoseconds()
+ t.stop = time.Now().UnixNano()
}
func (t *Timing) Print() {
if t.stop == 0 {
t.Stop()
}
- log.Printf("%s took: %f ms\n", t.label, float64((time.Nanoseconds()-t.start))/1000000)
+ log.Printf("%s took: %f ms\n", t.label, float64(t.stop - t.start)/1000000)
}
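
time.Nanoseconds() was removed in Go 1; time.Now().UnixNano() is the closest drop-in, as used above. For comparison, a sketch of the same helper written against time.Time and time.Since (an alternative, not part of this patch):

    package main

    import (
        "log"
        "time"
    )

    // A variant of Timing built on time.Time rather than raw nanoseconds.
    type timing struct {
        label string
        start time.Time
    }

    func startTiming(label string) *timing {
        return &timing{label: label, start: time.Now()}
    }

    func (t *timing) print() {
        // time.Since returns a time.Duration; report it in milliseconds.
        log.Printf("%s took: %f ms\n", t.label,
            float64(time.Since(t.start))/float64(time.Millisecond))
    }

    func main() {
        t := startTiming("Sending")
        time.Sleep(50 * time.Millisecond) // stand-in for real work
        t.print()
    }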
Index: clients/go/src/request.go
===================================================================
--- clients/go/src/request.go (revision 1299800)
+++ clients/go/src/request.go (working copy)
@@ -23,8 +23,8 @@
package kafka
import (
+ "bytes"
"encoding/binary"
- "bytes"
)
type RequestType uint16
Index: clients/go/README.md
===================================================================
--- clients/go/README.md (revision 1299800)
+++ clients/go/README.md (working copy)
@@ -12,10 +12,17 @@
Make sure to set your GOROOT properly (http://golang.org/doc/install.html#environment).
Install kafka.go package:
-make install
+go get svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src
Make the tools (publisher & consumer)
-make tools
+go get svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/tools/consumer
+go get svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/tools/publisher
+
+Alternatively, install Kafka from source under $GOPATH/src (see the Go documentation):
+mkdir -p svn.apache.org/repos/asf/incubator/kafka.svn
+cd svn.apache.org/repos/asf/incubator/kafka.svn
+svn co http://svn.apache.org/repos/asf/incubator/kafka/trunk trunk
+
Start zookeeper, Kafka server
For more info on Kafka, see: http://incubator.apache.org/kafka/quickstart.html
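
Once checked out as above, the package is imported by its SVN path, exactly as the tools in this patch do. A minimal program exercising only the timing helpers visible in src/timing.go:

    package main

    import (
        "time"

        kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src"
    )

    func main() {
        // StartTiming, Stop and Print are the helpers shown in src/timing.go.
        timing := kafka.StartTiming("demo")
        time.Sleep(10 * time.Millisecond) // stand-in for real work
        timing.Stop()
        timing.Print() // logs: "demo took: ... ms"
    }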
Index: clients/go/Makefile
===================================================================
--- clients/go/Makefile (revision 1299800)
+++ clients/go/Makefile (working copy)
@@ -19,7 +19,7 @@
make -C tools/offsets clean all
format:
- gofmt -w -tabwidth=2 -tabindent=false src/*.go tools/consumer/*.go tools/publisher/*.go kafka_test.go
+ gofmt -w -tabwidth=2 -tabs=false src/*.go tools/consumer/*.go tools/publisher/*.go kafka_test.go
full: format clean install tools