Index: Rakefile =================================================================== --- Rakefile (revision 1186235) +++ Rakefile (working copy) @@ -21,7 +21,7 @@ GEM = 'kafka-rb' GEM_NAME = 'Kafka Client' -GEM_VERSION = '0.0.5' +GEM_VERSION = '0.0.6' AUTHORS = ['Alejandro Crosa'] EMAIL = "alejandrocrosa@gmail.com" HOMEPAGE = "http://github.com/acrosa/kafka-rb" Index: lib/kafka/producer.rb =================================================================== --- lib/kafka/producer.rb (revision 1186235) +++ lib/kafka/producer.rb (working copy) @@ -29,13 +29,9 @@ self.connect(self.host, self.port) end - def encode(message) - [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload.to_s - end - def encode_request(topic, partition, messages) message_set = Array(messages).collect { |message| - encoded_message = self.encode(message) + encoded_message = message.encode [encoded_message.length].pack("N") + encoded_message }.join("") Index: lib/kafka/message.rb =================================================================== --- lib/kafka/message.rb (revision 1186235) +++ lib/kafka/message.rb (working copy) @@ -16,24 +16,36 @@ # A message. The format of an N byte message is the following: # 1 byte "magic" identifier to allow format changes + # 1 byte compression flag to identify compression codec # 4 byte CRC32 of the payload # N - 5 byte payload class Message - MAGIC_IDENTIFIER_DEFAULT = 0 + MAGIC_IDENTIFIER_DEFAULT = 1 + COMPRESSION_CODEC = 0 - attr_accessor :magic, :checksum, :payload + attr_accessor :magic, :checksum, :payload, :compression - def initialize(payload = nil, magic = MAGIC_IDENTIFIER_DEFAULT, checksum = nil) + def initialize(payload = nil, magic = MAGIC_IDENTIFIER_DEFAULT, + compression = COMPRESSION_CODEC, checksum = nil) self.magic = magic self.payload = payload + self.compression = compression self.checksum = checksum || self.calculate_checksum end + def encode + [self.magic].pack("C") + [self.compression].pack("C") + [calculate_checksum].pack("N") + self.payload.to_s + end + def calculate_checksum Zlib.crc32(self.payload) end - + + def to_s + "#{self.magic} #{self.compression} #{self.checksum} #{self.payload}" + end + def valid? self.checksum == Zlib.crc32(self.payload) end @@ -41,9 +53,10 @@ def self.parse_from(binary) size = binary[0, 4].unpack("N").shift.to_i magic = binary[4, 1].unpack("C").shift - checksum = binary[5, 4].unpack("N").shift - payload = binary[9, size] # 5 = 1 + 4 is Magic + Checksum - return Kafka::Message.new(payload, magic, checksum) + compression = binary[5, 1].unpack("C").shift + checksum = binary[6, 4].unpack("N").shift + payload = binary[10, size] # 6 = 1 + 1 + 4 is Magic + Compression + Checksum + return Kafka::Message.new(payload, magic, compression, checksum) end end end Index: spec/producer_spec.rb =================================================================== --- spec/producer_spec.rb (revision 1186235) +++ spec/producer_spec.rb (working copy) @@ -46,19 +46,6 @@ @producer.port.should eql(9092) end - describe "Message Encoding" do - it "should encode a message" do - message = Kafka::Message.new("alejandro") - full_message = [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload - @producer.encode(message).should eql(full_message) - end - - it "should encode an empty message" do - message = Kafka::Message.new() - full_message = [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload.to_s - @producer.encode(message).should eql(full_message) - end - end describe "Request Encoding" do it "should binary encode an empty request" do @@ -78,21 +65,21 @@ messages_length = bytes[16, 4].unpack("N").shift messages = bytes[20, messages_length] - bytes.length.should eql(32) - data_size.should eql(28) + bytes.length.should eql(33) + data_size.should eql(29) request_id.should eql(0) topic_length.should eql(4) topic.should eql("test") partition.should eql(3) - messages_length.should eql(12) + messages_length.should eql(13) end end end it "should send messages" do - @producer.should_receive(:write).and_return(32) + @producer.should_receive(:write).and_return(33) message = Kafka::Message.new("ale") - @producer.send(message).should eql(32) + @producer.send(message).should eql(33) end describe "Message Batching" do Index: spec/message_spec.rb =================================================================== --- spec/message_spec.rb (revision 1186235) +++ spec/message_spec.rb (working copy) @@ -22,7 +22,7 @@ describe "Kafka Message" do it "should have a default magic number" do - Message::MAGIC_IDENTIFIER_DEFAULT.should eql(0) + Message::MAGIC_IDENTIFIER_DEFAULT.should eql(1) end it "should have a magic field, a checksum and a payload" do @@ -53,15 +53,16 @@ @message.valid?.should eql(true) @message.checksum = 0 @message.valid?.should eql(false) - @message = Message.new("alejandro", 0, 66666666) # 66666666 is a funny checksum + @message = Message.new("alejandro", 1, 0, 66666666) # 66666666 is a funny checksum @message.valid?.should eql(false) end it "should parse a message from bytes" do - bytes = [12].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale" + bytes = [12].pack("N") + [1].pack("C") + [0].pack("C") + [1120192889].pack("N") + "ale" message = Kafka::Message.parse_from(bytes) message.valid?.should eql(true) - message.magic.should eql(0) + message.magic.should eql(1) + message.compression.should eql(0) message.checksum.should eql(1120192889) message.payload.should eql("ale") end Index: spec/consumer_spec.rb =================================================================== --- spec/consumer_spec.rb (revision 1186235) +++ spec/consumer_spec.rb (working copy) @@ -19,9 +19,9 @@ before(:each) do @mocked_socket = mock(TCPSocket) TCPSocket.stub!(:new).and_return(@mocked_socket) # don't use a real socket - @consumer = Consumer.new + @consumer = Consumer.new({ :offset => 0 }) end - + describe "Kafka Consumer" do it "should have a CONSUME_REQUEST_TYPE" do @@ -41,15 +41,15 @@ end it "should set a topic and partition on initialize" do - @consumer = Consumer.new({ :host => "localhost", :port => 9092, :topic => "testing" }) + @consumer = Consumer.new({ :host => "localhost", :port => 9092, :topic => "testing", :offset => 0 }) @consumer.topic.should eql("testing") @consumer.partition.should eql(0) - @consumer = Consumer.new({ :topic => "testing", :partition => 3 }) + @consumer = Consumer.new({ :topic => "testing", :partition => 3, :offset => 0 }) @consumer.partition.should eql(3) end it "should set default host and port if none is specified" do - @consumer = Consumer.new + @consumer = Consumer.new({:offset => 0 }) @consumer.host.should eql("localhost") @consumer.port.should eql(9092) end @@ -83,6 +83,10 @@ @consumer.read_data_response.should eql(bytes[2, bytes.length]) end + #it "should send an offset find request to server" do + # @consumer = Consumer.new + #end + it "should send a consumer request" do @consumer.stub!(:encode_request_size).and_return(666) @consumer.stub!(:encode_request).and_return("someencodedrequest") @@ -92,11 +96,11 @@ end it "should parse a message set from bytes" do - bytes = [12].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale" + bytes = [12].pack("N") + [1].pack("C") + [0].pack("C") + [1120192889].pack("N") + "ale" message = @consumer.parse_message_set_from(bytes).first message.payload.should eql("ale") message.checksum.should eql(1120192889) - message.magic.should eql(0) + message.magic.should eql(1) message.valid?.should eql(true) end @@ -117,7 +121,7 @@ end it "should loop (every N seconds, configurable on polling attribute), and execute a block with the consumed messages" do - @consumer = Consumer.new({ :polling => 1 }) + @consumer = Consumer.new({ :polling => 1, :offset => 0 }) @consumer.stub!(:consume).and_return([mock(Kafka::Message)]) messages = [] messages.should_receive(:<<).exactly(:twice).and_return([]) Index: README.md =================================================================== --- README.md (revision 1186235) +++ README.md (working copy) @@ -2,11 +2,14 @@ kafka-rb allows you to produce messages to the Kafka distributed publish/subscribe messaging service. ## Requirements -You need to have access to your Kafka instance and be able to connect through TCP. You can obtain a copy and instructions on how to setup kafka at https://github.com/kafka-dev/kafka +You need to have access to your Kafka instance and be able to connect through TCP. You can obtain a copy and instructions on how to setup kafka at http://sna-projects.com/kafka ## Installation sudo gem install kafka-rb +## Install locally from source +rake install + (the code works fine with JRuby, Ruby 1.8x and Ruby 1.9.x) ## Usage Index: examples/producer.rb =================================================================== --- examples/producer.rb (revision 0) +++ examples/producer.rb (revision 0) @@ -0,0 +1,6 @@ +require File.join(File.dirname(__FILE__), "../lib", "kafka") + +producer = Kafka::Producer.new +message1 = Kafka::Message.new("some random message content") +message2 = Kafka::Message.new("some more content") +producer.send([message1, message2]) \ No newline at end of file Index: examples/consumer.rb =================================================================== --- examples/consumer.rb (revision 0) +++ examples/consumer.rb (revision 0) @@ -0,0 +1,8 @@ +require File.join(File.dirname(__FILE__), "../lib", "kafka") + + +consumer = Kafka::Consumer.new +consumer.loop do |messages| + puts "Received" + puts messages +end \ No newline at end of file