From 62035bc383c41299705a30850953b391d3629780 Mon Sep 17 00:00:00 2001 From: Guan Liao Date: Mon, 25 Aug 2014 12:49:46 -0400 Subject: [PATCH] KAFKA-1496 Switch map to foreach to handle Stream --- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala | 2 +- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index d8ac915..33470ff 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -122,7 +122,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, def serialize(events: Seq[KeyedMessage[K,V]]): Seq[KeyedMessage[K,Message]] = { val serializedMessages = new ArrayBuffer[KeyedMessage[K,Message]](events.size) - events.map{e => + events.foreach{e => try { if(e.hasKey) serializedMessages += new KeyedMessage[K,Message](topic = e.topic, key = e.key, partKey = e.partKey, message = new Message(key = keyEncoder.toBytes(e.key), bytes = encoder.toBytes(e.message))) diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 906600c..1db6ac3 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -230,7 +230,13 @@ class AsyncProducerTest extends JUnit3Suite { val serializedData = handler.serialize(produceData) val deserializedData = serializedData.map(d => new KeyedMessage[String,String](d.topic, Utils.readString(d.message.payload))) + + // Test that the serialize handles seq from a Stream + val streamedSerializedData = handler.serialize(Stream(produceData:_*)) + val deserializedStreamData = streamedSerializedData.map(d => new KeyedMessage[String,String](d.topic, Utils.readString(d.message.payload))) + TestUtils.checkEquals(produceData.iterator, deserializedData.iterator) + TestUtils.checkEquals(produceData.iterator, deserializedStreamData.iterator) } @Test -- 1.8.5.2 (Apple Git-48)