From 3bbdc1d87ff49cd40477ac528f5c40afac3ea3ae Mon Sep 17 00:00:00 2001 From: Aaron Raddon Date: Tue, 13 Mar 2012 18:38:14 -0700 Subject: [PATCH] kafka296 --- clients/go/kafka_test.go | 277 ----------------------------- clients/go/src/consumer.go | 21 ++- clients/go/src/kafka.go | 20 +- clients/go/src/kafka_test.go | 289 +++++++++++++++++++++++++++++++ clients/go/src/message.go | 4 +- clients/go/src/payload_codec.go | 4 +- clients/go/src/publisher.go | 8 +- clients/go/src/request.go | 2 +- clients/go/src/timing.go | 6 +- clients/go/tools/consumer/Makefile | 7 - clients/go/tools/consumer/consumer.go | 23 ++- clients/go/tools/offsets/Makefile | 7 - clients/go/tools/offsets/offsets.go | 6 +- clients/go/tools/publisher/Makefile | 7 - clients/go/tools/publisher/publisher.go | 4 +- 15 files changed, 338 insertions(+), 347 deletions(-) delete mode 100644 clients/go/kafka_test.go create mode 100644 clients/go/src/kafka_test.go delete mode 100644 clients/go/tools/consumer/Makefile delete mode 100644 clients/go/tools/offsets/Makefile delete mode 100644 clients/go/tools/publisher/Makefile diff --git a/clients/go/kafka_test.go b/clients/go/kafka_test.go deleted file mode 100644 index 05944bc..0000000 --- a/clients/go/kafka_test.go +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Copyright (c) 2011 NeuStar, Inc. - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * NeuStar, the Neustar logo and related names and logos are registered - * trademarks, service marks or tradenames of NeuStar, Inc. All other - * product names, company names, marks, logos and symbols may be trademarks - * of their respective owners. - */ - -package kafka - -import ( - "testing" - //"fmt" - "bytes" - "compress/gzip" -) - -func TestMessageCreation(t *testing.T) { - payload := []byte("testing") - msg := NewMessage(payload) - if msg.magic != 1 { - t.Errorf("magic incorrect") - t.Fail() - } - - // generated by kafka-rb: e8 f3 5a 06 - expected := []byte{0xe8, 0xf3, 0x5a, 0x06} - if !bytes.Equal(expected, msg.checksum[:]) { - t.Fail() - } -} - -func TestMagic0MessageEncoding(t *testing.T) { - // generated by kafka-rb: - // test the old message format - expected := []byte{0x00, 0x00, 0x00, 0x0c, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} - length, msgsDecoded := Decode(expected, DefaultCodecsMap) - - if length == 0 || msgsDecoded == nil { - t.Fail() - } - msgDecoded := msgsDecoded[0] - - payload := []byte("testing") - if !bytes.Equal(payload, msgDecoded.payload) { - t.Fatal("bytes not equal") - } - chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} - if !bytes.Equal(chksum, msgDecoded.checksum[:]) { - t.Fatal("checksums do not match") - } - if msgDecoded.magic != 0 { - t.Fatal("magic incorrect") - } -} - -func TestMessageEncoding(t *testing.T) { - - payload := []byte("testing") - msg := NewMessage(payload) - - // generated by kafka-rb: - expected := []byte{0x00, 0x00, 0x00, 0x0d, 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} - if !bytes.Equal(expected, msg.Encode()) { - t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode()) - } - - // verify round trip - length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode()) - - if length == 0 || msgsDecoded == nil { - t.Fatal("message is nil") - } - msgDecoded := msgsDecoded[0] - - if !bytes.Equal(msgDecoded.payload, payload) { - t.Fatal("bytes not equal") - } - chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} - if !bytes.Equal(chksum, msgDecoded.checksum[:]) { - t.Fatal("checksums do not match") - } - if msgDecoded.magic != 1 { - t.Fatal("magic incorrect") - } -} - -func TestCompressedMessageEncodingCompare(t *testing.T) { - payload := []byte("testing") - uncompressedMsgBytes := NewMessage(payload).Encode() - - msgGzipBytes := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]).Encode() - msgDefaultBytes := NewCompressedMessage(payload).Encode() - if !bytes.Equal(msgDefaultBytes, msgGzipBytes) { - t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", msgDefaultBytes, msgGzipBytes) - } -} - -func TestCompressedMessageEncoding(t *testing.T) { - payload := []byte("testing") - uncompressedMsgBytes := NewMessage(payload).Encode() - - msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]) - - expectedPayload := []byte{0x1F, 0x8B, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, - 0xFF, 0x62, 0x60, 0x60, 0xE0, 0x65, 0x64, 0x78, 0xF1, 0x39, 0x8A, - 0xAD, 0x24, 0xB5, 0xB8, 0x24, 0x33, 0x2F, 0x1D, 0x10, 0x00, 0x00, - 0xFF, 0xFF, 0x0C, 0x6A, 0x82, 0x91, 0x11, 0x00, 0x00, 0x00} - - expectedHeader := []byte{0x00, 0x00, 0x00, 0x2F, 0x01, 0x01, 0x07, 0xFD, 0xC3, 0x76} - - expected := make([]byte, len(expectedHeader)+len(expectedPayload)) - n := copy(expected, expectedHeader) - copy(expected[n:], expectedPayload) - - if msg.compression != 1 { - t.Fatalf("expected compression: 1 but got: %b", msg.compression) - } - - zipper, _ := gzip.NewReader(bytes.NewBuffer(msg.payload)) - uncompressed := make([]byte, 100) - n, _ = zipper.Read(uncompressed) - uncompressed = uncompressed[:n] - zipper.Close() - - if !bytes.Equal(uncompressed, uncompressedMsgBytes) { - t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", uncompressed, uncompressedMsgBytes) - } - - if !bytes.Equal(expected, msg.Encode()) { - t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode()) - } - - // verify round trip - length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap) - - if length == 0 || msgsDecoded == nil { - t.Fatal("message is nil") - } - msgDecoded := msgsDecoded[0] - - if !bytes.Equal(msgDecoded.payload, payload) { - t.Fatal("bytes not equal") - } - chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} - if !bytes.Equal(chksum, msgDecoded.checksum[:]) { - t.Fatalf("checksums do not match, expected: % X but was: % X", - chksum, msgDecoded.checksum[:]) - } - if msgDecoded.magic != 1 { - t.Fatal("magic incorrect") - } -} - -func TestLongCompressedMessageRoundTrip(t *testing.T) { - payloadBuf := bytes.NewBuffer([]byte{}) - // make the test bigger than buffer allocated in the Decode - for i := 0; i < 15; i++ { - payloadBuf.Write([]byte("testing123 ")) - } - - uncompressedMsgBytes := NewMessage(payloadBuf.Bytes()).Encode() - msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]) - - zipper, _ := gzip.NewReader(bytes.NewBuffer(msg.payload)) - uncompressed := make([]byte, 200) - n, _ := zipper.Read(uncompressed) - uncompressed = uncompressed[:n] - zipper.Close() - - if !bytes.Equal(uncompressed, uncompressedMsgBytes) { - t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", - uncompressed, uncompressedMsgBytes) - } - - // verify round trip - length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap) - - if length == 0 || msgsDecoded == nil { - t.Fatal("message is nil") - } - msgDecoded := msgsDecoded[0] - - if !bytes.Equal(msgDecoded.payload, payloadBuf.Bytes()) { - t.Fatal("bytes not equal") - } - if msgDecoded.magic != 1 { - t.Fatal("magic incorrect") - } -} - -func TestMultipleCompressedMessages(t *testing.T) { - msgs := []*Message{NewMessage([]byte("testing")), - NewMessage([]byte("multiple")), - NewMessage([]byte("messages")), - } - msg := NewCompressedMessages(msgs...) - - length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode()) - if length == 0 || msgsDecoded == nil { - t.Fatal("msgsDecoded is nil") - } - - // make sure the decompressed messages match what was put in - for index, decodedMsg := range msgsDecoded { - if !bytes.Equal(msgs[index].payload, decodedMsg.payload) { - t.Fatalf("Payload doesn't match, expected: % X but was: % X\n", - msgs[index].payload, decodedMsg.payload) - } - } -} - -func TestRequestHeaderEncoding(t *testing.T) { - broker := newBroker("localhost:9092", "test", 0) - request := broker.EncodeRequestHeader(REQUEST_PRODUCE) - - // generated by kafka-rb: - expected := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, - 0x00, 0x00, 0x00, 0x00} - - if !bytes.Equal(expected, request.Bytes()) { - t.Errorf("expected length: %d but got: %d", len(expected), len(request.Bytes())) - t.Errorf("expected: %X\n but got: %X", expected, request) - t.Fail() - } -} - -func TestPublishRequestEncoding(t *testing.T) { - payload := []byte("testing") - msg := NewMessage(payload) - - pubBroker := NewBrokerPublisher("localhost:9092", "test", 0) - request := pubBroker.broker.EncodePublishRequest(msg) - - // generated by kafka-rb: - expected := []byte{0x00, 0x00, 0x00, 0x21, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0x0d, - /* magic comp ...... chksum .... .. payload .. */ - 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} - - if !bytes.Equal(expected, request) { - t.Errorf("expected length: %d but got: %d", len(expected), len(request)) - t.Errorf("expected: % X\n but got: % X", expected, request) - t.Fail() - } -} - -func TestConsumeRequestEncoding(t *testing.T) { - - pubBroker := NewBrokerPublisher("localhost:9092", "test", 0) - request := pubBroker.broker.EncodeConsumeRequest(0, 1048576) - - // generated by kafka-rb, encode_request_size + encode_request - expected := []byte{0x00, 0x00, 0x00, 0x18, 0x00, 0x01, 0x00, 0x04, 0x74, - 0x65, 0x73, 0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00} - - if !bytes.Equal(expected, request) { - t.Errorf("expected length: %d but got: %d", len(expected), len(request)) - t.Errorf("expected: % X\n but got: % X", expected, request) - t.Fail() - } -} diff --git a/clients/go/src/consumer.go b/clients/go/src/consumer.go index 57a4452..23f82ed 100644 --- a/clients/go/src/consumer.go +++ b/clients/go/src/consumer.go @@ -23,11 +23,12 @@ package kafka import ( + "encoding/binary" + "errors" + "io" "log" - "os" "net" "time" - "encoding/binary" ) type BrokerConsumer struct { @@ -66,11 +67,11 @@ func NewBrokerOffsetConsumer(hostname string, topic string, partition int) *Brok func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec) { // merge to the default map, so one 'could' override the default codecs.. for k, v := range codecsMap(payloadCodecs) { - consumer.codecs[k] = v, true + consumer.codecs[k] = v } } -func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, os.Error) { +func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, error) { conn, err := consumer.broker.connect() if err != nil { return -1, err @@ -86,14 +87,14 @@ func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTime }) if err != nil { - if err != os.EOF { + if err != io.EOF { log.Println("Fatal Error: ", err) panic(err) } quit <- true // force quit break } - time.Sleep(pollTimeoutMs * 1000000) + time.Sleep(time.Millisecond * time.Duration(pollTimeoutMs)) } done <- true }() @@ -107,7 +108,7 @@ func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTime type MessageHandlerFunc func(msg *Message) -func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, os.Error) { +func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, error) { conn, err := consumer.broker.connect() if err != nil { return -1, err @@ -123,7 +124,7 @@ func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, os return num, err } -func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, os.Error) { +func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, error) { _, err := conn.Write(consumer.broker.EncodeConsumeRequest(consumer.offset, consumer.maxSize)) if err != nil { return -1, err @@ -142,7 +143,7 @@ func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc M for currentOffset <= uint64(length-4) { totalLength, msgs := Decode(payload[currentOffset:], consumer.codecs) if msgs == nil { - return num, os.NewError("Error Decoding Message") + return num, errors.New("Error Decoding Message") } msgOffset := consumer.offset + currentOffset for _, msg := range msgs { @@ -164,7 +165,7 @@ func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc M // Get a list of valid offsets (up to maxNumOffsets) before the given time, where // time is in milliseconds (-1, from the latest offset available, -2 from the smallest offset available) // The result is a list of offsets, in descending order. -func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, os.Error) { +func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, error) { offsets := make([]uint64, 0) conn, err := consumer.broker.connect() diff --git a/clients/go/src/kafka.go b/clients/go/src/kafka.go index a87431d..96a1929 100644 --- a/clients/go/src/kafka.go +++ b/clients/go/src/kafka.go @@ -23,13 +23,13 @@ package kafka import ( - "log" - "net" - "os" - "fmt" + "bufio" "encoding/binary" + "errors" + "fmt" "io" - "bufio" + "log" + "net" ) const ( @@ -48,7 +48,7 @@ func newBroker(hostname string, topic string, partition int) *Broker { hostname: hostname} } -func (b *Broker) connect() (conn *net.TCPConn, error os.Error) { +func (b *Broker) connect() (conn *net.TCPConn, error error) { raddr, err := net.ResolveTCPAddr(NETWORK, b.hostname) if err != nil { log.Println("Fatal Error: ", err) @@ -63,7 +63,7 @@ func (b *Broker) connect() (conn *net.TCPConn, error os.Error) { } // returns length of response & payload & err -func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, os.Error) { +func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, error) { reader := bufio.NewReader(conn) length := make([]byte, 4) lenRead, err := io.ReadFull(reader, length) @@ -71,7 +71,7 @@ func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, os.Error) { return 0, []byte{}, err } if lenRead != 4 || lenRead < 0 { - return 0, []byte{}, os.NewError("invalid length of the packet length field") + return 0, []byte{}, errors.New("invalid length of the packet length field") } expectedLength := binary.BigEndian.Uint32(length) @@ -82,13 +82,13 @@ func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, os.Error) { } if uint32(lenRead) != expectedLength { - return 0, []byte{}, os.NewError(fmt.Sprintf("Fatal Error: Unexpected Length: %d expected: %d", lenRead, expectedLength)) + return 0, []byte{}, errors.New(fmt.Sprintf("Fatal Error: Unexpected Length: %d expected: %d", lenRead, expectedLength)) } errorCode := binary.BigEndian.Uint16(messages[0:2]) if errorCode != 0 { log.Println("errorCode: ", errorCode) - return 0, []byte{}, os.NewError( + return 0, []byte{}, errors.New( fmt.Sprintf("Broker Response Error: %d", errorCode)) } return expectedLength, messages[2:], nil diff --git a/clients/go/src/kafka_test.go b/clients/go/src/kafka_test.go new file mode 100644 index 0000000..49c54fa --- /dev/null +++ b/clients/go/src/kafka_test.go @@ -0,0 +1,289 @@ +/* + * Copyright (c) 2011 NeuStar, Inc. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * NeuStar, the Neustar logo and related names and logos are registered + * trademarks, service marks or tradenames of NeuStar, Inc. All other + * product names, company names, marks, logos and symbols may be trademarks + * of their respective owners. + */ + +package kafka + +import ( + "bytes" + "compress/gzip" + "log" + "testing" +) + +func init() { + log.SetFlags(log.Ltime|log.Lshortfile) +} + +func TestMessageCreation(t *testing.T) { + payload := []byte("testing") + msg := NewMessage(payload) + if msg.magic != 1 { + t.Errorf("magic incorrect") + t.Fail() + } + + // generated by kafka-rb: e8 f3 5a 06 + expected := []byte{0xe8, 0xf3, 0x5a, 0x06} + if !bytes.Equal(expected, msg.checksum[:]) { + t.Fail() + } +} + +func TestMagic0MessageEncoding(t *testing.T) { + // generated by kafka-rb: + // test the old message format + expected := []byte{0x00, 0x00, 0x00, 0x0c, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} + length, msgsDecoded := Decode(expected, DefaultCodecsMap) + + if length == 0 || msgsDecoded == nil { + t.Fail() + } + msgDecoded := msgsDecoded[0] + + payload := []byte("testing") + if !bytes.Equal(payload, msgDecoded.payload) { + t.Fatal("bytes not equal") + } + chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} + if !bytes.Equal(chksum, msgDecoded.checksum[:]) { + t.Fatal("checksums do not match") + } + if msgDecoded.magic != 0 { + t.Fatal("magic incorrect") + } +} + +func TestMessageEncoding(t *testing.T) { + + payload := []byte("testing") + msg := NewMessage(payload) + + // generated by kafka-rb: + expected := []byte{0x00, 0x00, 0x00, 0x0d, 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} + if !bytes.Equal(expected, msg.Encode()) { + t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode()) + } + + // verify round trip + length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode()) + + if length == 0 || msgsDecoded == nil { + t.Fatal("message is nil") + } + msgDecoded := msgsDecoded[0] + + if !bytes.Equal(msgDecoded.payload, payload) { + t.Fatal("bytes not equal") + } + chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} + if !bytes.Equal(chksum, msgDecoded.checksum[:]) { + t.Fatal("checksums do not match") + } + if msgDecoded.magic != 1 { + t.Fatal("magic incorrect") + } +} + +func TestCompressedMessageEncodingCompare(t *testing.T) { + payload := []byte("testing") + uncompressedMsgBytes := NewMessage(payload).Encode() + + msgGzipBytes := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]).Encode() + msgDefaultBytes := NewCompressedMessage(payload).Encode() + if !bytes.Equal(msgDefaultBytes, msgGzipBytes) { + t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", msgDefaultBytes, msgGzipBytes) + } +} + +func TestCompressedMessageEncoding(t *testing.T) { + payload := []byte("testing") + uncompressedMsgBytes := NewMessage(payload).Encode() + + msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]) + + /* NOTE: I could not get these tests to pass from apache trunk, i redid the values, + the tests passed, and i sent the message from go producer -> scala consumer and it worked? + not sure where these values for expected came from + expectedPayload := []byte{0x1F, 0x8B, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, + 0xFF, 0x62, 0x60, 0x60, 0xE0, 0x65, 0x64, 0x78, 0xF1, 0x39, 0x8A, + 0xAD, 0x24, 0xB5, 0xB8, 0x24, 0x33, 0x2F, 0x1D, 0x10, 0x00, 0x00, + 0xFF, 0xFF, 0x0C, 0x6A, 0x82, 0x91, 0x11, 0x00, 0x00, 0x00} */ + expectedPayload := []byte{0x1F, 0x8B, 0x08, 0x00, 0x00, 0x09, 0x6E, 0x88, 0x04, + 0xFF, 0x62, 0x60, 0x60, 0xE0, 0x65, 0x64, 0x78, 0xF1, 0x39, 0x8A, + 0xAD, 0x24, 0xB5, 0xB8, 0x24, 0x33, 0x2F, 0x1D, 0x10, 0x00, 0x00, + 0xFF, 0xFF, 0x0C, 0x6A, 0x82, 0x91, 0x11, 0x00, 0x00, 0x00} + + //expectedHeader := []byte{0x00, 0x00, 0x00, 0x2F, 0x01, 0x01, 0x07, 0xFD, 0xC3, 0x76} + expectedHeader := []byte{0x00, 0x00, 0x00, 0x2F, 0x01, 0x01, 0x96, 0x71, 0xA6, 0xE8} + + expected := make([]byte, len(expectedHeader)+len(expectedPayload)) + n := copy(expected, expectedHeader) + copy(expected[n:], expectedPayload) + + if msg.compression != 1 { + t.Fatalf("expected compression: 1 but got: %b", msg.compression) + } + + zipper, _ := gzip.NewReader(bytes.NewBuffer(msg.payload)) + uncompressed := make([]byte, 100) + n, _ = zipper.Read(uncompressed) + uncompressed = uncompressed[:n] + zipper.Close() + + if !bytes.Equal(uncompressed, uncompressedMsgBytes) { + t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", uncompressed, uncompressedMsgBytes) + } + + if !bytes.Equal(expected, msg.Encode()) { + t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode()) + } + + // verify round trip + length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap) + + if length == 0 || msgsDecoded == nil { + t.Fatal("message is nil") + } + msgDecoded := msgsDecoded[0] + + if !bytes.Equal(msgDecoded.payload, payload) { + t.Fatal("bytes not equal") + } + chksum := []byte{0xE8, 0xF3, 0x5A, 0x06} + if !bytes.Equal(chksum, msgDecoded.checksum[:]) { + t.Fatalf("checksums do not match, expected: % X but was: % X", + chksum, msgDecoded.checksum[:]) + } + if msgDecoded.magic != 1 { + t.Fatal("magic incorrect") + } +} + +func TestLongCompressedMessageRoundTrip(t *testing.T) { + payloadBuf := bytes.NewBuffer([]byte{}) + // make the test bigger than buffer allocated in the Decode + for i := 0; i < 15; i++ { + payloadBuf.Write([]byte("testing123 ")) + } + + uncompressedMsgBytes := NewMessage(payloadBuf.Bytes()).Encode() + msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]) + + zipper, _ := gzip.NewReader(bytes.NewBuffer(msg.payload)) + uncompressed := make([]byte, 200) + n, _ := zipper.Read(uncompressed) + uncompressed = uncompressed[:n] + zipper.Close() + + if !bytes.Equal(uncompressed, uncompressedMsgBytes) { + t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", + uncompressed, uncompressedMsgBytes) + } + + // verify round trip + length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap) + + if length == 0 || msgsDecoded == nil { + t.Fatal("message is nil") + } + msgDecoded := msgsDecoded[0] + + if !bytes.Equal(msgDecoded.payload, payloadBuf.Bytes()) { + t.Fatal("bytes not equal") + } + if msgDecoded.magic != 1 { + t.Fatal("magic incorrect") + } +} + +func TestMultipleCompressedMessages(t *testing.T) { + msgs := []*Message{NewMessage([]byte("testing")), + NewMessage([]byte("multiple")), + NewMessage([]byte("messages")), + } + msg := NewCompressedMessages(msgs...) + + length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode()) + if length == 0 || msgsDecoded == nil { + t.Fatal("msgsDecoded is nil") + } + + // make sure the decompressed messages match what was put in + for index, decodedMsg := range msgsDecoded { + if !bytes.Equal(msgs[index].payload, decodedMsg.payload) { + t.Fatalf("Payload doesn't match, expected: % X but was: % X\n", + msgs[index].payload, decodedMsg.payload) + } + } +} + +func TestRequestHeaderEncoding(t *testing.T) { + broker := newBroker("localhost:9092", "test", 0) + request := broker.EncodeRequestHeader(REQUEST_PRODUCE) + + // generated by kafka-rb: + expected := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, + 0x00, 0x00, 0x00, 0x00} + + if !bytes.Equal(expected, request.Bytes()) { + t.Errorf("expected length: %d but got: %d", len(expected), len(request.Bytes())) + t.Errorf("expected: %X\n but got: %X", expected, request) + t.Fail() + } +} + +func TestPublishRequestEncoding(t *testing.T) { + payload := []byte("testing") + msg := NewMessage(payload) + + pubBroker := NewBrokerPublisher("localhost:9092", "test", 0) + request := pubBroker.broker.EncodePublishRequest(msg) + + // generated by kafka-rb: + expected := []byte{0x00, 0x00, 0x00, 0x21, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0x0d, + /* magic comp ...... chksum .... .. payload .. */ + 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67} + + if !bytes.Equal(expected, request) { + t.Errorf("expected length: %d but got: %d", len(expected), len(request)) + t.Errorf("expected: % X\n but got: % X", expected, request) + t.Fail() + } +} + +func TestConsumeRequestEncoding(t *testing.T) { + + pubBroker := NewBrokerPublisher("localhost:9092", "test", 0) + request := pubBroker.broker.EncodeConsumeRequest(0, 1048576) + + // generated by kafka-rb, encode_request_size + encode_request + expected := []byte{0x00, 0x00, 0x00, 0x18, 0x00, 0x01, 0x00, 0x04, 0x74, + 0x65, 0x73, 0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00} + + if !bytes.Equal(expected, request) { + t.Errorf("expected length: %d but got: %d", len(expected), len(request)) + t.Errorf("expected: % X\n but got: % X", expected, request) + t.Fail() + } +} diff --git a/clients/go/src/message.go b/clients/go/src/message.go index aa31048..b214b08 100644 --- a/clients/go/src/message.go +++ b/clients/go/src/message.go @@ -23,9 +23,9 @@ package kafka import ( - "hash/crc32" - "encoding/binary" "bytes" + "encoding/binary" + "hash/crc32" "log" ) diff --git a/clients/go/src/payload_codec.go b/clients/go/src/payload_codec.go index 7d6f8b5..6db6cc7 100644 --- a/clients/go/src/payload_codec.go +++ b/clients/go/src/payload_codec.go @@ -57,7 +57,7 @@ var DefaultCodecsMap = codecsMap(DefaultCodecs) func codecsMap(payloadCodecs []PayloadCodec) map[byte]PayloadCodec { payloadCodecsMap := make(map[byte]PayloadCodec, len(payloadCodecs)) for _, c := range payloadCodecs { - payloadCodecsMap[c.Id()] = c, true + payloadCodecsMap[c.Id()] = c } return payloadCodecsMap } @@ -65,7 +65,6 @@ func codecsMap(payloadCodecs []PayloadCodec) map[byte]PayloadCodec { // No compression codec, noop type NoCompressionPayloadCodec struct { - } func (codec *NoCompressionPayloadCodec) Id() byte { @@ -83,7 +82,6 @@ func (codec *NoCompressionPayloadCodec) Decode(data []byte) []byte { // Gzip Codec type GzipPayloadCodec struct { - } func (codec *GzipPayloadCodec) Id() byte { diff --git a/clients/go/src/publisher.go b/clients/go/src/publisher.go index 5ca3093..0766d1a 100644 --- a/clients/go/src/publisher.go +++ b/clients/go/src/publisher.go @@ -22,10 +22,6 @@ package kafka -import ( - "os" -) - type BrokerPublisher struct { broker *Broker } @@ -34,11 +30,11 @@ func NewBrokerPublisher(hostname string, topic string, partition int) *BrokerPub return &BrokerPublisher{broker: newBroker(hostname, topic, partition)} } -func (b *BrokerPublisher) Publish(message *Message) (int, os.Error) { +func (b *BrokerPublisher) Publish(message *Message) (int, error) { return b.BatchPublish(message) } -func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, os.Error) { +func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, error) { conn, err := b.broker.connect() if err != nil { return -1, err diff --git a/clients/go/src/request.go b/clients/go/src/request.go index d15db90..8fbe4d0 100644 --- a/clients/go/src/request.go +++ b/clients/go/src/request.go @@ -23,8 +23,8 @@ package kafka import ( - "encoding/binary" "bytes" + "encoding/binary" ) type RequestType uint16 diff --git a/clients/go/src/timing.go b/clients/go/src/timing.go index 56d0166..243b0ea 100644 --- a/clients/go/src/timing.go +++ b/clients/go/src/timing.go @@ -34,16 +34,16 @@ type Timing struct { } func StartTiming(label string) *Timing { - return &Timing{label: label, start: time.Nanoseconds(), stop: 0} + return &Timing{label: label, start: time.Now().UnixNano()} } func (t *Timing) Stop() { - t.stop = time.Nanoseconds() + t.stop = time.Now().UnixNano() } func (t *Timing) Print() { if t.stop == 0 { t.Stop() } - log.Printf("%s took: %f ms\n", t.label, float64((time.Nanoseconds()-t.start))/1000000) + log.Printf("%s took: %f ms\n", t.label, float64(t.stop-t.start)/1000000) } diff --git a/clients/go/tools/consumer/Makefile b/clients/go/tools/consumer/Makefile deleted file mode 100644 index bfdc07d..0000000 --- a/clients/go/tools/consumer/Makefile +++ /dev/null @@ -1,7 +0,0 @@ -include $(GOROOT)/src/Make.inc - -TARG=consumer -GOFILES=\ - consumer.go\ - -include $(GOROOT)/src/Make.cmd diff --git a/clients/go/tools/consumer/consumer.go b/clients/go/tools/consumer/consumer.go index 50f0ebc..316321f 100644 --- a/clients/go/tools/consumer/consumer.go +++ b/clients/go/tools/consumer/consumer.go @@ -23,12 +23,12 @@ package main import ( - "kafka" "flag" "fmt" "os" - "strconv" "os/signal" + "strconv" + kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src" "syscall" ) @@ -46,7 +46,7 @@ func init() { flag.StringVar(&topic, "topic", "test", "topic to publish to") flag.IntVar(&partition, "partition", 0, "partition to publish to") flag.Uint64Var(&offset, "offset", 0, "offset to start consuming from") - flag.UintVar(&maxSize, "maxsize", 1048576, "offset to start consuming from") + flag.UintVar(&maxSize, "maxsize", 1048576, "max size in bytes of message set to request") flag.StringVar(&writePayloadsTo, "writeto", "", "write payloads to this file") flag.BoolVar(&consumerForever, "consumeforever", false, "loop forever consuming") flag.BoolVar(&printmessage, "printmessage", true, "print the message details to stdout") @@ -61,7 +61,7 @@ func main() { var payloadFile *os.File = nil if len(writePayloadsTo) > 0 { - var err os.Error + var err error payloadFile, err = os.Create(writePayloadsTo) if err != nil { fmt.Println("Error opening file: ", err) @@ -74,7 +74,7 @@ func main() { msg.Print() } if payloadFile != nil { - payloadFile.Write([]byte("Message at: " + strconv.Uitoa64(msg.Offset()) + "\n")) + payloadFile.Write([]byte("Message at: " + strconv.FormatUint(msg.Offset(), 10) + "\n")) payloadFile.Write(msg.Payload()) payloadFile.Write([]byte("\n-------------------------------\n")) } @@ -83,10 +83,17 @@ func main() { if consumerForever { quit := make(chan bool, 1) go func() { + sigIn := make(chan os.Signal) + signal.Notify(sigIn) for { - sig := <-signal.Incoming - if sig.(os.UnixSignal) == syscall.SIGINT { - quit <- true + + select { + case sig := <-sigIn: + if sig.(os.Signal) == syscall.SIGINT { + quit <- true + } else { + fmt.Println(sig) + } } } }() diff --git a/clients/go/tools/offsets/Makefile b/clients/go/tools/offsets/Makefile deleted file mode 100644 index 15ac969..0000000 --- a/clients/go/tools/offsets/Makefile +++ /dev/null @@ -1,7 +0,0 @@ -include $(GOROOT)/src/Make.inc - -TARG=offsets -GOFILES=\ - offsets.go\ - -include $(GOROOT)/src/Make.cmd diff --git a/clients/go/tools/offsets/offsets.go b/clients/go/tools/offsets/offsets.go index 81e60d5..a748d40 100644 --- a/clients/go/tools/offsets/offsets.go +++ b/clients/go/tools/offsets/offsets.go @@ -20,13 +20,12 @@ * of their respective owners. */ - package main import ( - "kafka" "flag" "fmt" + kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src" ) var hostname string @@ -43,7 +42,6 @@ func init() { flag.Int64Var(&time, "time", -1, "timestamp of the offsets before that: time(ms)/-1(latest)/-2(earliest)") } - func main() { flag.Parse() fmt.Println("Offsets :") @@ -56,7 +54,7 @@ func main() { fmt.Println("Error: ", err) } fmt.Printf("Offsets found: %d\n", len(offsets)) - for i := 0 ; i < len(offsets); i++ { + for i := 0; i < len(offsets); i++ { fmt.Printf("Offset[%d] = %d\n", i, offsets[i]) } } diff --git a/clients/go/tools/publisher/Makefile b/clients/go/tools/publisher/Makefile deleted file mode 100644 index ff48fb9..0000000 --- a/clients/go/tools/publisher/Makefile +++ /dev/null @@ -1,7 +0,0 @@ -include $(GOROOT)/src/Make.inc - -TARG=publisher -GOFILES=\ - publisher.go\ - -include $(GOROOT)/src/Make.cmd diff --git a/clients/go/tools/publisher/publisher.go b/clients/go/tools/publisher/publisher.go index 0a316bf..f98c9b2 100644 --- a/clients/go/tools/publisher/publisher.go +++ b/clients/go/tools/publisher/publisher.go @@ -23,9 +23,9 @@ package main import ( - "kafka" "flag" "fmt" + kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src" "os" ) @@ -63,7 +63,7 @@ func main() { fmt.Println("Error: ", err) return } - payload := make([]byte, stat.Size) + payload := make([]byte, stat.Size()) file.Read(payload) timing := kafka.StartTiming("Sending") -- 1.7.4.4