From d7a796c373535242bca00a552d750e90c6ab714d Mon Sep 17 00:00:00 2001 From: Alejandro Crosa Date: Wed, 26 Oct 2011 14:50:42 -0700 Subject: [PATCH] Several fixes for Ruby client --- clients/ruby/kafka-rb.gemspec | 2 +- clients/ruby/lib/kafka.rb | 3 +++ clients/ruby/lib/kafka/producer.rb | 7 ++++++- clients/ruby/spec/consumer_spec.rb | 28 ++++++++++++++++++++++++++-- clients/ruby/spec/producer_spec.rb | 13 +++++++++++++ 5 files changed, 49 insertions(+), 4 deletions(-) diff --git a/clients/ruby/kafka-rb.gemspec b/clients/ruby/kafka-rb.gemspec index 52ff6af..b4bf039 100644 --- a/clients/ruby/kafka-rb.gemspec +++ b/clients/ruby/kafka-rb.gemspec @@ -16,7 +16,7 @@ Gem::Specification.new do |s| s.name = %q{kafka-rb} - s.version = "0.0.5" + s.version = "0.0.6" s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version= s.authors = ["Alejandro Crosa"] diff --git a/clients/ruby/lib/kafka.rb b/clients/ruby/lib/kafka.rb index 0e0080b..ccf49c3 100644 --- a/clients/ruby/lib/kafka.rb +++ b/clients/ruby/lib/kafka.rb @@ -14,6 +14,9 @@ # limitations under the License. require 'socket' require 'zlib' +if RUBY_VERSION[0,3] == "1.8" + require 'iconv' +end require File.join(File.dirname(__FILE__), "kafka", "io") require File.join(File.dirname(__FILE__), "kafka", "request_type") diff --git a/clients/ruby/lib/kafka/producer.rb b/clients/ruby/lib/kafka/producer.rb index 4a81861..e773d7a 100644 --- a/clients/ruby/lib/kafka/producer.rb +++ b/clients/ruby/lib/kafka/producer.rb @@ -30,7 +30,12 @@ module Kafka end def encode(message) - [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload.to_s + if RUBY_VERSION[0,3] == "1.8" # Use old iconv on Ruby 1.8 for encoding + ic = Iconv.new('UTF-8//IGNORE', 'UTF-8') + [message.magic].pack("C") + [message.calculate_checksum].pack("N") + ic.iconv(message.payload.to_s) + else + [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload.to_s.force_encoding(Encoding::ASCII_8BIT) + end end def encode_request(topic, partition, messages) diff --git a/clients/ruby/spec/consumer_spec.rb b/clients/ruby/spec/consumer_spec.rb index 3b5b77b..8f411e5 100644 --- a/clients/ruby/spec/consumer_spec.rb +++ b/clients/ruby/spec/consumer_spec.rb @@ -78,7 +78,7 @@ describe Consumer do end it "should read the response data" do - bytes = [12].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale" + bytes = [8].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale" @mocked_socket.should_receive(:read).exactly(:twice).and_return(bytes) @consumer.read_data_response.should eql(bytes[2, bytes.length]) end @@ -92,7 +92,7 @@ describe Consumer do end it "should parse a message set from bytes" do - bytes = [12].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale" + bytes = [8].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale" message = @consumer.parse_message_set_from(bytes).first message.payload.should eql("ale") message.checksum.should eql(1120192889) @@ -100,6 +100,30 @@ describe Consumer do message.valid?.should eql(true) end + it "should skip an incomplete message at the end of the response" do + bytes = [8].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale" + # incomplete message + bytes += [8].pack("N") + messages = @consumer.parse_message_set_from(bytes) + messages.size.should eql(1) + end + + it "should skip an incomplete message at the end of the response which has the same length as an empty message" do + bytes = [8].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale" + # incomplete message because payload is missing + bytes += [8].pack("N") + [0].pack("C") + [1120192889].pack("N") + messages = @consumer.parse_message_set_from(bytes) + messages.size.should eql(1) + end + + it "should read empty messages correctly" do + # empty message + bytes = [5].pack("N") + [0].pack("C") + [0].pack("N") + "" + messages = @consumer.parse_message_set_from(bytes) + messages.size.should eql(1) + messages.first.payload.should eql("") + end + it "should consume messages" do @consumer.should_receive(:send_consume_request).and_return(true) @consumer.should_receive(:read_data_response).and_return("") diff --git a/clients/ruby/spec/producer_spec.rb b/clients/ruby/spec/producer_spec.rb index 947c792..1cfb933 100644 --- a/clients/ruby/spec/producer_spec.rb +++ b/clients/ruby/spec/producer_spec.rb @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# encoding: utf-8 require File.dirname(__FILE__) + '/spec_helper' describe Producer do @@ -58,6 +59,18 @@ describe Producer do full_message = [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload.to_s @producer.encode(message).should eql(full_message) end + + it "should encode strings containing non-ASCII characters" do + message = Kafka::Message.new("ümlaut") + encoded = @producer.encode(message) + data = [encoded.size].pack("N") + encoded + if RUBY_VERSION[0,3] == "1.8" # Use old iconv on Ruby 1.8 for encoding + ic = Iconv.new('UTF-8//IGNORE', 'UTF-8') + ic.iconv(Kafka::Message.parse_from(data).payload).should eql("ümlaut") + else + Kafka::Message.parse_from(data).payload.force_encoding(Encoding::ASCII_8BIT).should eql("ümlaut") + end + end end describe "Request Encoding" do -- 1.7.5.4