Index: kafka_test.go =================================================================== --- kafka_test.go (revision 1304309) +++ kafka_test.go (working copy) @@ -1,277 +0,0 @@ -/* - * Copyright (c) 2011 NeuStar, Inc. - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * NeuStar, the Neustar logo and related names and logos are registered - * trademarks, service marks or tradenames of NeuStar, Inc. All other - * product names, company names, marks, logos and symbols may be trademarks - * of their respective owners. 
- */ - -package kafka - -import ( - "testing" - //"fmt" - "bytes" - "compress/gzip" -) - -func TestMessageCreation(t *testing.T) { - payload := []byte("testing") - msg := NewMessage(payload) - if msg.magic != 1 { - t.Errorf("magic incorrect") - t.Fail() - } - - // generated by kafka-rb: e8 f3 5a 06 - expected := []byte{0xe8, 0xf3, 0x5a, 0x06} - if !bytes.Equal(expected, msg.checksum[:]) { - t.Fail() - } -} - -func TestMagic0MessageEncoding(t *testing.T) { - // generated by kafka-rb: - // test the old message format - expected := []byte{0x00, 0x00, 0x00, 0x0c, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} - length, msgsDecoded := Decode(expected, DefaultCodecsMap) - - if length == 0 || msgsDecoded == nil { - t.Fail() - } - msgDecoded := msgsDecoded[0] - - payload := []byte("testing") - if !bytes.Equal(payload, msgDecoded.payload) { - t.Fatal("bytes not equal") - } - chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} - if !bytes.Equal(chksum, msgDecoded.checksum[:]) { - t.Fatal("checksums do not match") - } - if msgDecoded.magic != 0 { - t.Fatal("magic incorrect") - } -} - -func TestMessageEncoding(t *testing.T) { - - payload := []byte("testing") - msg := NewMessage(payload) - - // generated by kafka-rb: - expected := []byte{0x00, 0x00, 0x00, 0x0d, 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} - if !bytes.Equal(expected, msg.Encode()) { - t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode()) - } - - // verify round trip - length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode()) - - if length == 0 || msgsDecoded == nil { - t.Fatal("message is nil") - } - msgDecoded := msgsDecoded[0] - - if !bytes.Equal(msgDecoded.payload, payload) { - t.Fatal("bytes not equal") - } - chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} - if !bytes.Equal(chksum, msgDecoded.checksum[:]) { - t.Fatal("checksums do not match") - } - if msgDecoded.magic != 1 { - t.Fatal("magic incorrect") - } -} - -func 
TestCompressedMessageEncodingCompare(t *testing.T) { - payload := []byte("testing") - uncompressedMsgBytes := NewMessage(payload).Encode() - - msgGzipBytes := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]).Encode() - msgDefaultBytes := NewCompressedMessage(payload).Encode() - if !bytes.Equal(msgDefaultBytes, msgGzipBytes) { - t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", msgDefaultBytes, msgGzipBytes) - } -} - -func TestCompressedMessageEncoding(t *testing.T) { - payload := []byte("testing") - uncompressedMsgBytes := NewMessage(payload).Encode() - - msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]) - - expectedPayload := []byte{0x1F, 0x8B, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, - 0xFF, 0x62, 0x60, 0x60, 0xE0, 0x65, 0x64, 0x78, 0xF1, 0x39, 0x8A, - 0xAD, 0x24, 0xB5, 0xB8, 0x24, 0x33, 0x2F, 0x1D, 0x10, 0x00, 0x00, - 0xFF, 0xFF, 0x0C, 0x6A, 0x82, 0x91, 0x11, 0x00, 0x00, 0x00} - - expectedHeader := []byte{0x00, 0x00, 0x00, 0x2F, 0x01, 0x01, 0x07, 0xFD, 0xC3, 0x76} - - expected := make([]byte, len(expectedHeader)+len(expectedPayload)) - n := copy(expected, expectedHeader) - copy(expected[n:], expectedPayload) - - if msg.compression != 1 { - t.Fatalf("expected compression: 1 but got: %b", msg.compression) - } - - zipper, _ := gzip.NewReader(bytes.NewBuffer(msg.payload)) - uncompressed := make([]byte, 100) - n, _ = zipper.Read(uncompressed) - uncompressed = uncompressed[:n] - zipper.Close() - - if !bytes.Equal(uncompressed, uncompressedMsgBytes) { - t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", uncompressed, uncompressedMsgBytes) - } - - if !bytes.Equal(expected, msg.Encode()) { - t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode()) - } - - // verify round trip - length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap) - - if length == 0 || msgsDecoded == nil { - t.Fatal("message is nil") - } - msgDecoded := msgsDecoded[0] - - if 
!bytes.Equal(msgDecoded.payload, payload) { - t.Fatal("bytes not equal") - } - chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} - if !bytes.Equal(chksum, msgDecoded.checksum[:]) { - t.Fatalf("checksums do not match, expected: % X but was: % X", - chksum, msgDecoded.checksum[:]) - } - if msgDecoded.magic != 1 { - t.Fatal("magic incorrect") - } -} - -func TestLongCompressedMessageRoundTrip(t *testing.T) { - payloadBuf := bytes.NewBuffer([]byte{}) - // make the test bigger than buffer allocated in the Decode - for i := 0; i < 15; i++ { - payloadBuf.Write([]byte("testing123 ")) - } - - uncompressedMsgBytes := NewMessage(payloadBuf.Bytes()).Encode() - msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]) - - zipper, _ := gzip.NewReader(bytes.NewBuffer(msg.payload)) - uncompressed := make([]byte, 200) - n, _ := zipper.Read(uncompressed) - uncompressed = uncompressed[:n] - zipper.Close() - - if !bytes.Equal(uncompressed, uncompressedMsgBytes) { - t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", - uncompressed, uncompressedMsgBytes) - } - - // verify round trip - length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap) - - if length == 0 || msgsDecoded == nil { - t.Fatal("message is nil") - } - msgDecoded := msgsDecoded[0] - - if !bytes.Equal(msgDecoded.payload, payloadBuf.Bytes()) { - t.Fatal("bytes not equal") - } - if msgDecoded.magic != 1 { - t.Fatal("magic incorrect") - } -} - -func TestMultipleCompressedMessages(t *testing.T) { - msgs := []*Message{NewMessage([]byte("testing")), - NewMessage([]byte("multiple")), - NewMessage([]byte("messages")), - } - msg := NewCompressedMessages(msgs...) 
- - length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode()) - if length == 0 || msgsDecoded == nil { - t.Fatal("msgsDecoded is nil") - } - - // make sure the decompressed messages match what was put in - for index, decodedMsg := range msgsDecoded { - if !bytes.Equal(msgs[index].payload, decodedMsg.payload) { - t.Fatalf("Payload doesn't match, expected: % X but was: % X\n", - msgs[index].payload, decodedMsg.payload) - } - } -} - -func TestRequestHeaderEncoding(t *testing.T) { - broker := newBroker("localhost:9092", "test", 0) - request := broker.EncodeRequestHeader(REQUEST_PRODUCE) - - // generated by kafka-rb: - expected := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, - 0x00, 0x00, 0x00, 0x00} - - if !bytes.Equal(expected, request.Bytes()) { - t.Errorf("expected length: %d but got: %d", len(expected), len(request.Bytes())) - t.Errorf("expected: %X\n but got: %X", expected, request) - t.Fail() - } -} - -func TestPublishRequestEncoding(t *testing.T) { - payload := []byte("testing") - msg := NewMessage(payload) - - pubBroker := NewBrokerPublisher("localhost:9092", "test", 0) - request := pubBroker.broker.EncodePublishRequest(msg) - - // generated by kafka-rb: - expected := []byte{0x00, 0x00, 0x00, 0x21, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0x0d, - /* magic comp ...... chksum .... .. payload .. 
*/ - 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} - - if !bytes.Equal(expected, request) { - t.Errorf("expected length: %d but got: %d", len(expected), len(request)) - t.Errorf("expected: % X\n but got: % X", expected, request) - t.Fail() - } -} - -func TestConsumeRequestEncoding(t *testing.T) { - - pubBroker := NewBrokerPublisher("localhost:9092", "test", 0) - request := pubBroker.broker.EncodeConsumeRequest(0, 1048576) - - // generated by kafka-rb, encode_request_size + encode_request - expected := []byte{0x00, 0x00, 0x00, 0x18, 0x00, 0x01, 0x00, 0x04, 0x74, - 0x65, 0x73, 0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00} - - if !bytes.Equal(expected, request) { - t.Errorf("expected length: %d but got: %d", len(expected), len(request)) - t.Errorf("expected: % X\n but got: % X", expected, request) - t.Fail() - } -} Index: src/kafka.go =================================================================== --- src/kafka.go (revision 1304309) +++ src/kafka.go (working copy) @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2011 NeuStar, Inc. - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * NeuStar, the Neustar logo and related names and logos are registered - * trademarks, service marks or tradenames of NeuStar, Inc. All other - * product names, company names, marks, logos and symbols may be trademarks - * of their respective owners. 
- */ - -package kafka - -import ( - "log" - "net" - "os" - "fmt" - "encoding/binary" - "io" - "bufio" -) - -const ( - NETWORK = "tcp" -) - -type Broker struct { - topic string - partition int - hostname string -} - -func newBroker(hostname string, topic string, partition int) *Broker { - return &Broker{topic: topic, - partition: partition, - hostname: hostname} -} - -func (b *Broker) connect() (conn *net.TCPConn, error os.Error) { - raddr, err := net.ResolveTCPAddr(NETWORK, b.hostname) - if err != nil { - log.Println("Fatal Error: ", err) - return nil, err - } - conn, err = net.DialTCP(NETWORK, nil, raddr) - if err != nil { - log.Println("Fatal Error: ", err) - return nil, err - } - return conn, error -} - -// returns length of response & payload & err -func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, os.Error) { - reader := bufio.NewReader(conn) - length := make([]byte, 4) - lenRead, err := io.ReadFull(reader, length) - if err != nil { - return 0, []byte{}, err - } - if lenRead != 4 || lenRead < 0 { - return 0, []byte{}, os.NewError("invalid length of the packet length field") - } - - expectedLength := binary.BigEndian.Uint32(length) - messages := make([]byte, expectedLength) - lenRead, err = io.ReadFull(reader, messages) - if err != nil { - return 0, []byte{}, err - } - - if uint32(lenRead) != expectedLength { - return 0, []byte{}, os.NewError(fmt.Sprintf("Fatal Error: Unexpected Length: %d expected: %d", lenRead, expectedLength)) - } - - errorCode := binary.BigEndian.Uint16(messages[0:2]) - if errorCode != 0 { - log.Println("errorCode: ", errorCode) - return 0, []byte{}, os.NewError( - fmt.Sprintf("Broker Response Error: %d", errorCode)) - } - return expectedLength, messages[2:], nil -} Index: src/message.go =================================================================== --- src/message.go (revision 1304309) +++ src/message.go (working copy) @@ -1,182 +0,0 @@ -/* - * Copyright (c) 2011 NeuStar, Inc. - * All rights reserved. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * NeuStar, the Neustar logo and related names and logos are registered - * trademarks, service marks or tradenames of NeuStar, Inc. All other - * product names, company names, marks, logos and symbols may be trademarks - * of their respective owners. - */ - -package kafka - -import ( - "hash/crc32" - "encoding/binary" - "bytes" - "log" -) - -const ( - // Compression Support uses '1' - https://cwiki.apache.org/confluence/display/KAFKA/Compression - MAGIC_DEFAULT = 1 - // magic + compression + chksum - NO_LEN_HEADER_SIZE = 1 + 1 + 4 -) - -type Message struct { - magic byte - compression byte - checksum [4]byte - payload []byte - offset uint64 // only used after decoding - totalLength uint32 // total length of the raw message (from decoding) - -} - -func (m *Message) Offset() uint64 { - return m.offset -} - -func (m *Message) Payload() []byte { - return m.payload -} - -func (m *Message) PayloadString() string { - return string(m.payload) -} - -func NewMessageWithCodec(payload []byte, codec PayloadCodec) *Message { - message := &Message{} - message.magic = byte(MAGIC_DEFAULT) - message.compression = codec.Id() - message.payload = codec.Encode(payload) - binary.BigEndian.PutUint32(message.checksum[0:], crc32.ChecksumIEEE(message.payload)) - return message -} - -// Default is is create a message with no compression -func NewMessage(payload []byte) *Message { - return NewMessageWithCodec(payload, 
DefaultCodecsMap[NO_COMPRESSION_ID]) -} - -// Create a Message using the default compression method (gzip) -func NewCompressedMessage(payload []byte) *Message { - return NewCompressedMessages(NewMessage(payload)) -} - -func NewCompressedMessages(messages ...*Message) *Message { - buf := bytes.NewBuffer([]byte{}) - for _, message := range messages { - buf.Write(message.Encode()) - } - return NewMessageWithCodec(buf.Bytes(), DefaultCodecsMap[GZIP_COMPRESSION_ID]) -} - -// MESSAGE SET: -func (m *Message) Encode() []byte { - msgLen := NO_LEN_HEADER_SIZE + len(m.payload) - msg := make([]byte, 4+msgLen) - binary.BigEndian.PutUint32(msg[0:], uint32(msgLen)) - msg[4] = m.magic - msg[5] = m.compression - - copy(msg[6:], m.checksum[0:]) - copy(msg[10:], m.payload) - - return msg -} - -func DecodeWithDefaultCodecs(packet []byte) (uint32, []Message) { - return Decode(packet, DefaultCodecsMap) -} - -func Decode(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, []Message) { - messages := []Message{} - - length, message := decodeMessage(packet, payloadCodecsMap) - - if length > 0 && message != nil { - if message.compression != NO_COMPRESSION_ID { - // wonky special case for compressed messages having embedded messages - payloadLen := uint32(len(message.payload)) - messageLenLeft := payloadLen - for messageLenLeft > 0 { - start := payloadLen - messageLenLeft - innerLen, innerMsg := decodeMessage(message.payload[start:], payloadCodecsMap) - messageLenLeft = messageLenLeft - innerLen - 4 // message length uint32 - messages = append(messages, *innerMsg) - } - } else { - messages = append(messages, *message) - } - } - - return length, messages -} - -func decodeMessage(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, *Message) { - length := binary.BigEndian.Uint32(packet[0:]) - if length > uint32(len(packet[4:])) { - log.Printf("length mismatch, expected at least: %X, was: %X\n", length, len(packet[4:])) - return 0, nil - } - msg := Message{} - 
msg.totalLength = length - msg.magic = packet[4] - - rawPayload := []byte{} - if msg.magic == 0 { - msg.compression = byte(0) - copy(msg.checksum[:], packet[5:9]) - payloadLength := length - 1 - 4 - rawPayload = packet[9 : 9+payloadLength] - } else if msg.magic == MAGIC_DEFAULT { - msg.compression = packet[5] - copy(msg.checksum[:], packet[6:10]) - payloadLength := length - NO_LEN_HEADER_SIZE - rawPayload = packet[10 : 10+payloadLength] - } else { - log.Printf("incorrect magic, expected: %X was: %X\n", MAGIC_DEFAULT, msg.magic) - return 0, nil - } - - payloadChecksum := make([]byte, 4) - binary.BigEndian.PutUint32(payloadChecksum, crc32.ChecksumIEEE(rawPayload)) - if !bytes.Equal(payloadChecksum, msg.checksum[:]) { - msg.Print() - log.Printf("checksum mismatch, expected: % X was: % X\n", payloadChecksum, msg.checksum[:]) - return 0, nil - } - msg.payload = payloadCodecsMap[msg.compression].Decode(rawPayload) - - return length, &msg -} - -func (msg *Message) Print() { - log.Println("----- Begin Message ------") - log.Printf("magic: %X\n", msg.magic) - log.Printf("compression: %X\n", msg.compression) - log.Printf("checksum: %X\n", msg.checksum) - if len(msg.payload) < 1048576 { // 1 MB - log.Printf("payload: % X\n", msg.payload) - log.Printf("payload(string): %s\n", msg.PayloadString()) - } else { - log.Printf("long payload, length: %d\n", len(msg.payload)) - } - log.Printf("length: %d\n", msg.totalLength) - log.Printf("offset: %d\n", msg.offset) - log.Println("----- End Message ------") -} Index: src/payload_codec.go =================================================================== --- src/payload_codec.go (revision 1304309) +++ src/payload_codec.go (working copy) @@ -1,116 +0,0 @@ -/* - * Copyright (c) 2011 NeuStar, Inc. - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * NeuStar, the Neustar logo and related names and logos are registered - * trademarks, service marks or tradenames of NeuStar, Inc. All other - * product names, company names, marks, logos and symbols may be trademarks - * of their respective owners. - */ - -package kafka - -import ( - "bytes" - "compress/gzip" - // "log" -) - -const ( - NO_COMPRESSION_ID = 0 - GZIP_COMPRESSION_ID = 1 -) - -type PayloadCodec interface { - - // the 1 byte id of the codec - Id() byte - - // encoder interface for compression implementation - Encode(data []byte) []byte - - // decoder interface for decompression implementation - Decode(data []byte) []byte -} - -// Default Codecs - -var DefaultCodecs = []PayloadCodec{ - new(NoCompressionPayloadCodec), - new(GzipPayloadCodec), -} - -var DefaultCodecsMap = codecsMap(DefaultCodecs) - -func codecsMap(payloadCodecs []PayloadCodec) map[byte]PayloadCodec { - payloadCodecsMap := make(map[byte]PayloadCodec, len(payloadCodecs)) - for _, c := range payloadCodecs { - payloadCodecsMap[c.Id()] = c, true - } - return payloadCodecsMap -} - -// No compression codec, noop - -type NoCompressionPayloadCodec struct { - -} - -func (codec *NoCompressionPayloadCodec) Id() byte { - return NO_COMPRESSION_ID -} - -func (codec *NoCompressionPayloadCodec) Encode(data []byte) []byte { - return data -} - -func (codec *NoCompressionPayloadCodec) Decode(data []byte) []byte { - return data -} - -// Gzip Codec - -type GzipPayloadCodec struct { - -} - -func (codec *GzipPayloadCodec) Id() byte { - return GZIP_COMPRESSION_ID -} - -func (codec 
*GzipPayloadCodec) Encode(data []byte) []byte { - buf := bytes.NewBuffer([]byte{}) - zipper, _ := gzip.NewWriterLevel(buf, gzip.BestSpeed) - zipper.Write(data) - zipper.Close() - return buf.Bytes() -} - -func (codec *GzipPayloadCodec) Decode(data []byte) []byte { - buf := bytes.NewBuffer([]byte{}) - zipper, _ := gzip.NewReader(bytes.NewBuffer(data)) - unzipped := make([]byte, 100) - for { - n, err := zipper.Read(unzipped) - if n > 0 && err == nil { - buf.Write(unzipped[0:n]) - } else { - break - } - } - - zipper.Close() - return buf.Bytes() -} Index: src/timing.go =================================================================== --- src/timing.go (revision 1304309) +++ src/timing.go (working copy) @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2011 NeuStar, Inc. - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * NeuStar, the Neustar logo and related names and logos are registered - * trademarks, service marks or tradenames of NeuStar, Inc. All other - * product names, company names, marks, logos and symbols may be trademarks - * of their respective owners. 
- */ - -package kafka - -import ( - "log" - "time" -) - -type Timing struct { - label string - start int64 - stop int64 -} - -func StartTiming(label string) *Timing { - return &Timing{label: label, start: time.Nanoseconds(), stop: 0} -} - -func (t *Timing) Stop() { - t.stop = time.Nanoseconds() -} - -func (t *Timing) Print() { - if t.stop == 0 { - t.Stop() - } - log.Printf("%s took: %f ms\n", t.label, float64((time.Nanoseconds()-t.start))/1000000) -} Index: src/consumer/consumer.go =================================================================== --- src/consumer/consumer.go (revision 0) +++ src/consumer/consumer.go (working copy) @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2011 NeuStar, Inc. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * NeuStar, the Neustar logo and related names and logos are registered + * trademarks, service marks or tradenames of NeuStar, Inc. All other + * product names, company names, marks, logos and symbols may be trademarks + * of their respective owners. 
+ */ + +package main + +import ( + "flag" + "fmt" + "os" + "os/signal" + "strconv" + //kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src" + "kafka" + "syscall" +) + +var hostname string +var topic string +var partition int +var offset uint64 +var maxSize uint +var writePayloadsTo string +var consumerForever bool +var printmessage bool + +func init() { + flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server") + flag.StringVar(&topic, "topic", "test", "topic to publish to") + flag.IntVar(&partition, "partition", 0, "partition to publish to") + flag.Uint64Var(&offset, "offset", 0, "offset to start consuming from") + flag.UintVar(&maxSize, "maxsize", 1048576, "max size in bytes of message set to request") + flag.StringVar(&writePayloadsTo, "writeto", "", "write payloads to this file") + flag.BoolVar(&consumerForever, "consumeforever", false, "loop forever consuming") + flag.BoolVar(&printmessage, "printmessage", true, "print the message details to stdout") +} + +func main() { + flag.Parse() + fmt.Println("Consuming Messages :") + fmt.Printf("From: %s, topic: %s, partition: %d\n", hostname, topic, partition) + fmt.Println(" ---------------------- ") + broker := kafka.NewBrokerConsumer(hostname, topic, partition, offset, uint32(maxSize)) + + var payloadFile *os.File = nil + if len(writePayloadsTo) > 0 { + var err error + payloadFile, err = os.Create(writePayloadsTo) + if err != nil { + fmt.Println("Error opening file: ", err) + payloadFile = nil + } + } + + consumerCallback := func(msg *kafka.Message) { + if printmessage { + msg.Print() + } + if payloadFile != nil { + payloadFile.Write([]byte("Message at: " + strconv.FormatUint(msg.Offset(), 10) + "\n")) + payloadFile.Write(msg.Payload()) + payloadFile.Write([]byte("\n-------------------------------\n")) + } + } + + if consumerForever { + quit := make(chan bool, 1) + go func() { + sigIn := make(chan os.Signal) + signal.Notify(sigIn) + for { + + select { + 
case sig := <-sigIn: + if sig.(os.Signal) == syscall.SIGINT { + quit <- true + } else { + fmt.Println(sig) + } + } + } + }() + + msgChan := make(chan *kafka.Message) + go broker.ConsumeOnChannel(msgChan, 10, quit) + for msg := range msgChan { + if msg != nil { + consumerCallback(msg) + } else { + break + } + } + } else { + broker.Consume(consumerCallback) + } + + if payloadFile != nil { + payloadFile.Close() + } + +} Index: src/consumer/Makefile =================================================================== --- src/consumer/Makefile (revision 1304309) +++ src/consumer/Makefile (working copy) @@ -1,7 +0,0 @@ -include $(GOROOT)/src/Make.inc - -TARG=consumer -GOFILES=\ - consumer.go\ - -include $(GOROOT)/src/Make.cmd Index: src/request.go =================================================================== --- src/request.go (revision 1304309) +++ src/request.go (working copy) @@ -1,101 +0,0 @@ -/* - * Copyright (c) 2011 NeuStar, Inc. - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * NeuStar, the Neustar logo and related names and logos are registered - * trademarks, service marks or tradenames of NeuStar, Inc. All other - * product names, company names, marks, logos and symbols may be trademarks - * of their respective owners. 
- */ - -package kafka - -import ( - "encoding/binary" - "bytes" -) - -type RequestType uint16 - -// Request Types -const ( - REQUEST_PRODUCE RequestType = 0 - REQUEST_FETCH = 1 - REQUEST_MULTIFETCH = 2 - REQUEST_MULTIPRODUCE = 3 - REQUEST_OFFSETS = 4 -) - -// Request Header: -func (b *Broker) EncodeRequestHeader(requestType RequestType) *bytes.Buffer { - request := bytes.NewBuffer([]byte{}) - request.Write(uint32bytes(0)) // placeholder for request size - request.Write(uint16bytes(int(requestType))) - request.Write(uint16bytes(len(b.topic))) - request.WriteString(b.topic) - request.Write(uint32bytes(b.partition)) - - return request -} - -// after writing to the buffer is complete, encode the size of the request in the request. -func encodeRequestSize(request *bytes.Buffer) { - binary.BigEndian.PutUint32(request.Bytes()[0:], uint32(request.Len()-4)) -} - -// -func (b *Broker) EncodeOffsetRequest(time int64, maxNumOffsets uint32) []byte { - request := b.EncodeRequestHeader(REQUEST_OFFSETS) - // specific to offset request - request.Write(uint64ToUint64bytes(uint64(time))) - request.Write(uint32toUint32bytes(maxNumOffsets)) - - encodeRequestSize(request) - - return request.Bytes() -} - -// -func (b *Broker) EncodeConsumeRequest(offset uint64, maxSize uint32) []byte { - request := b.EncodeRequestHeader(REQUEST_FETCH) - // specific to consume request - request.Write(uint64ToUint64bytes(offset)) - request.Write(uint32toUint32bytes(maxSize)) - - encodeRequestSize(request) - - return request.Bytes() -} - -// -func (b *Broker) EncodePublishRequest(messages ...*Message) []byte { - // 4 + 2 + 2 + topicLength + 4 + 4 - request := b.EncodeRequestHeader(REQUEST_PRODUCE) - - messageSetSizePos := request.Len() - request.Write(uint32bytes(0)) // placeholder message len - - written := 0 - for _, message := range messages { - wrote, _ := request.Write(message.Encode()) - written += wrote - } - - // now add the accumulated size of that the message set was - 
binary.BigEndian.PutUint32(request.Bytes()[messageSetSizePos:], uint32(written)) - // now add the size of the whole to the first uint32 - encodeRequestSize(request) - return request.Bytes() -} Index: src/publisher/publisher.go =================================================================== --- src/publisher/publisher.go (revision 0) +++ src/publisher/publisher.go (working copy) @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2011 NeuStar, Inc. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * NeuStar, the Neustar logo and related names and logos are registered + * trademarks, service marks or tradenames of NeuStar, Inc. All other + * product names, company names, marks, logos and symbols may be trademarks + * of their respective owners. 
+ */ + +package main + +import ( + "flag" + "fmt" + //kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src" + "kafka" + "os" +) + +var hostname string +var topic string +var partition int +var message string +var messageFile string +var compress bool + +func init() { + flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server") + flag.StringVar(&topic, "topic", "test", "topic to publish to") + flag.IntVar(&partition, "partition", 0, "partition to publish to") + flag.StringVar(&message, "message", "", "message to publish") + flag.StringVar(&messageFile, "messagefile", "", "read message from this file") + flag.BoolVar(&compress, "compress", false, "compress the messages published") +} + +func main() { + flag.Parse() + fmt.Println("Publishing :", message) + fmt.Printf("To: %s, topic: %s, partition: %d\n", hostname, topic, partition) + fmt.Println(" ---------------------- ") + broker := kafka.NewBrokerPublisher(hostname, topic, partition) + + if len(message) == 0 && len(messageFile) != 0 { + file, err := os.Open(messageFile) + if err != nil { + fmt.Println("Error: ", err) + return + } + stat, err := file.Stat() + if err != nil { + fmt.Println("Error: ", err) + return + } + payload := make([]byte, stat.Size()) + file.Read(payload) + timing := kafka.StartTiming("Sending") + + if compress { + broker.Publish(kafka.NewCompressedMessage(payload)) + } else { + broker.Publish(kafka.NewMessage(payload)) + } + + timing.Print() + file.Close() + } else { + timing := kafka.StartTiming("Sending") + + if compress { + broker.Publish(kafka.NewCompressedMessage([]byte(message))) + } else { + broker.Publish(kafka.NewMessage([]byte(message))) + } + + timing.Print() + } +} Index: src/publisher/Makefile =================================================================== --- src/publisher/Makefile (revision 1304309) +++ src/publisher/Makefile (working copy) @@ -1,7 +0,0 @@ -include $(GOROOT)/src/Make.inc - -TARG=publisher -GOFILES=\ - 
publisher.go\ - -include $(GOROOT)/src/Make.cmd Index: src/consumer.go =================================================================== --- src/consumer.go (revision 1304309) +++ src/consumer.go (working copy) @@ -1,199 +0,0 @@ -/* - * Copyright (c) 2011 NeuStar, Inc. - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * NeuStar, the Neustar logo and related names and logos are registered - * trademarks, service marks or tradenames of NeuStar, Inc. All other - * product names, company names, marks, logos and symbols may be trademarks - * of their respective owners. - */ - -package kafka - -import ( - "log" - "os" - "net" - "time" - "encoding/binary" -) - -type BrokerConsumer struct { - broker *Broker - offset uint64 - maxSize uint32 - codecs map[byte]PayloadCodec -} - -// Create a new broker consumer -// hostname - host and optionally port, delimited by ':' -// topic to consume -// partition to consume from -// offset to start consuming from -// maxSize (in bytes) of the message to consume (this should be at least as big as the biggest message to be published) -func NewBrokerConsumer(hostname string, topic string, partition int, offset uint64, maxSize uint32) *BrokerConsumer { - return &BrokerConsumer{broker: newBroker(hostname, topic, partition), - offset: offset, - maxSize: maxSize, - codecs: DefaultCodecsMap} -} - -// Simplified consumer that defaults the offset and maxSize to 0. 
-// hostname - host and optionally port, delimited by ':' -// topic to consume -// partition to consume from -func NewBrokerOffsetConsumer(hostname string, topic string, partition int) *BrokerConsumer { - return &BrokerConsumer{broker: newBroker(hostname, topic, partition), - offset: 0, - maxSize: 0, - codecs: DefaultCodecsMap} -} - -// Add Custom Payload Codecs for Consumer Decoding -// payloadCodecs - an array of PayloadCodec implementations -func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec) { - // merge to the default map, so one 'could' override the default codecs.. - for k, v := range codecsMap(payloadCodecs) { - consumer.codecs[k] = v, true - } -} - -func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, os.Error) { - conn, err := consumer.broker.connect() - if err != nil { - return -1, err - } - - num := 0 - done := make(chan bool, 1) - go func() { - for { - _, err := consumer.consumeWithConn(conn, func(msg *Message) { - msgChan <- msg - num += 1 - }) - - if err != nil { - if err != os.EOF { - log.Println("Fatal Error: ", err) - panic(err) - } - quit <- true // force quit - break - } - time.Sleep(pollTimeoutMs * 1000000) - } - done <- true - }() - // wait to be told to stop.. 
- <-quit - conn.Close() - close(msgChan) - <-done - return num, err -} - -type MessageHandlerFunc func(msg *Message) - -func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, os.Error) { - conn, err := consumer.broker.connect() - if err != nil { - return -1, err - } - defer conn.Close() - - num, err := consumer.consumeWithConn(conn, handlerFunc) - - if err != nil { - log.Println("Fatal Error: ", err) - } - - return num, err -} - -func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, os.Error) { - _, err := conn.Write(consumer.broker.EncodeConsumeRequest(consumer.offset, consumer.maxSize)) - if err != nil { - return -1, err - } - - length, payload, err := consumer.broker.readResponse(conn) - - if err != nil { - return -1, err - } - - num := 0 - if length > 2 { - // parse out the messages - var currentOffset uint64 = 0 - for currentOffset <= uint64(length-4) { - totalLength, msgs := Decode(payload[currentOffset:], consumer.codecs) - if msgs == nil { - return num, os.NewError("Error Decoding Message") - } - msgOffset := consumer.offset + currentOffset - for _, msg := range msgs { - // update all of the messages offset - // multiple messages can be at the same offset (compressed for example) - msg.offset = msgOffset - handlerFunc(&msg) - num += 1 - } - currentOffset += uint64(4 + totalLength) - } - // update the broker's offset for next consumption - consumer.offset += currentOffset - } - - return num, err -} - -// Get a list of valid offsets (up to maxNumOffsets) before the given time, where -// time is in milliseconds (-1, from the latest offset available, -2 from the smallest offset available) -// The result is a list of offsets, in descending order. 
-func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, os.Error) { - offsets := make([]uint64, 0) - - conn, err := consumer.broker.connect() - if err != nil { - return offsets, err - } - - defer conn.Close() - - _, err = conn.Write(consumer.broker.EncodeOffsetRequest(time, maxNumOffsets)) - if err != nil { - return offsets, err - } - - length, payload, err := consumer.broker.readResponse(conn) - if err != nil { - return offsets, err - } - - if length > 4 { - // get the number of offsets - numOffsets := binary.BigEndian.Uint32(payload[0:]) - var currentOffset uint64 = 4 - for currentOffset < uint64(length-4) && uint32(len(offsets)) < numOffsets { - offset := binary.BigEndian.Uint64(payload[currentOffset:]) - offsets = append(offsets, offset) - currentOffset += 8 // offset size - } - } - - return offsets, err -} Index: src/publisher.go =================================================================== --- src/publisher.go (revision 1304309) +++ src/publisher.go (working copy) @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2011 NeuStar, Inc. - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * NeuStar, the Neustar logo and related names and logos are registered - * trademarks, service marks or tradenames of NeuStar, Inc. All other - * product names, company names, marks, logos and symbols may be trademarks - * of their respective owners. 
- */ - -package kafka - -import ( - "os" -) - -type BrokerPublisher struct { - broker *Broker -} - -func NewBrokerPublisher(hostname string, topic string, partition int) *BrokerPublisher { - return &BrokerPublisher{broker: newBroker(hostname, topic, partition)} -} - -func (b *BrokerPublisher) Publish(message *Message) (int, os.Error) { - return b.BatchPublish(message) -} - -func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, os.Error) { - conn, err := b.broker.connect() - if err != nil { - return -1, err - } - defer conn.Close() - // TODO: MULTIPRODUCE - request := b.broker.EncodePublishRequest(messages...) - num, err := conn.Write(request) - if err != nil { - return -1, err - } - - return num, err -} Index: src/converts.go =================================================================== --- src/converts.go (revision 1304309) +++ src/converts.go (working copy) @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2011 NeuStar, Inc. - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * NeuStar, the Neustar logo and related names and logos are registered - * trademarks, service marks or tradenames of NeuStar, Inc. All other - * product names, company names, marks, logos and symbols may be trademarks - * of their respective owners. 
- */ - -package kafka - -import ( - "encoding/binary" -) - -func uint16bytes(value int) []byte { - result := make([]byte, 2) - binary.BigEndian.PutUint16(result, uint16(value)) - return result -} - -func uint32bytes(value int) []byte { - result := make([]byte, 4) - binary.BigEndian.PutUint32(result, uint32(value)) - return result -} - -func uint32toUint32bytes(value uint32) []byte { - result := make([]byte, 4) - binary.BigEndian.PutUint32(result, value) - return result -} - -func uint64ToUint64bytes(value uint64) []byte { - result := make([]byte, 8) - binary.BigEndian.PutUint64(result, value) - return result -} Index: src/offsets/Makefile =================================================================== --- src/offsets/Makefile (revision 1304309) +++ src/offsets/Makefile (working copy) @@ -1,7 +0,0 @@ -include $(GOROOT)/src/Make.inc - -TARG=offsets -GOFILES=\ - offsets.go\ - -include $(GOROOT)/src/Make.cmd Index: src/offsets/offsets.go =================================================================== --- src/offsets/offsets.go (revision 0) +++ src/offsets/offsets.go (working copy) @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2011 NeuStar, Inc. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * NeuStar, the Neustar logo and related names and logos are registered + * trademarks, service marks or tradenames of NeuStar, Inc. 
All other + * product names, company names, marks, logos and symbols may be trademarks + * of their respective owners. + */ + +package main + +import ( + "flag" + "fmt" + //kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src" + "kafka" +) + +var hostname string +var topic string +var partition int +var offsets uint +var time int64 + +func init() { + flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server") + flag.StringVar(&topic, "topic", "test", "topic to read offsets from") + flag.IntVar(&partition, "partition", 0, "partition to read offsets from") + flag.UintVar(&offsets, "offsets", 1, "number of offsets returned") + flag.Int64Var(&time, "time", -1, "timestamp of the offsets before that: time(ms)/-1(latest)/-2(earliest)") +} + +func main() { + flag.Parse() + fmt.Println("Offsets :") + fmt.Printf("From: %s, topic: %s, partition: %d\n", hostname, topic, partition) + fmt.Println(" ---------------------- ") + broker := kafka.NewBrokerOffsetConsumer(hostname, topic, partition) + + offsets, err := broker.GetOffsets(time, uint32(offsets)) + if err != nil { + fmt.Println("Error: ", err) + } + fmt.Printf("Offsets found: %d\n", len(offsets)) + for i := 0; i < len(offsets); i++ { + fmt.Printf("Offset[%d] = %d\n", i, offsets[i]) + } +} Index: src/kafka/kafka_test.go =================================================================== --- src/kafka/kafka_test.go (revision 0) +++ src/kafka/kafka_test.go (working copy) @@ -0,0 +1,277 @@ +/* + * Copyright (c) 2011 NeuStar, Inc. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * NeuStar, the Neustar logo and related names and logos are registered + * trademarks, service marks or tradenames of NeuStar, Inc. All other + * product names, company names, marks, logos and symbols may be trademarks + * of their respective owners. + */ + +package kafka + +import ( + "testing" + //"fmt" + "bytes" + "compress/gzip" +) + +func TestMessageCreation(t *testing.T) { + payload := []byte("testing") + msg := NewMessage(payload) + if msg.magic != 1 { + t.Errorf("magic incorrect") + t.Fail() + } + + // generated by kafka-rb: e8 f3 5a 06 + expected := []byte{0xe8, 0xf3, 0x5a, 0x06} + if !bytes.Equal(expected, msg.checksum[:]) { + t.Fail() + } +} + +func TestMagic0MessageEncoding(t *testing.T) { + // generated by kafka-rb: + // test the old message format + expected := []byte{0x00, 0x00, 0x00, 0x0c, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} + length, msgsDecoded := Decode(expected, DefaultCodecsMap) + + if length == 0 || msgsDecoded == nil { + t.Fail() + } + msgDecoded := msgsDecoded[0] + + payload := []byte("testing") + if !bytes.Equal(payload, msgDecoded.payload) { + t.Fatal("bytes not equal") + } + chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} + if !bytes.Equal(chksum, msgDecoded.checksum[:]) { + t.Fatal("checksums do not match") + } + if msgDecoded.magic != 0 { + t.Fatal("magic incorrect") + } +} + +func TestMessageEncoding(t *testing.T) { + + payload := []byte("testing") + msg := NewMessage(payload) + + // generated by kafka-rb: + expected := []byte{0x00, 0x00, 0x00, 0x0d, 0x01, 0x00, 0xe8, 0xf3, 0x5a, 
0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} + if !bytes.Equal(expected, msg.Encode()) { + t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode()) + } + + // verify round trip + length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode()) + + if length == 0 || msgsDecoded == nil { + t.Fatal("message is nil") + } + msgDecoded := msgsDecoded[0] + + if !bytes.Equal(msgDecoded.payload, payload) { + t.Fatal("bytes not equal") + } + chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} + if !bytes.Equal(chksum, msgDecoded.checksum[:]) { + t.Fatal("checksums do not match") + } + if msgDecoded.magic != 1 { + t.Fatal("magic incorrect") + } +} + +func TestCompressedMessageEncodingCompare(t *testing.T) { + payload := []byte("testing") + uncompressedMsgBytes := NewMessage(payload).Encode() + + msgGzipBytes := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]).Encode() + msgDefaultBytes := NewCompressedMessage(payload).Encode() + if !bytes.Equal(msgDefaultBytes, msgGzipBytes) { + t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", msgDefaultBytes, msgGzipBytes) + } +} + +func TestCompressedMessageEncoding(t *testing.T) { + payload := []byte("testing") + uncompressedMsgBytes := NewMessage(payload).Encode() + + msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]) + + expectedPayload := []byte{0x1F, 0x8B, 0x08, 0x00, 0x00, 0x09, 0x6E, 0x88, 0x04, + 0xFF, 0x62, 0x60, 0x60, 0xE0, 0x65, 0x64, 0x78, 0xF1, 0x39, 0x8A, + 0xAD, 0x24, 0xB5, 0xB8, 0x24, 0x33, 0x2F, 0x1D, 0x10, 0x00, 0x00, + 0xFF, 0xFF, 0x0C, 0x6A, 0x82, 0x91, 0x11, 0x00, 0x00, 0x00} + + expectedHeader := []byte{0x00, 0x00, 0x00, 0x2F, 0x01, 0x01, 0x96, 0x71, 0xA6, 0xE8} + + expected := make([]byte, len(expectedHeader)+len(expectedPayload)) + n := copy(expected, expectedHeader) + copy(expected[n:], expectedPayload) + + if msg.compression != 1 { + t.Fatalf("expected compression: 1 but got: %b", msg.compression) + } + + zipper, _ := 
gzip.NewReader(bytes.NewBuffer(msg.payload)) + uncompressed := make([]byte, 100) + n, _ = zipper.Read(uncompressed) + uncompressed = uncompressed[:n] + zipper.Close() + + if !bytes.Equal(uncompressed, uncompressedMsgBytes) { + t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", uncompressed, uncompressedMsgBytes) + } + + if !bytes.Equal(expected, msg.Encode()) { + t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode()) + } + + // verify round trip + length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap) + + if length == 0 || msgsDecoded == nil { + t.Fatal("message is nil") + } + msgDecoded := msgsDecoded[0] + + if !bytes.Equal(msgDecoded.payload, payload) { + t.Fatal("bytes not equal") + } + chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} + if !bytes.Equal(chksum, msgDecoded.checksum[:]) { + t.Fatalf("checksums do not match, expected: % X but was: % X", + chksum, msgDecoded.checksum[:]) + } + if msgDecoded.magic != 1 { + t.Fatal("magic incorrect") + } +} + +func TestLongCompressedMessageRoundTrip(t *testing.T) { + payloadBuf := bytes.NewBuffer([]byte{}) + // make the test bigger than buffer allocated in the Decode + for i := 0; i < 15; i++ { + payloadBuf.Write([]byte("testing123 ")) + } + + uncompressedMsgBytes := NewMessage(payloadBuf.Bytes()).Encode() + msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]) + + zipper, _ := gzip.NewReader(bytes.NewBuffer(msg.payload)) + uncompressed := make([]byte, 200) + n, _ := zipper.Read(uncompressed) + uncompressed = uncompressed[:n] + zipper.Close() + + if !bytes.Equal(uncompressed, uncompressedMsgBytes) { + t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", + uncompressed, uncompressedMsgBytes) + } + + // verify round trip + length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap) + + if length == 0 || msgsDecoded == nil { + t.Fatal("message is nil") + } + msgDecoded := msgsDecoded[0] + + if !bytes.Equal(msgDecoded.payload, payloadBuf.Bytes()) { + 
t.Fatal("bytes not equal") + } + if msgDecoded.magic != 1 { + t.Fatal("magic incorrect") + } +} + +func TestMultipleCompressedMessages(t *testing.T) { + msgs := []*Message{NewMessage([]byte("testing")), + NewMessage([]byte("multiple")), + NewMessage([]byte("messages")), + } + msg := NewCompressedMessages(msgs...) + + length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode()) + if length == 0 || msgsDecoded == nil { + t.Fatal("msgsDecoded is nil") + } + + // make sure the decompressed messages match what was put in + for index, decodedMsg := range msgsDecoded { + if !bytes.Equal(msgs[index].payload, decodedMsg.payload) { + t.Fatalf("Payload doesn't match, expected: % X but was: % X\n", + msgs[index].payload, decodedMsg.payload) + } + } +} + +func TestRequestHeaderEncoding(t *testing.T) { + broker := newBroker("localhost:9092", "test", 0) + request := broker.EncodeRequestHeader(REQUEST_PRODUCE) + + // generated by kafka-rb: + expected := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, + 0x00, 0x00, 0x00, 0x00} + + if !bytes.Equal(expected, request.Bytes()) { + t.Errorf("expected length: %d but got: %d", len(expected), len(request.Bytes())) + t.Errorf("expected: %X\n but got: %X", expected, request) + t.Fail() + } +} + +func TestPublishRequestEncoding(t *testing.T) { + payload := []byte("testing") + msg := NewMessage(payload) + + pubBroker := NewBrokerPublisher("localhost:9092", "test", 0) + request := pubBroker.broker.EncodePublishRequest(msg) + + // generated by kafka-rb: + expected := []byte{0x00, 0x00, 0x00, 0x21, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0x0d, + /* magic comp ...... chksum .... .. payload .. 
*/ + 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} + + if !bytes.Equal(expected, request) { + t.Errorf("expected length: %d but got: %d", len(expected), len(request)) + t.Errorf("expected: % X\n but got: % X", expected, request) + t.Fail() + } +} + +func TestConsumeRequestEncoding(t *testing.T) { + + pubBroker := NewBrokerPublisher("localhost:9092", "test", 0) + request := pubBroker.broker.EncodeConsumeRequest(0, 1048576) + + // generated by kafka-rb, encode_request_size + encode_request + expected := []byte{0x00, 0x00, 0x00, 0x18, 0x00, 0x01, 0x00, 0x04, 0x74, + 0x65, 0x73, 0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00} + + if !bytes.Equal(expected, request) { + t.Errorf("expected length: %d but got: %d", len(expected), len(request)) + t.Errorf("expected: % X\n but got: % X", expected, request) + t.Fail() + } +} Index: src/kafka/kafka.go =================================================================== --- src/kafka/kafka.go (revision 0) +++ src/kafka/kafka.go (working copy) @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2011 NeuStar, Inc. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * NeuStar, the Neustar logo and related names and logos are registered + * trademarks, service marks or tradenames of NeuStar, Inc. All other + * product names, company names, marks, logos and symbols may be trademarks + * of their respective owners. 
const (
	// NETWORK is the transport used for all broker connections.
	NETWORK = "tcp"
)

// Broker identifies a single kafka endpoint: the host to dial plus the
// topic/partition every request is addressed to.
type Broker struct {
	topic     string
	partition int
	hostname  string
}

// newBroker builds a Broker for the given "host:port", topic and partition.
func newBroker(hostname string, topic string, partition int) *Broker {
	return &Broker{topic: topic,
		partition: partition,
		hostname:  hostname}
}

// connect resolves the broker address and dials a TCP connection.
// Failures are logged and returned; the caller owns closing the conn.
func (b *Broker) connect() (*net.TCPConn, error) {
	raddr, err := net.ResolveTCPAddr(NETWORK, b.hostname)
	if err != nil {
		log.Println("Fatal Error: ", err)
		return nil, err
	}
	conn, err := net.DialTCP(NETWORK, nil, raddr)
	if err != nil {
		log.Println("Fatal Error: ", err)
		return nil, err
	}
	return conn, nil
}

// readResponse reads one length-prefixed broker response from conn.
// It returns the declared payload length and the payload with the
// leading 2-byte error code stripped; a non-zero error code becomes an
// error return.
func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, error) {
	reader := bufio.NewReader(conn)
	length := make([]byte, 4)
	lenRead, err := io.ReadFull(reader, length)
	if err != nil {
		return 0, []byte{}, err
	}
	// io.ReadFull never returns a negative count, so only the exact
	// length needs checking here.
	if lenRead != 4 {
		return 0, []byte{}, errors.New("invalid length of the packet length field")
	}

	expectedLength := binary.BigEndian.Uint32(length)
	messages := make([]byte, expectedLength)
	lenRead, err = io.ReadFull(reader, messages)
	if err != nil {
		return 0, []byte{}, err
	}

	if uint32(lenRead) != expectedLength {
		return 0, []byte{}, fmt.Errorf("Fatal Error: Unexpected Length: %d  expected:  %d", lenRead, expectedLength)
	}

	// first 2 bytes of every response are a broker error code
	errorCode := binary.BigEndian.Uint16(messages[0:2])
	if errorCode != 0 {
		log.Println("errorCode: ", errorCode)
		return 0, []byte{}, fmt.Errorf("Broker Response Error: %d", errorCode)
	}
	return expectedLength, messages[2:], nil
}
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * NeuStar, the Neustar logo and related names and logos are registered + * trademarks, service marks or tradenames of NeuStar, Inc. All other + * product names, company names, marks, logos and symbols may be trademarks + * of their respective owners. + */ + +package kafka + +import ( + "bytes" + "encoding/binary" + "hash/crc32" + "log" +) + +const ( + // Compression Support uses '1' - https://cwiki.apache.org/confluence/display/KAFKA/Compression + MAGIC_DEFAULT = 1 + // magic + compression + chksum + NO_LEN_HEADER_SIZE = 1 + 1 + 4 +) + +type Message struct { + magic byte + compression byte + checksum [4]byte + payload []byte + offset uint64 // only used after decoding + totalLength uint32 // total length of the raw message (from decoding) + +} + +func (m *Message) Offset() uint64 { + return m.offset +} + +func (m *Message) Payload() []byte { + return m.payload +} + +func (m *Message) PayloadString() string { + return string(m.payload) +} + +func NewMessageWithCodec(payload []byte, codec PayloadCodec) *Message { + message := &Message{} + message.magic = byte(MAGIC_DEFAULT) + message.compression = codec.Id() + message.payload = codec.Encode(payload) + binary.BigEndian.PutUint32(message.checksum[0:], crc32.ChecksumIEEE(message.payload)) + return message +} + +// Default is is create a message with no compression +func NewMessage(payload []byte) *Message { + return NewMessageWithCodec(payload, 
DefaultCodecsMap[NO_COMPRESSION_ID]) +} + +// Create a Message using the default compression method (gzip) +func NewCompressedMessage(payload []byte) *Message { + return NewCompressedMessages(NewMessage(payload)) +} + +func NewCompressedMessages(messages ...*Message) *Message { + buf := bytes.NewBuffer([]byte{}) + for _, message := range messages { + buf.Write(message.Encode()) + } + return NewMessageWithCodec(buf.Bytes(), DefaultCodecsMap[GZIP_COMPRESSION_ID]) +} + +// MESSAGE SET: +func (m *Message) Encode() []byte { + msgLen := NO_LEN_HEADER_SIZE + len(m.payload) + msg := make([]byte, 4+msgLen) + binary.BigEndian.PutUint32(msg[0:], uint32(msgLen)) + msg[4] = m.magic + msg[5] = m.compression + + copy(msg[6:], m.checksum[0:]) + copy(msg[10:], m.payload) + + return msg +} + +func DecodeWithDefaultCodecs(packet []byte) (uint32, []Message) { + return Decode(packet, DefaultCodecsMap) +} + +func Decode(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, []Message) { + messages := []Message{} + + length, message := decodeMessage(packet, payloadCodecsMap) + + if length > 0 && message != nil { + if message.compression != NO_COMPRESSION_ID { + // wonky special case for compressed messages having embedded messages + payloadLen := uint32(len(message.payload)) + messageLenLeft := payloadLen + for messageLenLeft > 0 { + start := payloadLen - messageLenLeft + innerLen, innerMsg := decodeMessage(message.payload[start:], payloadCodecsMap) + messageLenLeft = messageLenLeft - innerLen - 4 // message length uint32 + messages = append(messages, *innerMsg) + } + } else { + messages = append(messages, *message) + } + } + + return length, messages +} + +func decodeMessage(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, *Message) { + length := binary.BigEndian.Uint32(packet[0:]) + if length > uint32(len(packet[4:])) { + log.Printf("length mismatch, expected at least: %X, was: %X\n", length, len(packet[4:])) + return 0, nil + } + msg := Message{} + 
msg.totalLength = length + msg.magic = packet[4] + + rawPayload := []byte{} + if msg.magic == 0 { + msg.compression = byte(0) + copy(msg.checksum[:], packet[5:9]) + payloadLength := length - 1 - 4 + rawPayload = packet[9 : 9+payloadLength] + } else if msg.magic == MAGIC_DEFAULT { + msg.compression = packet[5] + copy(msg.checksum[:], packet[6:10]) + payloadLength := length - NO_LEN_HEADER_SIZE + rawPayload = packet[10 : 10+payloadLength] + } else { + log.Printf("incorrect magic, expected: %X was: %X\n", MAGIC_DEFAULT, msg.magic) + return 0, nil + } + + payloadChecksum := make([]byte, 4) + binary.BigEndian.PutUint32(payloadChecksum, crc32.ChecksumIEEE(rawPayload)) + if !bytes.Equal(payloadChecksum, msg.checksum[:]) { + msg.Print() + log.Printf("checksum mismatch, expected: % X was: % X\n", payloadChecksum, msg.checksum[:]) + return 0, nil + } + msg.payload = payloadCodecsMap[msg.compression].Decode(rawPayload) + + return length, &msg +} + +func (msg *Message) Print() { + log.Println("----- Begin Message ------") + log.Printf("magic: %X\n", msg.magic) + log.Printf("compression: %X\n", msg.compression) + log.Printf("checksum: %X\n", msg.checksum) + if len(msg.payload) < 1048576 { // 1 MB + log.Printf("payload: % X\n", msg.payload) + log.Printf("payload(string): %s\n", msg.PayloadString()) + } else { + log.Printf("long payload, length: %d\n", len(msg.payload)) + } + log.Printf("length: %d\n", msg.totalLength) + log.Printf("offset: %d\n", msg.offset) + log.Println("----- End Message ------") +} Index: src/kafka/payload_codec.go =================================================================== --- src/kafka/payload_codec.go (revision 0) +++ src/kafka/payload_codec.go (working copy) @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2011 NeuStar, Inc. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
const (
	// codec ids carried in the message's compression byte
	NO_COMPRESSION_ID   = 0
	GZIP_COMPRESSION_ID = 1
)

// PayloadCodec is the pluggable (de)compression strategy applied to a
// message payload before checksumming / after decoding.
type PayloadCodec interface {

	// the 1 byte id of the codec
	Id() byte

	// encoder interface for compression implementation
	Encode(data []byte) []byte

	// decoder interface for decompression implementation
	Decode(data []byte) []byte
}

// DefaultCodecs lists the codecs shipped with the client.
var DefaultCodecs = []PayloadCodec{
	new(NoCompressionPayloadCodec),
	new(GzipPayloadCodec),
}

// DefaultCodecsMap indexes DefaultCodecs by codec id.
var DefaultCodecsMap = codecsMap(DefaultCodecs)

// codecsMap builds an id -> codec lookup table.
func codecsMap(payloadCodecs []PayloadCodec) map[byte]PayloadCodec {
	payloadCodecsMap := make(map[byte]PayloadCodec, len(payloadCodecs))
	for _, c := range payloadCodecs {
		payloadCodecsMap[c.Id()] = c
	}
	return payloadCodecsMap
}

// NoCompressionPayloadCodec is the identity codec (no compression).
type NoCompressionPayloadCodec struct {
}

func (codec *NoCompressionPayloadCodec) Id() byte {
	return NO_COMPRESSION_ID
}

func (codec *NoCompressionPayloadCodec) Encode(data []byte) []byte {
	return data
}

func (codec *NoCompressionPayloadCodec) Decode(data []byte) []byte {
	return data
}

// GzipPayloadCodec compresses payloads with gzip (BestSpeed).
type GzipPayloadCodec struct {
}

func (codec *GzipPayloadCodec) Id() byte {
	return GZIP_COMPRESSION_ID
}

func (codec *GzipPayloadCodec) Encode(data []byte) []byte {
	buf := bytes.NewBuffer([]byte{})
	zipper, _ := gzip.NewWriterLevel(buf, gzip.BestSpeed)
	zipper.Write(data)
	zipper.Close()
	return buf.Bytes()
}

func (codec *GzipPayloadCodec) Decode(data []byte) []byte {
	buf := bytes.NewBuffer([]byte{})
	zipper, _ := gzip.NewReader(bytes.NewBuffer(data))
	unzipped := make([]byte, 100)
	for {
		n, err := zipper.Read(unzipped)
		// A Reader may return n > 0 together with io.EOF; the old loop
		// discarded that final chunk. Always consume n before checking
		// the error.
		if n > 0 {
			buf.Write(unzipped[0:n])
		}
		if err != nil {
			break
		}
	}

	zipper.Close()
	return buf.Bytes()
}
+ */ + +package kafka + +import ( + "log" + "time" +) + +type Timing struct { + label string + start int64 + stop int64 +} + +func StartTiming(label string) *Timing { + return &Timing{label: label, start: time.Now().UnixNano()} +} + +func (t *Timing) Stop() { + t.stop = time.Now().UnixNano() +} + +func (t *Timing) Print() { + if t.stop == 0 { + t.Stop() + } + log.Printf("%s took: %f ms\n", t.label, float64(t.stop-t.start)/1000000) +} Index: src/kafka/request.go =================================================================== --- src/kafka/request.go (revision 0) +++ src/kafka/request.go (working copy) @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2011 NeuStar, Inc. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * NeuStar, the Neustar logo and related names and logos are registered + * trademarks, service marks or tradenames of NeuStar, Inc. All other + * product names, company names, marks, logos and symbols may be trademarks + * of their respective owners. 
+ */ + +package kafka + +import ( + "bytes" + "encoding/binary" +) + +type RequestType uint16 + +// Request Types +const ( + REQUEST_PRODUCE RequestType = 0 + REQUEST_FETCH = 1 + REQUEST_MULTIFETCH = 2 + REQUEST_MULTIPRODUCE = 3 + REQUEST_OFFSETS = 4 +) + +// Request Header: +func (b *Broker) EncodeRequestHeader(requestType RequestType) *bytes.Buffer { + request := bytes.NewBuffer([]byte{}) + request.Write(uint32bytes(0)) // placeholder for request size + request.Write(uint16bytes(int(requestType))) + request.Write(uint16bytes(len(b.topic))) + request.WriteString(b.topic) + request.Write(uint32bytes(b.partition)) + + return request +} + +// after writing to the buffer is complete, encode the size of the request in the request. +func encodeRequestSize(request *bytes.Buffer) { + binary.BigEndian.PutUint32(request.Bytes()[0:], uint32(request.Len()-4)) +} + +// +func (b *Broker) EncodeOffsetRequest(time int64, maxNumOffsets uint32) []byte { + request := b.EncodeRequestHeader(REQUEST_OFFSETS) + // specific to offset request + request.Write(uint64ToUint64bytes(uint64(time))) + request.Write(uint32toUint32bytes(maxNumOffsets)) + + encodeRequestSize(request) + + return request.Bytes() +} + +// +func (b *Broker) EncodeConsumeRequest(offset uint64, maxSize uint32) []byte { + request := b.EncodeRequestHeader(REQUEST_FETCH) + // specific to consume request + request.Write(uint64ToUint64bytes(offset)) + request.Write(uint32toUint32bytes(maxSize)) + + encodeRequestSize(request) + + return request.Bytes() +} + +// +func (b *Broker) EncodePublishRequest(messages ...*Message) []byte { + // 4 + 2 + 2 + topicLength + 4 + 4 + request := b.EncodeRequestHeader(REQUEST_PRODUCE) + + messageSetSizePos := request.Len() + request.Write(uint32bytes(0)) // placeholder message len + + written := 0 + for _, message := range messages { + wrote, _ := request.Write(message.Encode()) + written += wrote + } + + // now add the accumulated size of that the message set was + 
binary.BigEndian.PutUint32(request.Bytes()[messageSetSizePos:], uint32(written)) + // now add the size of the whole to the first uint32 + encodeRequestSize(request) + return request.Bytes() +} Index: src/kafka/consumer.go =================================================================== --- src/kafka/consumer.go (revision 0) +++ src/kafka/consumer.go (working copy) @@ -0,0 +1,200 @@ +/* + * Copyright (c) 2011 NeuStar, Inc. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * NeuStar, the Neustar logo and related names and logos are registered + * trademarks, service marks or tradenames of NeuStar, Inc. All other + * product names, company names, marks, logos and symbols may be trademarks + * of their respective owners. 
+ */ + +package kafka + +import ( + "encoding/binary" + "errors" + "io" + "log" + "net" + "time" +) + +type BrokerConsumer struct { + broker *Broker + offset uint64 + maxSize uint32 + codecs map[byte]PayloadCodec +} + +// Create a new broker consumer +// hostname - host and optionally port, delimited by ':' +// topic to consume +// partition to consume from +// offset to start consuming from +// maxSize (in bytes) of the message to consume (this should be at least as big as the biggest message to be published) +func NewBrokerConsumer(hostname string, topic string, partition int, offset uint64, maxSize uint32) *BrokerConsumer { + return &BrokerConsumer{broker: newBroker(hostname, topic, partition), + offset: offset, + maxSize: maxSize, + codecs: DefaultCodecsMap} +} + +// Simplified consumer that defaults the offset and maxSize to 0. +// hostname - host and optionally port, delimited by ':' +// topic to consume +// partition to consume from +func NewBrokerOffsetConsumer(hostname string, topic string, partition int) *BrokerConsumer { + return &BrokerConsumer{broker: newBroker(hostname, topic, partition), + offset: 0, + maxSize: 0, + codecs: DefaultCodecsMap} +} + +// Add Custom Payload Codecs for Consumer Decoding +// payloadCodecs - an array of PayloadCodec implementations +func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec) { + // merge to the default map, so one 'could' override the default codecs.. 
+ for k, v := range codecsMap(payloadCodecs) { + consumer.codecs[k] = v + } +} + +func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, error) { + conn, err := consumer.broker.connect() + if err != nil { + return -1, err + } + + num := 0 + done := make(chan bool, 1) + go func() { + for { + _, err := consumer.consumeWithConn(conn, func(msg *Message) { + msgChan <- msg + num += 1 + }) + + if err != nil { + if err != io.EOF { + log.Println("Fatal Error: ", err) + panic(err) + } + quit <- true // force quit + break + } + time.Sleep(time.Millisecond * time.Duration(pollTimeoutMs)) + } + done <- true + }() + // wait to be told to stop.. + <-quit + conn.Close() + close(msgChan) + <-done + return num, err +} + +type MessageHandlerFunc func(msg *Message) + +func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, error) { + conn, err := consumer.broker.connect() + if err != nil { + return -1, err + } + defer conn.Close() + + num, err := consumer.consumeWithConn(conn, handlerFunc) + + if err != nil { + log.Println("Fatal Error: ", err) + } + + return num, err +} + +func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, error) { + _, err := conn.Write(consumer.broker.EncodeConsumeRequest(consumer.offset, consumer.maxSize)) + if err != nil { + return -1, err + } + + length, payload, err := consumer.broker.readResponse(conn) + + if err != nil { + return -1, err + } + + num := 0 + if length > 2 { + // parse out the messages + var currentOffset uint64 = 0 + for currentOffset <= uint64(length-4) { + totalLength, msgs := Decode(payload[currentOffset:], consumer.codecs) + if msgs == nil { + return num, errors.New("Error Decoding Message") + } + msgOffset := consumer.offset + currentOffset + for _, msg := range msgs { + // update all of the messages offset + // multiple messages can be at the same offset (compressed for example) + msg.offset = msgOffset 
+ handlerFunc(&msg) + num += 1 + } + currentOffset += uint64(4 + totalLength) + } + // update the broker's offset for next consumption + consumer.offset += currentOffset + } + + return num, err +} + +// Get a list of valid offsets (up to maxNumOffsets) before the given time, where +// time is in milliseconds (-1, from the latest offset available, -2 from the smallest offset available) +// The result is a list of offsets, in descending order. +func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, error) { + offsets := make([]uint64, 0) + + conn, err := consumer.broker.connect() + if err != nil { + return offsets, err + } + + defer conn.Close() + + _, err = conn.Write(consumer.broker.EncodeOffsetRequest(time, maxNumOffsets)) + if err != nil { + return offsets, err + } + + length, payload, err := consumer.broker.readResponse(conn) + if err != nil { + return offsets, err + } + + if length > 4 { + // get the number of offsets + numOffsets := binary.BigEndian.Uint32(payload[0:]) + var currentOffset uint64 = 4 + for currentOffset < uint64(length-4) && uint32(len(offsets)) < numOffsets { + offset := binary.BigEndian.Uint64(payload[currentOffset:]) + offsets = append(offsets, offset) + currentOffset += 8 // offset size + } + } + + return offsets, err +} Index: src/kafka/publisher.go =================================================================== --- src/kafka/publisher.go (revision 0) +++ src/kafka/publisher.go (working copy) @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2011 NeuStar, Inc. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * NeuStar, the Neustar logo and related names and logos are registered + * trademarks, service marks or tradenames of NeuStar, Inc. All other + * product names, company names, marks, logos and symbols may be trademarks + * of their respective owners. + */ + +package kafka + +type BrokerPublisher struct { + broker *Broker +} + +func NewBrokerPublisher(hostname string, topic string, partition int) *BrokerPublisher { + return &BrokerPublisher{broker: newBroker(hostname, topic, partition)} +} + +func (b *BrokerPublisher) Publish(message *Message) (int, error) { + return b.BatchPublish(message) +} + +func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, error) { + conn, err := b.broker.connect() + if err != nil { + return -1, err + } + defer conn.Close() + // TODO: MULTIPRODUCE + request := b.broker.EncodePublishRequest(messages...) + num, err := conn.Write(request) + if err != nil { + return -1, err + } + + return num, err +} Index: src/kafka/converts.go =================================================================== --- src/kafka/converts.go (revision 0) +++ src/kafka/converts.go (working copy) @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2011 NeuStar, Inc. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * NeuStar, the Neustar logo and related names and logos are registered + * trademarks, service marks or tradenames of NeuStar, Inc. All other + * product names, company names, marks, logos and symbols may be trademarks + * of their respective owners. + */ + +package kafka + +import ( + "encoding/binary" +) + +func uint16bytes(value int) []byte { + result := make([]byte, 2) + binary.BigEndian.PutUint16(result, uint16(value)) + return result +} + +func uint32bytes(value int) []byte { + result := make([]byte, 4) + binary.BigEndian.PutUint32(result, uint32(value)) + return result +} + +func uint32toUint32bytes(value uint32) []byte { + result := make([]byte, 4) + binary.BigEndian.PutUint32(result, value) + return result +} + +func uint64ToUint64bytes(value uint64) []byte { + result := make([]byte, 8) + binary.BigEndian.PutUint64(result, value) + return result +} Index: README.md =================================================================== --- README.md (revision 1304309) +++ README.md (working copy) @@ -6,16 +6,20 @@ ## Get up and running ## -Install go:
-For more info see: http://golang.org/doc/install.html#install +Install go (version 1):
+For more info see: http://weekly.golang.org/doc/install.html#install Make sure to set your GOROOT properly (http://golang.org/doc/install.html#environment). +Also set your GOPATH appropriately: http://weekly.golang.org/doc/code.html#tmp_13 -Install kafka.go package:
-make install + +Build from source: +go install kafka
Make the tools (publisher, consumer & offsets)
-make tools +go install consumer +go install publisher +go install offsets
Start zookeeper, Kafka server
For more info on Kafka, see: http://incubator.apache.org/kafka/quickstart.html Index: tools/offsets/offsets.go =================================================================== --- tools/offsets/offsets.go (revision 1304309) +++ tools/offsets/offsets.go (working copy) @@ -1,62 +0,0 @@ -/* - * Copyright (c) 2011 NeuStar, Inc. - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * NeuStar, the Neustar logo and related names and logos are registered - * trademarks, service marks or tradenames of NeuStar, Inc. All other - * product names, company names, marks, logos and symbols may be trademarks - * of their respective owners. 
- */ - - -package main - -import ( - "kafka" - "flag" - "fmt" -) - -var hostname string -var topic string -var partition int -var offsets uint -var time int64 - -func init() { - flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server") - flag.StringVar(&topic, "topic", "test", "topic to read offsets from") - flag.IntVar(&partition, "partition", 0, "partition to read offsets from") - flag.UintVar(&offsets, "offsets", 1, "number of offsets returned") - flag.Int64Var(&time, "time", -1, "timestamp of the offsets before that: time(ms)/-1(latest)/-2(earliest)") -} - - -func main() { - flag.Parse() - fmt.Println("Offsets :") - fmt.Printf("From: %s, topic: %s, partition: %d\n", hostname, topic, partition) - fmt.Println(" ---------------------- ") - broker := kafka.NewBrokerOffsetConsumer(hostname, topic, partition) - - offsets, err := broker.GetOffsets(time, uint32(offsets)) - if err != nil { - fmt.Println("Error: ", err) - } - fmt.Printf("Offsets found: %d\n", len(offsets)) - for i := 0 ; i < len(offsets); i++ { - fmt.Printf("Offset[%d] = %d\n", i, offsets[i]) - } -} Index: tools/offsets/Makefile =================================================================== --- tools/offsets/Makefile (revision 1304309) +++ tools/offsets/Makefile (working copy) @@ -1,7 +0,0 @@ -include $(GOROOT)/src/Make.inc - -TARG=offsets -GOFILES=\ - offsets.go\ - -include $(GOROOT)/src/Make.cmd Index: tools/consumer/consumer.go =================================================================== --- tools/consumer/consumer.go (revision 1304309) +++ tools/consumer/consumer.go (working copy) @@ -1,111 +0,0 @@ -/* - * Copyright (c) 2011 NeuStar, Inc. - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * NeuStar, the Neustar logo and related names and logos are registered - * trademarks, service marks or tradenames of NeuStar, Inc. All other - * product names, company names, marks, logos and symbols may be trademarks - * of their respective owners. - */ - -package main - -import ( - "kafka" - "flag" - "fmt" - "os" - "strconv" - "os/signal" - "syscall" -) - -var hostname string -var topic string -var partition int -var offset uint64 -var maxSize uint -var writePayloadsTo string -var consumerForever bool -var printmessage bool - -func init() { - flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server") - flag.StringVar(&topic, "topic", "test", "topic to publish to") - flag.IntVar(&partition, "partition", 0, "partition to publish to") - flag.Uint64Var(&offset, "offset", 0, "offset to start consuming from") - flag.UintVar(&maxSize, "maxsize", 1048576, "offset to start consuming from") - flag.StringVar(&writePayloadsTo, "writeto", "", "write payloads to this file") - flag.BoolVar(&consumerForever, "consumeforever", false, "loop forever consuming") - flag.BoolVar(&printmessage, "printmessage", true, "print the message details to stdout") -} - -func main() { - flag.Parse() - fmt.Println("Consuming Messages :") - fmt.Printf("From: %s, topic: %s, partition: %d\n", hostname, topic, partition) - fmt.Println(" ---------------------- ") - broker := kafka.NewBrokerConsumer(hostname, topic, partition, offset, uint32(maxSize)) - - var payloadFile *os.File = nil - if len(writePayloadsTo) > 0 { - var err os.Error - 
payloadFile, err = os.Create(writePayloadsTo) - if err != nil { - fmt.Println("Error opening file: ", err) - payloadFile = nil - } - } - - consumerCallback := func(msg *kafka.Message) { - if printmessage { - msg.Print() - } - if payloadFile != nil { - payloadFile.Write([]byte("Message at: " + strconv.Uitoa64(msg.Offset()) + "\n")) - payloadFile.Write(msg.Payload()) - payloadFile.Write([]byte("\n-------------------------------\n")) - } - } - - if consumerForever { - quit := make(chan bool, 1) - go func() { - for { - sig := <-signal.Incoming - if sig.(os.UnixSignal) == syscall.SIGINT { - quit <- true - } - } - }() - - msgChan := make(chan *kafka.Message) - go broker.ConsumeOnChannel(msgChan, 10, quit) - for msg := range msgChan { - if msg != nil { - consumerCallback(msg) - } else { - break - } - } - } else { - broker.Consume(consumerCallback) - } - - if payloadFile != nil { - payloadFile.Close() - } - -} Index: tools/consumer/Makefile =================================================================== --- tools/consumer/Makefile (revision 1304309) +++ tools/consumer/Makefile (working copy) @@ -1,7 +0,0 @@ -include $(GOROOT)/src/Make.inc - -TARG=consumer -GOFILES=\ - consumer.go\ - -include $(GOROOT)/src/Make.cmd Index: tools/publisher/publisher.go =================================================================== --- tools/publisher/publisher.go (revision 1304309) +++ tools/publisher/publisher.go (working copy) @@ -1,89 +0,0 @@ -/* - * Copyright (c) 2011 NeuStar, Inc. - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - * - * NeuStar, the Neustar logo and related names and logos are registered - * trademarks, service marks or tradenames of NeuStar, Inc. All other - * product names, company names, marks, logos and symbols may be trademarks - * of their respective owners. - */ - -package main - -import ( - "kafka" - "flag" - "fmt" - "os" -) - -var hostname string -var topic string -var partition int -var message string -var messageFile string -var compress bool - -func init() { - flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server") - flag.StringVar(&topic, "topic", "test", "topic to publish to") - flag.IntVar(&partition, "partition", 0, "partition to publish to") - flag.StringVar(&message, "message", "", "message to publish") - flag.StringVar(&messageFile, "messagefile", "", "read message from this file") - flag.BoolVar(&compress, "compress", false, "compress the messages published") -} - -func main() { - flag.Parse() - fmt.Println("Publishing :", message) - fmt.Printf("To: %s, topic: %s, partition: %d\n", hostname, topic, partition) - fmt.Println(" ---------------------- ") - broker := kafka.NewBrokerPublisher(hostname, topic, partition) - - if len(message) == 0 && len(messageFile) != 0 { - file, err := os.Open(messageFile) - if err != nil { - fmt.Println("Error: ", err) - return - } - stat, err := file.Stat() - if err != nil { - fmt.Println("Error: ", err) - return - } - payload := make([]byte, stat.Size) - file.Read(payload) - timing := kafka.StartTiming("Sending") - - if compress { - broker.Publish(kafka.NewCompressedMessage(payload)) - } else { - broker.Publish(kafka.NewMessage(payload)) - } - - timing.Print() - file.Close() - } else { - timing := kafka.StartTiming("Sending") - - if compress { - broker.Publish(kafka.NewCompressedMessage([]byte(message))) - } else { - broker.Publish(kafka.NewMessage([]byte(message))) - } - - 
timing.Print() - } -} Index: tools/publisher/Makefile =================================================================== --- tools/publisher/Makefile (revision 1304309) +++ tools/publisher/Makefile (working copy) @@ -1,7 +0,0 @@ -include $(GOROOT)/src/Make.inc - -TARG=publisher -GOFILES=\ - publisher.go\ - -include $(GOROOT)/src/Make.cmd Index: Makefile =================================================================== --- Makefile (revision 1304309) +++ Makefile (working copy) @@ -1,25 +1,15 @@ -include $(GOROOT)/src/Make.inc -TARG=kafka -GOFILES=\ - src/kafka.go\ - src/message.go\ - src/converts.go\ - src/consumer.go\ - src/payload_codec.go\ - src/publisher.go\ - src/timing.go\ - src/request.go\ +kafka: + go install kafka + go test kafka -include $(GOROOT)/src/Make.pkg - tools: force - make -C tools/consumer clean all - make -C tools/publisher clean all - make -C tools/offsets clean all + go install consumer + go install offsets + go install publisher format: - gofmt -w -tabwidth=2 -tabindent=false src/*.go tools/consumer/*.go tools/publisher/*.go kafka_test.go + gofmt -w -tabwidth=2 -tabindent=false src full: format clean install tools