Index: tools/consumer/consumer.go =================================================================== --- tools/consumer/consumer.go (revision 1185939) +++ tools/consumer/consumer.go (working copy) @@ -20,7 +20,6 @@ * of their respective owners. */ - package main import ( @@ -30,7 +29,6 @@ "os" "strconv" "os/signal" - "syscall" ) var hostname string @@ -87,7 +84,8 @@ go func() { for { sig := <-signal.Incoming - if sig.(signal.UnixSignal) == syscall.SIGINT { + //if sig.(signal.UnixSignal) == syscall.SIGINT { + if sig.String() == "SIGINT: interrupt" { quit <- true } } Index: src/kafka.go =================================================================== --- src/kafka.go (revision 1185939) +++ src/kafka.go (working copy) @@ -34,11 +34,10 @@ ) const ( - MAGIC_DEFAULT = 0 + MAGIC_DEFAULT = 1 NETWORK = "tcp" ) - type Broker struct { topic string partition int @@ -51,7 +50,6 @@ hostname: hostname} } - func (b *Broker) connect() (conn *net.TCPConn, error os.Error) { raddr, err := net.ResolveTCPAddr(NETWORK, b.hostname) if err != nil { Index: src/message.go =================================================================== --- src/message.go (revision 1185939) +++ src/message.go (working copy) @@ -20,7 +20,6 @@ * of their respective owners. */ - package kafka import ( @@ -30,9 +29,9 @@ "log" ) - type Message struct { magic byte + compression byte checksum [4]byte payload []byte offset uint64 // only used after decoding @@ -43,6 +42,11 @@ return m.offset } +// offset at end of message +func (m *Message) OffsetEnd() uint64 { + return m.offset + uint64(4+m.totalLength) +} + func (m *Message) Payload() []byte { return m.payload } @@ -54,19 +58,21 @@ func NewMessage(payload []byte) *Message { message := &Message{} message.magic = byte(MAGIC_DEFAULT) + message.compression = byte(0) binary.BigEndian.PutUint32(message.checksum[0:], crc32.ChecksumIEEE(payload)) message.payload = payload return message } -// MESSAGE SET: +// MESSAGE SET: func (m *Message) Encode() []byte { - msgLen := 1 + 4 + len(m.payload) + msgLen := 1 + 1 + 4 + len(m.payload) msg := make([]byte, 4+msgLen) binary.BigEndian.PutUint32(msg[0:], uint32(msgLen)) msg[4] = m.magic - copy(msg[5:], m.checksum[0:]) - copy(msg[9:], m.payload) + msg[5] = m.compression + copy(msg[6:], m.checksum[0:]) + copy(msg[10:], m.payload) return msg } @@ -79,9 +85,19 @@ msg := Message{} msg.totalLength = length msg.magic = packet[4] - copy(msg.checksum[:], packet[5:9]) - payloadLength := length - 1 - 4 - msg.payload = packet[9 : 9+payloadLength] + if msg.magic == 1 { + msg.compression = packet[5] + copy(msg.checksum[:], packet[6:10]) + payloadLength := length - 1 - 4 - 1 + msg.payload = packet[10 : 10+payloadLength] + } else if msg.magic == 0 { + msg.compression = byte(0) + copy(msg.checksum[:], packet[5:9]) + payloadLength := length - 1 - 4 + msg.payload = packet[9 : 9+payloadLength] + } else { + // panic? + } payloadChecksum := make([]byte, 4) binary.BigEndian.PutUint32(payloadChecksum, crc32.ChecksumIEEE(msg.payload)) @@ -95,6 +111,7 @@ func (msg *Message) Print() { log.Println("----- Begin Message ------") log.Printf("magic: %X\n", msg.magic) + log.Printf("compression: %X\n", msg.compression) log.Printf("checksum: %X\n", msg.checksum) if len(msg.payload) < 1048576 { // 1 MB log.Printf("payload: %X\n", msg.payload) @@ -102,6 +119,6 @@ } else { log.Printf("long payload, length: %d\n", len(msg.payload)) } - log.Printf("offset: %d\n", msg.offset) + log.Printf("offset start: %d end: %d\n", msg.offset, msg.OffsetEnd()) log.Println("----- End Message ------") }