Index: clients/php/src/tests/bootstrap.php =================================================================== --- clients/php/src/tests/bootstrap.php (revision 1366057) +++ clients/php/src/tests/bootstrap.php (working copy) @@ -1,3 +1,5 @@ +assertEquals(5 + strlen($test), strlen($encoded)); + $this->assertEquals(6 + strlen($test), strlen($encoded)); } public function testByteArrayContainsString() { @@ -45,8 +51,9 @@ $messages = array( 'test 1', 'test 2 abcde', + 'test 3', ); - $encoded = Kafka_Encoder::encode_produce_request($topic, $partition, $messages); + $encoded = Kafka_Encoder::encode_produce_request($topic, $partition, $messages, Kafka_Encoder::COMPRESSION_NONE); $this->assertContains($topic, $encoded); $this->assertContains($partition, $encoded); foreach ($messages as $msg) { @@ -54,8 +61,59 @@ } $size = 4 + 2 + 2 + strlen($topic) + 4 + 4; foreach ($messages as $msg) { - $size += 9 + strlen($msg); + $size += 10 + strlen($msg); } $this->assertEquals($size, strlen($encoded)); } + + public function testCompressNone() { + $msg = 'test message'; + $this->assertEquals($msg, Kafka_Encoder::compress($msg, Kafka_Encoder::COMPRESSION_NONE)); + } + + public function testCompressGzip() { + $msg = 'test message'; + $this->assertEquals($msg, gzdecode(Kafka_Encoder::compress($msg, Kafka_Encoder::COMPRESSION_GZIP))); + } + + /** + * @expectedException Kafka_Exception_NotSupported + */ + public function testCompressSnappy() { + $msg = 'test message'; + Kafka_Encoder::compress($msg, Kafka_Encoder::COMPRESSION_SNAPPY); + $this->fail('The above call should fail until SNAPPY support is added'); + } + + /** + * @expectedException Kafka_Exception_NotSupported + */ + public function testCompressUnknown() { + $msg = 'test message'; + Kafka_Encoder::compress($msg, 15); + $this->fail('The above call should fail'); + } + + public function testDecompressNone() { + $msg = 'test message'; + $this->assertEquals($msg, Kafka_Encoder::decompress($msg, Kafka_Encoder::COMPRESSION_NONE)); + } + + /** + * @expectedException Kafka_Exception_NotSupported + */ + public function testDecompressSnappy() { + $msg = 'test message'; + Kafka_Encoder::decompress($msg, Kafka_Encoder::COMPRESSION_SNAPPY); + $this->fail('The above call should fail until SNAPPY support is added'); + } + + /** + * @expectedException Kafka_Exception_NotSupported + */ + public function testDecompressUnknown() { + $msg = 'test message'; + Kafka_Encoder::decompress($msg, 15); + $this->fail('The above call should fail'); + } } Index: clients/php/src/tests/Kafka/FetchRequestTest.php =================================================================== --- clients/php/src/tests/Kafka/FetchRequestTest.php (revision 1366057) +++ clients/php/src/tests/Kafka/FetchRequestTest.php (working copy) @@ -1,3 +1,5 @@ +topic = 'a test topic'; + $this->topic = 'testtopic'; $this->partition = 0; $this->offset = 0; $this->maxSize = 10000; Index: clients/php/src/tests/Kafka/ProducerTest.php =================================================================== --- clients/php/src/tests/Kafka/ProducerTest.php (revision 1366057) +++ clients/php/src/tests/Kafka/ProducerTest.php (working copy) @@ -1,3 +1,5 @@ +conn); return stream_get_contents($this->conn); } + + public function getHost() { + return $this->host; + } + + public function getPort() { + return $this->port; + } + + public function getCompression() { + return $this->compression; + } } /** @@ -49,7 +62,7 @@ private $producer; public function setUp() { - $this->producer = new Kafka_ProducerMock('localhost', 1234); + $this->producer = new Kafka_ProducerMock('localhost', 1234, Kafka_Encoder::COMPRESSION_NONE); } public function tearDown() { @@ -57,7 +70,6 @@ unset($this->producer); } - public function testProducer() { $messages = array( 'test 1', @@ -73,4 +85,22 @@ $this->assertContains($msg, $sent); } } + + /** + * @expectedException Kafka_Exception_EndOfFile + */ + public function testConnectFailure() { + $producer = new Kafka_Producer('invalid-host-name', 1234567890, Kafka_Encoder::COMPRESSION_NONE); + $producer->connect(); + $this->fail('The above call should throw an exception'); + } + + public function testSerialize() { + $producer = new Kafka_ProducerMock('host', 1234, Kafka_Encoder::COMPRESSION_SNAPPY); + $serialized = serialize($producer); + $unserialized = unserialize($serialized); + $this->assertEquals('host', $unserialized->getHost()); + $this->assertEquals(1234, $unserialized->getPort()); + $this->assertEquals(Kafka_Encoder::COMPRESSION_SNAPPY, $unserialized->getCompression()); + } } Index: clients/php/src/tests/Kafka/MessageTest.php =================================================================== --- clients/php/src/tests/Kafka/MessageTest.php (revision 1366057) +++ clients/php/src/tests/Kafka/MessageTest.php (working copy) @@ -1,3 +1,4 @@ + Index: clients/php/src/tests/Kafka/BoundedByteBuffer/SendTest.php =================================================================== --- clients/php/src/tests/Kafka/BoundedByteBuffer/SendTest.php (revision 1366057) +++ clients/php/src/tests/Kafka/BoundedByteBuffer/SendTest.php (working copy) @@ -1,3 +1,4 @@ +stream = fopen('php://temp', 'rb'); //read-only mode Index: clients/php/src/tests/Kafka/BoundedByteBuffer/ReceiveTest.php =================================================================== --- clients/php/src/tests/Kafka/BoundedByteBuffer/ReceiveTest.php (revision 1366057) +++ clients/php/src/tests/Kafka/BoundedByteBuffer/ReceiveTest.php (working copy) @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - - */ -class Kafka_BoundedByteBuffer_ReceiveTest extends PHPUnit_Framework_TestCase -{ - private $stream = null; - private $size1 = 0; - private $msg1 = ''; - private $size2 = 0; - private $msg2 = ''; - - /** - * @var Kafka_BoundedByteBuffer_Receive - */ - private $obj = null; - - /** - * Append two message sets to a sample stream to verify that only the first one is read - */ - public function setUp() { - $this->stream = fopen('php://temp', 'w+b'); - $this->msg1 = 'test message'; - $this->msg2 = 'another message'; - $this->size1 = strlen($this->msg1); - $this->size2 = strlen($this->msg2); - fwrite($this->stream, pack('N', $this->size1)); - fwrite($this->stream, $this->msg1); - fwrite($this->stream, pack('N', $this->size2)); - fwrite($this->stream, $this->msg2); - rewind($this->stream); - $this->obj = new Kafka_BoundedByteBuffer_Receive; - } - - public function tearDown() { - fclose($this->stream); - unset($this->obj); - } - - public function testReadFrom() { - $this->assertEquals($this->size1 + 4, $this->obj->readFrom($this->stream)); - $this->assertEquals($this->msg1, stream_get_contents($this->obj->buffer)); - //test that we don't go beyond the first message set - $this->assertEquals(0, $this->obj->readFrom($this->stream)); - $this->assertEquals($this->size1 + 4, ftell($this->stream)); - } - - public function testReadCompletely() { - $this->assertEquals($this->size1 + 4, $this->obj->readCompletely($this->stream)); - $this->assertEquals($this->msg1, stream_get_contents($this->obj->buffer)); - //test that we don't go beyond the first message set - $this->assertEquals(0, $this->obj->readCompletely($this->stream)); - $this->assertEquals($this->size1 + 4, ftell($this->stream)); - } - - public function testReadFromOffset() { - fseek($this->stream, $this->size1 + 4); - $this->obj = new Kafka_BoundedByteBuffer_Receive; - $this->assertEquals($this->size2 + 4, $this->obj->readFrom($this->stream)); - $this->assertEquals($this->msg2, stream_get_contents($this->obj->buffer)); - //test that we reached the end of the stream (2nd message set) - $this->assertEquals(0, $this->obj->readFrom($this->stream)); - $this->assertEquals($this->size1 + 4 + $this->size2 + 4, ftell($this->stream)); - } - - public function testReadCompletelyOffset() { - fseek($this->stream, $this->size1 + 4); - $this->obj = new Kafka_BoundedByteBuffer_Receive; - $this->assertEquals($this->size2 + 4, $this->obj->readCompletely($this->stream)); - $this->assertEquals($this->msg2, stream_get_contents($this->obj->buffer)); - //test that we reached the end of the stream (2nd message set) - $this->assertEquals(0, $this->obj->readCompletely($this->stream)); - $this->assertEquals($this->size1 + 4 + $this->size2 + 4, ftell($this->stream)); - } - - /** - * @expectedException RuntimeException - */ - public function testInvalidStream() { - $this->stream = fopen('php://temp', 'w+b'); - $this->obj->readFrom($this->stream); - $this->fail('The above call should throw an exception'); - } - - /** - * @expectedException RuntimeException - */ - public function testInvalidSizeTooBig() { - $maxSize = 10; - $this->obj = new Kafka_BoundedByteBuffer_Receive($maxSize); - $this->stream = fopen('php://temp', 'w+b'); - fwrite($this->stream, pack('N', $maxSize + 1)); - fwrite($this->stream, $this->msg1); - rewind($this->stream); - $this->obj->readFrom($this->stream); - $this->fail('The above call should throw an exception'); - } - - /** - * @expectedException RuntimeException - */ - public function testInvalidSizeNotPositive() { - $this->stream = fopen('php://temp', 'w+b'); - fwrite($this->stream, pack('N', 0)); - fwrite($this->stream, ''); - rewind($this->stream); - $this->obj->readFrom($this->stream); - $this->fail('The above call should throw an exception'); - } -} Index: clients/php/src/lib/Kafka/Encoder.php =================================================================== --- clients/php/src/lib/Kafka/Encoder.php (revision 1366057) +++ clients/php/src/lib/Kafka/Encoder.php (working copy) @@ -5,7 +5,7 @@ * @category Libraries * @package Kafka * @author Lorenzo Alberton - * @copyright 2011 Lorenzo Alberton + * @copyright 2012 Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @version $Revision: $ * @link http://sna-projects.com/kafka/ @@ -25,44 +25,110 @@ /** * 1 byte "magic" identifier to allow format changes * - * @var integer + * @const integer */ const CURRENT_MAGIC_VALUE = 1; + + const COMPRESSION_NONE = 0; + const COMPRESSION_GZIP = 1; + const COMPRESSION_SNAPPY = 2; /** * Encode a message. The format of an N byte message is the following: - * - 1 byte: "magic" identifier to allow format changes - * - 1 byte: "compression-attributes" for compression alogrithm - * - 4 bytes: CRC32 of the payload - * - (N - 5) bytes: payload + * - 1 byte: "magic" identifier to allow format changes + * - 1 byte: "compression-attributes" for compression alogrithm + * - 4 bytes: CRC32 of the payload + * - (N - 6) bytes: payload * * @param string $msg Message to encode * * @return string + * @throws Kafka_Exception */ - static public function encode_message($msg, $compression) { + static public function encode_message($msg, $compression = self::COMPRESSION_NONE) { + $compressed = self::compress($msg, $compression); // - return pack('CCN', self::CURRENT_MAGIC_VALUE, $compression, crc32($msg)) - . $msg; + return pack('CCN', self::CURRENT_MAGIC_VALUE, $compression, crc32($compressed)) + . $compressed; } /** + * Compress a message + * + * @param string $msg Message + * @param integer $compression 0=none, 1=gzip, 2=snappy + * + * @return string + * @throws Kafka_Exception + */ + static public function compress($msg, $compression) { + switch ($compression) { + case self::COMPRESSION_NONE: + return $msg; + case self::COMPRESSION_GZIP: + return gzencode($msg); + case self::COMPRESSION_SNAPPY: + throw new Kafka_Exception_NotSupported('SNAPPY compression not yet implemented'); + default: + throw new Kafka_Exception_NotSupported('Unknown compression flag: ' . $compression); + } + } + + /** + * Decompress a message + * + * @param string $msg Message + * @param integer $compression 0=none, 1=gzip, 2=snappy + * + * @return string + * @throws Kafka_Exception + */ + static public function decompress($msg, $compression) { + switch ($compression) { + case self::COMPRESSION_NONE: + return $msg; + case self::COMPRESSION_GZIP: + // NB: this is really a MessageSet, not just a single message + // although I'm not sure this is the best way to handle the inner offsets, + // as the symmetry with the outer collection iteration is broken. + // @see https://issues.apache.org/jira/browse/KAFKA-406 + $stream = fopen('php://temp', 'w+b'); + fwrite($stream, gzinflate(substr($msg, 10))); + rewind($stream); + return new Kafka_MessageSetInternalIterator($stream, 0, 0); + case self::COMPRESSION_SNAPPY: + throw new Kafka_Exception_NotSupported('SNAPPY decompression not yet implemented'); + default: + throw new Kafka_Exception_NotSupported('Unknown compression flag: ' . $compression); + } + } + + /** * Encode a complete request * - * @param string $topic Topic - * @param integer $partition Partition number - * @param array $messages Array of messages to send - * @param compression $compression flag for type of compression + * @param string $topic Topic + * @param integer $partition Partition number + * @param array $messages Array of messages to send + * @param integer $compression flag for type of compression * * @return string + * @throws Kafka_Exception */ - static public function encode_produce_request($topic, $partition, array $messages, $compression) { - // encode messages as + static public function encode_produce_request($topic, $partition, array $messages, $compression = self::COMPRESSION_NONE) { + // not sure I agree this is the best design for compressed messages + // @see https://issues.apache.org/jira/browse/KAFKA-406 + $compress = ($compression !== self::COMPRESSION_NONE); $message_set = ''; foreach ($messages as $message) { - $encoded = self::encode_message($message, $compression); + $encoded = self::encode_message($message, self::COMPRESSION_NONE); + // encode messages as $message_set .= pack('N', strlen($encoded)) . $encoded; } + if ($compress) { + $encoded = self::encode_message($message_set, $compression); + $message_set = pack('N', strlen($encoded)) . $encoded; + } + // create the request as $data = pack('n', PRODUCE_REQUEST_ID) . pack('n', strlen($topic)) . $topic . Index: clients/php/src/lib/Kafka/MessageSet.php =================================================================== --- clients/php/src/lib/Kafka/MessageSet.php (revision 1366057) +++ clients/php/src/lib/Kafka/MessageSet.php (working copy) @@ -5,7 +5,7 @@ * @category Libraries * @package Kafka * @author Lorenzo Alberton - * @copyright 2011 Lorenzo Alberton + * @copyright 2012 Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @version $Revision: $ * @link http://sna-projects.com/kafka/ @@ -21,10 +21,20 @@ * @link http://sna-projects.com/kafka/ */ class Kafka_MessageSet implements Iterator -{ +{ /** + * @var resource + */ + protected $stream = null; + + /** * @var integer */ + protected $initialOffset = 0; + + /** + * @var integer + */ protected $validByteCount = 0; /** @@ -33,9 +43,14 @@ private $valid = false; /** - * @var array + * @var Kafka_Message */ - private $array = array(); + private $msg; + + /** + * @var Kafka_MessageSetInternalIterator + */ + private $internalIterator = null; /** * Constructor @@ -43,19 +58,67 @@ * @param resource $stream Stream resource * @param integer $errorCode Error code */ - public function __construct($stream, $errorCode = 0) { - $data = stream_get_contents($stream); - $len = strlen($data); - $ptr = 0; - while ($ptr <= ($len - 4)) { - $size = array_shift(unpack('N', substr($data, $ptr, 4))); - $ptr += 4; - $this->array[] = new Kafka_Message(substr($data, $ptr, $size)); - $ptr += $size; - $this->validByteCount += 4 + $size; + public function __construct($stream, $initialOffset = 0, $errorCode = 0) { + $this->validateErrorCode($errorCode); + $this->stream = $stream; + $this->initialOffset = $initialOffset; + } + + /** + * Validate the error code from the response + * + * @param integer $errorCode Error code + * + * @return void + * @throws Kafka_Exception + */ + protected function validateErrorCode($errorCode) { + switch ($errorCode) { + case 0: break; //success + case 1: throw new Kafka_Exception_OffsetOutOfRange('OffsetOutOfRange reading response errorCode'); + case 2: throw new Kafka_Exception_InvalidMessage('InvalidMessage reading response errorCode'); + case 3: throw new Kafka_Exception_WrongPartition('WrongPartition reading response errorCode'); + case 4: throw new Kafka_Exception_InvalidFetchSize('InvalidFetchSize reading response errorCode'); + default: throw new Kafka_Exception('Unknown error reading response errorCode'); } - fclose($stream); } + + /** + * Read the size of the next message (4 bytes) + * + * @return integer Size of the response buffer in bytes + * @throws Kafka_Exception when size is <=0 or >= $maxSize + */ + protected function getMessageSize() { + $size = fread($this->stream, 4); + if ((false === $size) || ('' === $size)) { + $errmsg = 'Received nothing when reading from channel, socket has likely been closed.'; + throw new Kafka_Exception_EndOfFile($errmsg); + } + $size = array_shift(unpack('N', $size)); + if ($size <= 0) { + throw new Kafka_Exception_OutOfRange($size . ' is not a valid message size'); + } + // TODO check if $size is too large + return $size; + } + + /** + * Read the next message + * + * @return string Message (raw) + * @throws Kafka_Exception when the message cannot be read from the stream buffer + */ + protected function getMessage() { + $size = $this->getMessageSize(); + + $msg = fread($this->stream, $size); + if (strlen($msg) != $size) { + throw new Kafka_Exception_OutOfRange('The message is bigger than the buffer'); + } + $this->validByteCount += 4 + $size; + return $msg; + } /** * Get message set size in bytes @@ -81,8 +144,15 @@ * @return void */ public function next() { - $this->valid = (FALSE !== next($this->array)); - } + if (null !== $this->internalIterator) { + $this->internalIterator->next(); + if ($this->internalIterator->valid()) { + return; + } + } + $this->internalIterator = null; + $this->preloadNextMessage(); + } /** * valid @@ -99,7 +169,7 @@ * @return integer */ public function key() { - return key($this->array); + return $this->validByteCount; } /** @@ -108,15 +178,41 @@ * @return Kafka_Message */ public function current() { - return current($this->array); + if (null !== $this->internalIterator && $this->internalIterator->valid()) { + return $this->internalIterator->current(); + } + return $this->msg; } /** - * rewind + * rewind - Cannot use fseek() * * @return void */ public function rewind() { - $this->valid = (FALSE !== reset($this->array)); + $this->internalIterator = null; + $this->validByteCount = 0; + $this->preloadNextMessage(); } + + /** + * Preload the next message + * + * @return void + */ + private function preloadNextMessage() { + try { + $this->msg = new Kafka_Message($this->getMessage()); + if ($this->msg->compression() != Kafka_Encoder::COMPRESSION_NONE) { + $this->internalIterator = $this->msg->payload(); + $this->internalIterator->rewind(); + $this->msg = null; + } else { + $this->internalIterator = null; + } + $this->valid = TRUE; + } catch (Kafka_Exception $e) { + $this->valid = FALSE; + } + } } Index: clients/php/src/lib/Kafka/FetchRequest.php =================================================================== --- clients/php/src/lib/Kafka/FetchRequest.php (revision 1366057) +++ clients/php/src/lib/Kafka/FetchRequest.php (working copy) @@ -5,7 +5,7 @@ * @category Libraries * @package Kafka * @author Lorenzo Alberton - * @copyright 2011 Lorenzo Alberton + * @copyright 2012 Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @version $Revision: $ * @link http://sna-projects.com/kafka/ @@ -22,6 +22,7 @@ */ class Kafka_FetchRequest extends Kafka_Request { + /** * @var string */ @@ -123,4 +124,3 @@ return 'topic:' . $this->topic . ', part:' . $this->partition . ' offset:' . $this->offset . ' maxSize:' . $this->maxSize; } } - Index: clients/php/src/lib/Kafka/Producer.php =================================================================== --- clients/php/src/lib/Kafka/Producer.php (revision 1366057) +++ clients/php/src/lib/Kafka/Producer.php (working copy) @@ -5,7 +5,7 @@ * @category Libraries * @package Kafka * @author Lorenzo Alberton - * @copyright 2011 Lorenzo Alberton + * @copyright 2012 Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @version $Revision: $ * @link http://sna-projects.com/kafka/ @@ -43,7 +43,9 @@ protected $port; /** - * @var integer + * Compression: 0=none; 1=gzip; 2=snappy + * + * @var integer */ protected $compression; @@ -53,25 +55,25 @@ * @param integer $host Host * @param integer $port Port */ - public function __construct($host, $port) { + public function __construct($host, $port, $compression = Kafka_Encoder::COMPRESSION_GZIP) { $this->request_key = 0; - $this->host = $host; - $this->port = $port; - $this->compression = 0; + $this->host = $host; + $this->port = $port; + $this->compression = $compression; } /** * Connect to Kafka via a socket * * @return void - * @throws RuntimeException + * @throws Kafka_Exception */ public function connect() { if (!is_resource($this->conn)) { - $this->conn = stream_socket_client('tcp://' . $this->host . ':' . $this->port, $errno, $errstr); + $this->conn = @stream_socket_client('tcp://' . $this->host . ':' . $this->port, $errno, $errstr); } if (!is_resource($this->conn)) { - throw new RuntimeException('Cannot connect to Kafka: ' . $errstr, $errno); + throw new Kafka_Exception_EndOfFile('Cannot connect to Kafka: ' . $errstr, $errno); } } @@ -108,7 +110,7 @@ */ public function __sleep() { $this->close(); - return array('request_key', 'host', 'port'); + return array('request_key', 'host', 'port', 'compression'); } /** Index: clients/php/src/lib/Kafka/Message.php =================================================================== --- clients/php/src/lib/Kafka/Message.php (revision 1366057) +++ clients/php/src/lib/Kafka/Message.php (working copy) @@ -5,7 +5,7 @@ * @category Libraries * @package Kafka * @author Lorenzo Alberton - * @copyright 2011 Lorenzo Alberton + * @copyright 2012 Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @version $Revision: $ * @link http://sna-projects.com/kafka/ @@ -14,9 +14,9 @@ /** * A message. The format of an N byte message is the following: * 1 byte "magic" identifier to allow format changes - * 1 byte compression-attribute + * 1 byte compression-attribute (missing if magic=0) * 4 byte CRC32 of the payload - * N - 5 byte payload + * N - 6 byte payload (N-5 if magic=0) * * @category Libraries * @package Kafka @@ -26,8 +26,13 @@ */ class Kafka_Message { - /** + * Wire format (0=without compression attribute, 1=with) + * @var integer + */ + private $magic = Kafka_Encoder::CURRENT_MAGIC_VALUE; + + /** * @var string */ private $payload = null; @@ -35,17 +40,17 @@ /** * @var integer */ - private $size = 0; + private $size = 0; /** * @var integer */ - private $compression = 0; + private $compression = Kafka_Encoder::COMPRESSION_NONE; /** * @var string */ - private $crc = false; + private $crc = false; /** * Constructor @@ -53,14 +58,30 @@ * @param string $data Message payload */ public function __construct($data) { - $this->payload = substr($data, 6); - $this->compression = substr($data,1,1); - $this->crc = crc32($this->payload); - $this->size = strlen($this->payload); + $this->magic = array_shift(unpack('C', substr($data, 0, 1))); + if ($this->magic == 0) { + $this->crc = array_shift(unpack('N', substr($data, 1, 4))); + $this->payload = substr($data, 5); + } else { + $this->compression = array_shift(unpack('C', substr($data, 1, 1))); + $this->crc = array_shift(unpack('N', substr($data, 2, 4))); + $this->payload = substr($data, 6); + } + $this->size = strlen($this->payload); } /** + * Return the compression flag + * + * @return integer + */ + public function compression() { + return $this->compression; + } + + + /** * Encode a message * * @return string @@ -84,7 +105,7 @@ * @return integer */ public function magic() { - return Kafka_Encoder::CURRENT_MAGIC_VALUE; + return $this->magic; } /** @@ -99,10 +120,10 @@ /** * Get the message payload * - * @return string + * @return string|Kafka_MessageSetInternalIterator */ public function payload() { - return $this->payload; + return Kafka_Encoder::decompress($this->payload, $this->compression); } /** @@ -120,7 +141,21 @@ * @return string */ public function __toString() { - return 'message(magic = ' . Kafka_Encoder::CURRENT_MAGIC_VALUE . ', compression = ' . $this->compression . - ', crc = ' . $this->crc . ', payload = ' . $this->payload . ')'; + try { + $payload = $this->payload(); + } catch (Exception $e) { + $payload = 'ERROR decoding payload: ' . $e->getMessage(); + } + if (!is_string($payload)) { + $payload = 'COMPRESSED-CONTENT'; + } + return 'message(' + . 'magic = ' . $this->magic + . ', compression = ' . $this->compression + . ', size = ' . $this->size() + . ', crc = ' . $this->crc + . ', valid = ' . ($this->isValid() ? 'true' : 'false') + . ', payload = ' . $payload + . ')'; } } Index: clients/php/src/lib/Kafka/RequestKeys.php =================================================================== --- clients/php/src/lib/Kafka/RequestKeys.php (revision 1366057) +++ clients/php/src/lib/Kafka/RequestKeys.php (working copy) @@ -5,7 +5,7 @@ * @category Libraries * @package Kafka * @author Lorenzo Alberton - * @copyright 2011 Lorenzo Alberton + * @copyright 2012 Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @version $Revision: $ * @link http://sna-projects.com/kafka/ Index: clients/php/src/lib/Kafka/SimpleConsumer.php =================================================================== --- clients/php/src/lib/Kafka/SimpleConsumer.php (revision 1366057) +++ clients/php/src/lib/Kafka/SimpleConsumer.php (working copy) @@ -5,7 +5,7 @@ * @category Libraries * @package Kafka * @author Lorenzo Alberton - * @copyright 2011 Lorenzo Alberton + * @copyright 2012 Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @version $Revision: $ * @link http://sna-projects.com/kafka/ @@ -25,17 +25,17 @@ /** * @var string */ - protected $host = 'localhost'; + protected $host = 'localhost'; /** * @var integer */ - protected $port = 9092; + protected $port = 9092; /** * @var integer */ - protected $socketTimeout = 10; + protected $socketTimeout = 10; /** * @var integer @@ -69,9 +69,9 @@ */ public function connect() { if (!is_resource($this->conn)) { - $this->conn = stream_socket_client('tcp://' . $this->host . ':' . $this->port, $errno, $errstr); + $this->conn = @stream_socket_client('tcp://' . $this->host . ':' . $this->port, $errno, $errstr); if (!$this->conn) { - throw new RuntimeException($errstr, $errno); + throw new Kafka_Exception_EndOfFile($errstr, $errno); } stream_set_timeout($this->conn, $this->socketTimeout); stream_set_read_buffer($this->conn, $this->socketBufferSize); @@ -97,17 +97,19 @@ * @param Kafka_FetchRequest $req Request * * @return Kafka_MessageSet $messages + * @throws Kafka_Exception */ public function fetch(Kafka_FetchRequest $req) { $this->connect(); $this->sendRequest($req); //echo "\nRequest sent: ".(string)$req."\n"; - $response = $this->getResponse(); - //var_dump($response); - $this->close(); - return new Kafka_MessageSet($response['response']->buffer, $response['errorCode']); + + $size = $this->getResponseSize(); + $initialOffset = 6; + + return new Kafka_MessageSet($this->conn, $initialOffset, $this->getResponseCode()); } - + /** * Send the request * @@ -120,23 +122,43 @@ $send->writeCompletely($this->conn); } + /** - * Get the response + * Read the request size (4 bytes) if not read yet * - * @return array + * @param resource $stream Stream resource + * + * @return integer Size of the response buffer in bytes + * @throws Kafka_Exception when size is <=0 or >= $maxSize */ - protected function getResponse() { - $response = new Kafka_BoundedByteBuffer_Receive(); - $response->readCompletely($this->conn); - - rewind($response->buffer); - // this has the side effect of setting the initial position of buffer correctly - $errorCode = array_shift(unpack('n', fread($response->buffer, 2))); - //rewind($response->buffer); - return array( - 'response' => $response, - 'errorCode' => $errorCode, - ); + protected function getResponseSize() { + $size = @fread($this->conn, 4); + if ((false === $size) || ('' === $size)) { + $errmsg = 'Received nothing when reading from channel, socket has likely been closed.'; + throw new Kafka_Exception_EndOfFile($errmsg); + } + $size = array_shift(unpack('N', $size)); + if ($size <= 0 || $size > $this->socketBufferSize) { + throw new Kafka_Exception_OutOfRange($size . ' is not a valid message size'); + } + return $size; } - + + /** + * Read the response error code + * + * @return integer Error code + */ + protected function getResponseCode() { + return array_shift(unpack('n', fread($this->conn, 2))); + } + + /** + * Close the socket connection if still open + * + * @return vpopmail_del_domain(domain) + */ + public function __destruct() { + $this->close(); + } } Index: clients/php/src/lib/Kafka/BoundedByteBuffer/Send.php =================================================================== --- clients/php/src/lib/Kafka/BoundedByteBuffer/Send.php (revision 1366057) +++ clients/php/src/lib/Kafka/BoundedByteBuffer/Send.php (working copy) @@ -5,7 +5,7 @@ * @category Libraries * @package Kafka * @author Lorenzo Alberton - * @copyright 2011 Lorenzo Alberton + * @copyright 2012 Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @version $Revision: $ * @link http://sna-projects.com/kafka/ @@ -62,12 +62,12 @@ * @param resource $stream Stream resource * * @return integer Number of bytes read - * @throws RuntimeException when size is <=0 or >= $maxSize + * @throws Kafka_Exception when size is <=0 or >= $maxSize */ private function writeRequestSize($stream) { if (!$this->sizeWritten) { if (!fwrite($stream, pack('N', $this->size))) { - throw new RuntimeException('Cannot write request to stream (' . error_get_last() . ')'); + throw new Kafka_Exception_EndOfFile('Cannot write request to stream (' . error_get_last() . ')'); } $this->sizeWritten = true; return 4; @@ -81,7 +81,7 @@ * @param resource $stream Stream resource * * @return integer number of written bytes - * @throws RuntimeException + * @throws Kafka_Exception */ public function writeTo($stream) { // have we written the request size yet? Index: clients/php/src/lib/Kafka/BoundedByteBuffer/Receive.php =================================================================== --- clients/php/src/lib/Kafka/BoundedByteBuffer/Receive.php (revision 1366057) +++ clients/php/src/lib/Kafka/BoundedByteBuffer/Receive.php (working copy) @@ -1,154 +0,0 @@ - - * @copyright 2011 Lorenzo Alberton - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 - * @version $Revision: $ - * @link http://sna-projects.com/kafka/ - */ - -/** - * Read an entire message set from a stream into an internal buffer - * - * @category Libraries - * @package Kafka - * @author Lorenzo Alberton - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 - * @link http://sna-projects.com/kafka/ - */ -class Kafka_BoundedByteBuffer_Receive -{ - /** - * @var integer - */ - protected $size; - - /** - * @var boolean - */ - protected $sizeRead = false; - - /** - * @var integer - */ - protected $remainingBytes = 0; - - /** - * @var string resource - */ - public $buffer = null; - - /** - * @var boolean - */ - protected $complete = false; - - /** - * - * @var integer - */ - protected $maxSize = PHP_INT_MAX; - - /** - * Constructor - * - * @param integer $maxSize Max buffer size - */ - public function __construct($maxSize = PHP_INT_MAX) { - $this->maxSize = $maxSize; - } - - /** - * Destructor - * - * @return void - */ - public function __destruct() { - if (is_resource($this->buffer)) { - fclose($this->buffer); - } - } - - /** - * Read the request size (4 bytes) if not read yet - * - * @param resource $stream Stream resource - * - * @return integer Number of bytes read - * @throws RuntimeException when size is <=0 or >= $maxSize - */ - private function readRequestSize($stream) { - if (!$this->sizeRead) { - $this->size = fread($stream, 4); - if ((false === $this->size) || ('' === $this->size)) { - $errmsg = 'Received nothing when reading from channel, socket has likely been closed.'; - throw new RuntimeException($errmsg); - } - $this->size = array_shift(unpack('N', $this->size)); - if ($this->size <= 0 || $this->size > $this->maxSize) { - throw new RuntimeException($this->size . ' is not a valid message size'); - } - $this->remainingBytes = $this->size; - $this->sizeRead = true; - return 4; - } - return 0; - } - - /** - * Read a chunk of data from the stream - * - * @param resource $stream Stream resource - * - * @return integer number of read bytes - * @throws RuntimeException when size is <=0 or >= $maxSize - */ - public function readFrom($stream) { - // have we read the request size yet? - $read = $this->readRequestSize($stream); - // have we allocated the request buffer yet? - if (!$this->buffer) { - $this->buffer = fopen('php://temp', 'w+b'); - } - // if we have a buffer, read some stuff into it - if ($this->buffer && !$this->complete) { - $freadBufferSize = min(8192, $this->remainingBytes); - if ($freadBufferSize > 0) { - //TODO: check that fread returns something - $bytesRead = fwrite($this->buffer, fread($stream, $freadBufferSize)); - $this->remainingBytes -= $bytesRead; - $read += $bytesRead; - } - // did we get everything? - if ($this->remainingBytes <= 0) { - rewind($this->buffer); - $this->complete = true; - } - } - return $read; - } - - /** - * Read all the available bytes in the stream - * - * @param resource $stream Stream resource - * - * @return integer number of read bytes - * @throws RuntimeException when size is <=0 or >= $maxSize - */ - public function readCompletely($stream) { - $read = 0; - while (!$this->complete) { - $read += $this->readFrom($stream); - } - return $read; - } -} - - - - \ No newline at end of file Index: clients/php/src/lib/Kafka/Request.php =================================================================== --- clients/php/src/lib/Kafka/Request.php (revision 1366057) +++ clients/php/src/lib/Kafka/Request.php (working copy) @@ -5,7 +5,7 @@ * @category Libraries * @package Kafka * @author Lorenzo Alberton - * @copyright 2011 Lorenzo Alberton + * @copyright 2012 Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @version $Revision: $ * @link http://sna-projects.com/kafka/ @@ -26,5 +26,21 @@ * @var integer */ public $id; + + /** + * Write the request to the output stream + * + * @param resource $stream Output stream + * + * @return void + */ + abstract public function writeTo($stream); + + /** + * Get request size in bytes + * + * @return integer + */ + abstract public function sizeInBytes(); } Index: clients/php/src/examples/consume.php =================================================================== --- clients/php/src/examples/consume.php (revision 1366057) +++ clients/php/src/examples/consume.php (working copy) @@ -1,4 +1,5 @@ #!/usr/bin/php +fetch($fetchRequest); foreach ($messages as $msg) { - echo "\nconsumed[$offset]: " . $msg->payload(); + echo "\nconsumed[$offset][$partialOffset]: " . $msg->payload(); + $partialOffset = $messages->validBytes(); } //advance the offset after consuming each message $offset += $messages->validBytes(); Index: clients/php/src/examples/autoloader.php =================================================================== --- clients/php/src/examples/autoloader.php (revision 1366057) +++ clients/php/src/examples/autoloader.php (working copy) @@ -1,3 +1,5 @@ +