Index: tools/consumer/consumer.go =================================================================== --- tools/consumer/consumer.go (revision 1186235) +++ tools/consumer/consumer.go (working copy) @@ -20,7 +20,6 @@ * of their respective owners. */ - package main import ( @@ -30,7 +29,6 @@ "os" "strconv" "os/signal" - "syscall" ) var hostname string @@ -53,7 +51,6 @@ flag.BoolVar(&printmessage, "printmessage", true, "print the message details to stdout") } - func main() { flag.Parse() fmt.Println("Consuming Messages :") @@ -87,7 +84,8 @@ go func() { for { sig := <-signal.Incoming - if sig.(signal.UnixSignal) == syscall.SIGINT { + //if sig.(signal.UnixSignal) == syscall.SIGINT { + if sig.String() == "SIGINT: interrupt" { quit <- true } } Index: kafka_test.go =================================================================== --- kafka_test.go (revision 1186235) +++ kafka_test.go (working copy) @@ -20,7 +20,6 @@ * of their respective owners. */ - package kafka import ( @@ -33,7 +32,7 @@ func TestMessageCreation(t *testing.T) { payload := []byte("testing") msg := NewMessage(payload) - if msg.magic != 0 { + if msg.magic != 1 { t.Errorf("magic incorrect") t.Fail() } @@ -41,21 +40,23 @@ // generated by kafka-rb: e8 f3 5a 06 expected := []byte{0xe8, 0xf3, 0x5a, 0x06} if !bytes.Equal(expected, msg.checksum[:]) { + t.Errorf("checksum incorrect") t.Fail() } } - func TestMessageEncoding(t *testing.T) { payload := []byte("testing") msg := NewMessage(payload) // generated by kafka-rb: - expected := []byte{0x00, 0x00, 0x00, 0x0c, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} + // [msg len 4 byte int ] mgc cc [checksum ] [ payload ] + expected := []byte{0x00, 0x00, 0x00, 0x0d, 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} if !bytes.Equal(expected, msg.Encode()) { + t.Errorf("encoded message value not correct") t.Fail() } - + // verify round trip msgDecoded := Decode(msg.Encode()) if !bytes.Equal(msgDecoded.payload, payload) { @@ -68,9 +69,12 @@ if !bytes.Equal(msgDecoded.checksum[:], chksum) { t.Fail() } - if msgDecoded.magic != 0 { + if msgDecoded.magic != 1 { t.Fail() } + if msgDecoded.compression != 0 { + t.Fail() + } } func TestRequestHeaderEncoding(t *testing.T) { @@ -88,7 +92,6 @@ } } - func TestPublishRequestEncoding(t *testing.T) { payload := []byte("testing") msg := NewMessage(payload) @@ -99,10 +102,10 @@ request := pubBroker.broker.EncodePublishRequest(messages) // generated by kafka-rb: - expected := []byte{0x00, 0x00, 0x00, 0x20, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x0c, - 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} - + expected := []byte{0x00, 0x00, 0x00, 0x21, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x11, + 0x00, 0x00, 0x00, 0x0d, 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} if !bytes.Equal(expected, request) { t.Errorf("expected length: %d but got: %d", len(expected), len(request)) t.Errorf("expected: %X\n but got: %X", expected, request) Index: src/kafka.go =================================================================== --- src/kafka.go (revision 1186235) +++ src/kafka.go (working copy) @@ -34,11 +34,10 @@ ) const ( - MAGIC_DEFAULT = 0 + MAGIC_DEFAULT = 1 NETWORK = "tcp" ) - type Broker struct { topic string partition int Index: src/message.go =================================================================== --- src/message.go (revision 1186235) +++ src/message.go (working copy) @@ -30,9 +29,9 @@ "log" ) - type Message struct { magic byte + compression byte checksum [4]byte payload []byte offset uint64 // only used after decoding @@ -43,6 +42,11 @@ return m.offset } +// offset at end of message +func (m *Message) OffsetEnd() uint64 { + return m.offset + uint64(4+m.totalLength) +} + func (m *Message) Payload() []byte { return m.payload } @@ -54,19 +58,21 @@ func NewMessage(payload []byte) *Message { message := &Message{} message.magic = byte(MAGIC_DEFAULT) + message.compression = byte(0) binary.BigEndian.PutUint32(message.checksum[0:], crc32.ChecksumIEEE(payload)) message.payload = payload return message } -// MESSAGE SET: +// MESSAGE SET: func (m *Message) Encode() []byte { - msgLen := 1 + 4 + len(m.payload) + msgLen := 1 + 1 + 4 + len(m.payload) msg := make([]byte, 4+msgLen) binary.BigEndian.PutUint32(msg[0:], uint32(msgLen)) msg[4] = m.magic - copy(msg[5:], m.checksum[0:]) - copy(msg[9:], m.payload) + msg[5] = m.compression + copy(msg[6:], m.checksum[0:]) + copy(msg[10:], m.payload) return msg } @@ -79,9 +85,19 @@ msg := Message{} msg.totalLength = length msg.magic = packet[4] - copy(msg.checksum[:], packet[5:9]) - payloadLength := length - 1 - 4 - msg.payload = packet[9 : 9+payloadLength] + if msg.magic == 1 { + msg.compression = packet[5] + copy(msg.checksum[:], packet[6:10]) + payloadLength := length - 1 - 4 - 1 + msg.payload = packet[10 : 10+payloadLength] + } else if msg.magic == 0 { + msg.compression = byte(0) + copy(msg.checksum[:], packet[5:9]) + payloadLength := length - 1 - 4 + msg.payload = packet[9 : 9+payloadLength] + } else { + // panic? + } payloadChecksum := make([]byte, 4) binary.BigEndian.PutUint32(payloadChecksum, crc32.ChecksumIEEE(msg.payload)) @@ -95,6 +111,7 @@ func (msg *Message) Print() { log.Println("----- Begin Message ------") log.Printf("magic: %X\n", msg.magic) + log.Printf("compression: %X\n", msg.compression) log.Printf("checksum: %X\n", msg.checksum) if len(msg.payload) < 1048576 { // 1 MB log.Printf("payload: %X\n", msg.payload) @@ -102,6 +119,6 @@ } else { log.Printf("long payload, length: %d\n", len(msg.payload)) } - log.Printf("offset: %d\n", msg.offset) + log.Printf("offset start: %d end: %d\n", msg.offset, msg.OffsetEnd()) log.Println("----- End Message ------") } Index: README.md =================================================================== --- README.md (revision 1186235) +++ README.md (working copy) @@ -11,6 +11,9 @@
Make the tools (publisher & consumer)
make tools + +Or do both
+make full
Start zookeeper, Kafka server
For more info on Kafka, see: http://sna-projects.com/kafka/quickstart.php @@ -71,7 +74,13 @@ offsets, err := broker.GetOffsets(-1, 1) +### Testing ### +To run the tests: + + >gotest + + ### Contact ### jeffreydamick (at) gmail (dot) com