Index: clients/php/src/tests/bootstrap.php =================================================================== --- clients/php/src/tests/bootstrap.php (revision 1385060) +++ clients/php/src/tests/bootstrap.php (working copy) @@ -1,3 +1,5 @@ + + */ +class Kafka_ResponseTest extends PHPUnit_Framework_TestCase +{ + /** + * @expectedException Kafka_Exception_OffsetOutOfRange + */ + public function testErrorCodeValidationOffsetOutOfRange() { + Kafka_Response::validateErrorCode(1); + $this->fail('the line above should throw an exception'); + } + + /** + * @expectedException Kafka_Exception_InvalidMessage + */ + public function testErrorCodeValidationInvalidMessage() { + Kafka_Response::validateErrorCode(2); + $this->fail('the line above should throw an exception'); + } + + /** + * @expectedException Kafka_Exception_WrongPartition + */ + public function testErrorCodeValidationWrongPartition() { + Kafka_Response::validateErrorCode(3); + $this->fail('the line above should throw an exception'); + } + + /** + * @expectedException Kafka_Exception_InvalidFetchSize + */ + public function testErrorCodeValidationInvalidFetchSize() { + Kafka_Response::validateErrorCode(4); + $this->fail('the line above should throw an exception'); + } + + /** + * @expectedException Kafka_Exception + */ + public function testErrorCodeValidationUnknown() { + Kafka_Response::validateErrorCode(20); + $this->fail('the line above should throw an exception'); + } +} Index: clients/php/src/tests/Kafka/EncoderTest.php =================================================================== --- clients/php/src/tests/Kafka/EncoderTest.php (revision 1385060) +++ clients/php/src/tests/Kafka/EncoderTest.php (working copy) @@ -1,3 +1,4 @@ +assertEquals(5 + strlen($test), strlen($encoded)); + $this->assertEquals(6 + strlen($test), strlen($encoded)); } public function testByteArrayContainsString() { @@ -45,8 +51,9 @@ $messages = array( 'test 1', 'test 2 abcde', + 'test 3', ); - $encoded = 
Kafka_Encoder::encode_produce_request($topic, $partition, $messages); + $encoded = Kafka_Encoder::encode_produce_request($topic, $partition, $messages, Kafka_Encoder::COMPRESSION_NONE); $this->assertContains($topic, $encoded); $this->assertContains($partition, $encoded); foreach ($messages as $msg) { @@ -54,8 +61,59 @@ } $size = 4 + 2 + 2 + strlen($topic) + 4 + 4; foreach ($messages as $msg) { - $size += 9 + strlen($msg); + $size += 10 + strlen($msg); } $this->assertEquals($size, strlen($encoded)); } + + public function testCompressNone() { + $msg = 'test message'; + $this->assertEquals($msg, Kafka_Encoder::compress($msg, Kafka_Encoder::COMPRESSION_NONE)); + } + + public function testCompressGzip() { + $msg = 'test message'; + $this->assertEquals($msg, gzdecode(Kafka_Encoder::compress($msg, Kafka_Encoder::COMPRESSION_GZIP))); + } + + /** + * @expectedException Kafka_Exception_NotSupported + */ + public function testCompressSnappy() { + $msg = 'test message'; + Kafka_Encoder::compress($msg, Kafka_Encoder::COMPRESSION_SNAPPY); + $this->fail('The above call should fail until SNAPPY support is added'); + } + + /** + * @expectedException Kafka_Exception_NotSupported + */ + public function testCompressUnknown() { + $msg = 'test message'; + Kafka_Encoder::compress($msg, 15); + $this->fail('The above call should fail'); + } + + public function testDecompressNone() { + $msg = 'test message'; + $this->assertEquals($msg, Kafka_Encoder::decompress($msg, Kafka_Encoder::COMPRESSION_NONE)); + } + + /** + * @expectedException Kafka_Exception_NotSupported + */ + public function testDecompressSnappy() { + $msg = 'test message'; + Kafka_Encoder::decompress($msg, Kafka_Encoder::COMPRESSION_SNAPPY); + $this->fail('The above call should fail until SNAPPY support is added'); + } + + /** + * @expectedException Kafka_Exception_NotSupported + */ + public function testDecompressUnknown() { + $msg = 'test message'; + Kafka_Encoder::decompress($msg, 15); + $this->fail('The above call should 
fail'); + } } Index: clients/php/src/tests/Kafka/MessageSetTest.php =================================================================== --- clients/php/src/tests/Kafka/MessageSetTest.php (revision 0) +++ clients/php/src/tests/Kafka/MessageSetTest.php (revision 0) @@ -0,0 +1,210 @@ + + */ +class Kafka_MessageSetTest extends PHPUnit_Framework_TestCase +{ + private function getMessageSetBuffer(array $messages) { + $message_set = ''; + foreach ($messages as $message) { + $encoded = Kafka_Encoder::encode_message($message, Kafka_Encoder::COMPRESSION_NONE); + // encode messages as + $message_set .= pack('N', strlen($encoded)) . $encoded; + } + return $message_set; + } + + private function writeDummyMessageSet($stream, array $messages) { + return fwrite($stream, $this->getMessageSetBuffer($messages)); + } + + private function writeDummyCompressedMessageSet($stream, array $messages, $compression) { + $encoded = Kafka_Encoder::encode_message($this->getMessageSetBuffer($messages), $compression); + return fwrite($stream, pack('N', strlen($encoded)) . 
$encoded); + } + + public function testIterator() { + $stream = fopen('php://temp', 'w+b'); + $messages = array('message #1', 'message #2', 'message #3'); + $this->writeDummyMessageSet($stream, $messages); + rewind($stream); + $socket = Kafka_Socket::createFromStream($stream); + $set = new Kafka_MessageSet($socket, 0, 0); + $idx = 0; + foreach ($set as $offset => $msg) { + $this->assertEquals($messages[$idx++], $msg->payload()); + } + $this->assertEquals(count($messages), $idx); + + // test new offset + $readBytes = $set->validBytes(); + $this->assertEquals(60, $readBytes); + $readBytes = $set->sizeInBytes(); + $this->assertEquals(60, $readBytes); + + // no more data + $set = new Kafka_MessageSet($socket, $readBytes, 0); + $cnt = 0; + foreach ($set as $offset => $msg) { + $cnt++; + } + $this->assertEquals(0, $cnt); + + fclose($stream); + } + + public function testIteratorInvalidLastMessage() { + $stream1 = fopen('php://temp', 'w+b'); + $messages = array('message #1', 'message #2', 'message #3'); + $size = $this->writeDummyMessageSet($stream1, $messages); + rewind($stream1); + $stream = fopen('php://temp', 'w+b'); + fwrite($stream, fread($stream1, $size - 2)); // copy partial stream buffer + rewind($stream); + + $socket = Kafka_Socket::createFromStream($stream); + $set = new Kafka_MessageSet($socket, 0, 0); + $idx = 0; + foreach ($set as $offset => $msg) { + $this->assertEquals($messages[$idx++], $msg->payload()); + } + $this->assertEquals(count($messages) - 1, $idx); // the last message should NOT be returned + fclose($stream); + + // test new offset + $readBytes = $set->validBytes(); + $this->assertEquals(40, $readBytes); + } + + public function testOffset() { + $stream = fopen('php://temp', 'w+b'); + $messages = array('message #1', 'message #2', 'message #3'); + $this->writeDummyMessageSet($stream, $messages); + $offsetOfSecondMessage = 20; // manually calculated + fseek($stream, $offsetOfSecondMessage, SEEK_SET); + + $socket = 
Kafka_Socket::createFromStream($stream); + $set = new Kafka_MessageSet($socket, $offsetOfSecondMessage, 0); + + $cnt = 0; + $idx = 1; + foreach ($set as $offset => $msg) { + $cnt++; + $this->assertEquals($messages[$idx++], $msg->payload()); + } + $this->assertEquals(2, $cnt); + fclose($stream); + + // test new offset + $readBytes = $set->validBytes(); + $this->assertEquals(40, $readBytes); + } + + public function testOffset2() { + $stream = fopen('php://temp', 'w+b'); + $messages = array('message #1', 'message #2', 'message #3'); + $this->writeDummyMessageSet($stream, $messages); + $offsetOfThirdMessage = 40; // manually calculated + + fseek($stream, $offsetOfThirdMessage, SEEK_SET); + $socket = Kafka_Socket::createFromStream($stream); + $set = new Kafka_MessageSet($socket, $offsetOfThirdMessage, 0); + + $cnt = 0; + foreach ($set as $offset => $msg) { + $cnt++; + $this->assertEquals($messages[2], $msg->payload()); + } + $this->assertEquals(1, $cnt); + fclose($stream); + + // test new offset + $readBytes = $set->validBytes(); + $this->assertEquals(20, $readBytes); + } + + public function testCompressedMessages() { + $stream = fopen('php://temp', 'w+b'); + $messages = array('message #1', 'message #2', 'message #3'); + $this->writeDummyCompressedMessageSet($stream, $messages, Kafka_Encoder::COMPRESSION_GZIP); + rewind($stream); + $socket = Kafka_Socket::createFromStream($stream); + $set = new Kafka_MessageSet($socket, 0, 0); + $idx = 0; + foreach ($set as $offset => $msg) { + $this->assertEquals($messages[$idx++], $msg->payload()); + } + $this->assertEquals(count($messages), $idx); + + // test new offset + $readBytes = $set->validBytes(); + $this->assertEquals(69, $readBytes); + + // no more data + $set = new Kafka_MessageSet($socket, $readBytes, 0); + $cnt = 0; + foreach ($set as $offset => $msg) { + $cnt++; + } + $this->assertEquals(0, $cnt); + + fclose($stream); + } + + public function testMixedMessages() { + $stream = fopen('php://temp', 'w+b'); + $messages = 
array('message #1', 'message #2', 'message #3'); + $this->writeDummyCompressedMessageSet($stream, $messages, Kafka_Encoder::COMPRESSION_GZIP); + $messages2 = array('message #4', 'message #5', 'message #6'); + $this->writeDummyMessageSet($stream, $messages2, Kafka_Encoder::COMPRESSION_NONE); + $this->writeDummyCompressedMessageSet($stream, $messages, Kafka_Encoder::COMPRESSION_GZIP); + rewind($stream); + + $allMessages = $messages; + foreach ($messages2 as $msg) { + $allMessages[] = $msg; + } + foreach ($messages as $msg) { + $allMessages[] = $msg; + } + + $socket = Kafka_Socket::createFromStream($stream); + $set = new Kafka_MessageSet($socket, 0, 0); + $idx = 0; + foreach ($set as $offset => $msg) { + $this->assertEquals($allMessages[$idx++], $msg->payload()); + } + $this->assertEquals(count($allMessages), $idx); + + // test new offset + $readBytes = $set->validBytes(); + $this->assertEquals(198, $readBytes); + + // no more data + $set = new Kafka_MessageSet($socket, $readBytes, 0); + $cnt = 0; + foreach ($set as $offset => $msg) { + $cnt++; + } + $this->assertEquals(0, $cnt); + + fclose($stream); + } +} Index: clients/php/src/tests/Kafka/FetchRequestTest.php =================================================================== --- clients/php/src/tests/Kafka/FetchRequestTest.php (revision 1385060) +++ clients/php/src/tests/Kafka/FetchRequestTest.php (working copy) @@ -1,3 +1,5 @@ +topic = 'a test topic'; + $this->topic = 'testtopic'; $this->partition = 0; $this->offset = 0; $this->maxSize = 10000; @@ -54,11 +55,13 @@ public function testWriteTo() { $stream = fopen('php://temp', 'w+b'); - $this->req->writeTo($stream); + $socket = Kafka_Socket::createFromStream($stream); + $this->req->writeTo($socket); rewind($stream); $data = stream_get_contents($stream); fclose($stream); - $this->assertEquals(strlen($data), $this->req->sizeInBytes()); + $expected_len = strlen($data) - 6; //6 Bytes of headers + data + $this->assertEquals($expected_len, $this->req->sizeInBytes()); 
$this->assertContains($this->topic, $data); $this->assertContains($this->partition, $data); } @@ -67,9 +70,11 @@ $this->offset = 14; $this->req = new Kafka_FetchRequest($this->topic, $this->partition, $this->offset, $this->maxSize); $stream = fopen('php://temp', 'w+b'); - $this->req->writeTo($stream); + $socket = Kafka_Socket::createFromStream($stream); + $this->req->writeTo($socket); rewind($stream); //read it back + $headers = fread($stream, 6); $topicLen = array_shift(unpack('n', fread($stream, 2))); $this->assertEquals(strlen($this->topic), $topicLen); $this->assertEquals($this->topic, fread($stream, $topicLen)); @@ -85,4 +90,4 @@ $this->assertContains('offset:' . $this->offset, (string)$this->req); $this->assertContains('maxSize:' . $this->maxSize, (string)$this->req); } -} \ No newline at end of file +} Index: clients/php/src/tests/Kafka/ProducerTest.php =================================================================== --- clients/php/src/tests/Kafka/ProducerTest.php (revision 1385060) +++ clients/php/src/tests/Kafka/ProducerTest.php (working copy) @@ -1,3 +1,5 @@ +conn)) { - $this->conn = fopen('php://temp', 'w+b'); + if (null === $this->socket) { + $this->socket = Kafka_Socket::createFromStream(fopen('php://temp', 'w+b')); } } public function getData() { $this->connect(); - rewind($this->conn); - return stream_get_contents($this->conn); + $this->socket->rewind(); + return $this->socket->read(10000000); + } + + public function getHost() { + return $this->host; + } + + public function getPort() { + return $this->port; + } + + public function getCompression() { + return $this->compression; } } @@ -49,7 +62,7 @@ private $producer; public function setUp() { - $this->producer = new Kafka_ProducerMock('localhost', 1234); + $this->producer = new Kafka_ProducerMock('localhost', 1234, Kafka_Encoder::COMPRESSION_NONE); } public function tearDown() { @@ -57,7 +70,6 @@ unset($this->producer); } - public function testProducer() { $messages = array( 'test 1', @@ -73,4 
+85,22 @@ $this->assertContains($msg, $sent); } } + + /** + * @expectedException Kafka_Exception_Socket + */ + public function testConnectFailure() { + $producer = new Kafka_Producer('invalid-host-name', 1234567890, Kafka_Encoder::COMPRESSION_NONE); + $producer->connect(); + $this->fail('The above call should throw an exception'); + } + + public function testSerialize() { + $producer = new Kafka_ProducerMock('host', 1234, Kafka_Encoder::COMPRESSION_SNAPPY); + $serialized = serialize($producer); + $unserialized = unserialize($serialized); + $this->assertEquals('host', $unserialized->getHost()); + $this->assertEquals(1234, $unserialized->getPort()); + $this->assertEquals(Kafka_Encoder::COMPRESSION_SNAPPY, $unserialized->getCompression()); + } } Index: clients/php/src/tests/Kafka/MessageTest.php =================================================================== --- clients/php/src/tests/Kafka/MessageTest.php (revision 1385060) +++ clients/php/src/tests/Kafka/MessageTest.php (working copy) @@ -1,3 +1,4 @@ + Index: clients/php/src/tests/Kafka/SimpleConsumerTest.php =================================================================== --- clients/php/src/tests/Kafka/SimpleConsumerTest.php (revision 0) +++ clients/php/src/tests/Kafka/SimpleConsumerTest.php (revision 0) @@ -0,0 +1,117 @@ + + */ +class Kafka_ConsumerMock extends Kafka_SimpleConsumer { + public function connect() { + if (null === $this->socket) { + $this->socket = Kafka_Socket::createFromStream(fopen('php://temp', 'w+b')); + } + } + + public function writeInt4($n) { + $this->socket->write(pack('N', $n)); + } + + public function writeInt2($n) { + $this->socket->write(pack('n', $n)); + } + + public function rewind() { + $this->socket->rewind(); + } + + public function getResponseSize() { + return parent::getResponseSize(); + } + + public function getResponseCode() { + return parent::getResponseCode(); + } +} + +/** + * Description of ProducerTest + * + * @author Lorenzo Alberton + */ +class 
Kafka_SimpleConsumerTest extends PHPUnit_Framework_TestCase +{ + /** + * @var Kafka_Producer + */ + private $consumer; + + public function setUp() { + $this->consumer = new Kafka_ConsumerMock('localhost', 1234, 10, 100000); + } + + public function tearDown() { + $this->consumer->close(); + unset($this->consumer); + } + + /** + * @expectedException Kafka_Exception_OutOfRange + */ + public function testInvalidMessageSize() { + $this->consumer->connect(); + $this->consumer->writeInt4(0); + $this->consumer->rewind(); + $this->consumer->getResponseSize(); + $this->fail('The above call should throw an exception'); + } + + public function testMessageSize() { + $this->consumer->connect(); + $this->consumer->writeInt4(10); + $this->consumer->rewind(); + $this->assertEquals(10, $this->consumer->getResponseSize()); + } + + public function testMessageCode() { + $this->consumer->connect(); + $this->consumer->writeInt2(1); + $this->consumer->rewind(); + $this->assertEquals(1, $this->consumer->getResponseCode()); + } + + /** + * @expectedException Kafka_Exception_Socket_EOF + */ + public function testMessageSizeFailure() { + $this->consumer->close(); + $this->consumer->getResponseSize(); + $this->fail('The above call should throw an exception'); + } + + /** + * @expectedException Kafka_Exception_Socket + */ + public function testConnectFailure() { + $consumer = new Kafka_SimpleConsumer('invalid-host-name', 1234567890, 10, 1000000); + $consumer->connect(); + $this->fail('The above call should throw an exception'); + } +} Index: clients/php/src/tests/Kafka/BoundedByteBuffer/SendTest.php =================================================================== --- clients/php/src/tests/Kafka/BoundedByteBuffer/SendTest.php (revision 1385060) +++ clients/php/src/tests/Kafka/BoundedByteBuffer/SendTest.php (working copy) @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - - */ -class Kafka_BoundedByteBuffer_SendTest extends PHPUnit_Framework_TestCase -{ - private $stream; - private $topic; - private $partition; - private $offset; - - /** - * @var Kafka_FetchRequest - */ - private $req; - - /** - * @var Kafka_BoundedByteBuffer_Send - */ - private $obj = null; - - public function setUp() { - $this->stream = fopen('php://temp', 'w+b'); - $this->topic = 'a test topic'; - $this->partition = 0; - $this->offset = 0; - $maxSize = 10000; - $this->req = new Kafka_FetchRequest($this->topic, $this->partition, $this->offset, $maxSize); - $this->obj = new Kafka_BoundedByteBuffer_Send($this->req); - } - - public function tearDown() { - fclose($this->stream); - unset($this->obj); - } - - public function testWriteTo() { - // 4 bytes = size - // 2 bytes = request ID - $this->assertEquals(4 + $this->req->sizeInBytes() + 2, $this->obj->writeTo($this->stream)); - } - - public function testWriteCompletely() { - // 4 bytes = size - // 2 bytes = request ID - $this->assertEquals(4 + $this->req->sizeInBytes() + 2, $this->obj->writeCompletely($this->stream)); - } - - public function testWriteToWithBigRequest() { - $topicSize = 9000; - $this->topic = str_repeat('a', $topicSize); //bigger than the fread buffer, 8192 - $this->req = new Kafka_FetchRequest($this->topic, 
$this->partition, $this->offset); - $this->obj = new Kafka_BoundedByteBuffer_Send($this->req); - // 4 bytes = size - // 2 bytes = request ID - //$this->assertEquals(4 + $this->req->sizeInBytes() + 2, $this->obj->writeTo($this->stream)); - $written = $this->obj->writeTo($this->stream); - $this->assertEquals(4 + 8192, $written); - $this->assertTrue($written < $topicSize); - } - - public function testWriteCompletelyWithBigRequest() { - $topicSize = 9000; - $this->topic = str_repeat('a', $topicSize); //bigger than the fread buffer, 8192 - $this->req = new Kafka_FetchRequest($this->topic, $this->partition, $this->offset); - $this->obj = new Kafka_BoundedByteBuffer_Send($this->req); - // 4 bytes = size - // 2 bytes = request ID - $this->assertEquals(4 + $this->req->sizeInBytes() + 2, $this->obj->writeCompletely($this->stream)); - } - - /** - * @expectedException RuntimeException - */ - public function testWriteInvalidStream() { - $this->stream = fopen('php://temp', 'rb'); //read-only mode - $this->obj->writeTo($this->stream); - $this->fail('the above call should throw an exception'); - } -} Index: clients/php/src/tests/Kafka/BoundedByteBuffer/ReceiveTest.php =================================================================== --- clients/php/src/tests/Kafka/BoundedByteBuffer/ReceiveTest.php (revision 1385060) +++ clients/php/src/tests/Kafka/BoundedByteBuffer/ReceiveTest.php (working copy) @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - - */ -class Kafka_BoundedByteBuffer_ReceiveTest extends PHPUnit_Framework_TestCase -{ - private $stream = null; - private $size1 = 0; - private $msg1 = ''; - private $size2 = 0; - private $msg2 = ''; - - /** - * @var Kafka_BoundedByteBuffer_Receive - */ - private $obj = null; - - /** - * Append two message sets to a sample stream to verify that only the first one is read - */ - public function setUp() { - $this->stream = fopen('php://temp', 'w+b'); - $this->msg1 = 'test message'; - $this->msg2 = 'another message'; - $this->size1 = strlen($this->msg1); - $this->size2 = strlen($this->msg2); - fwrite($this->stream, pack('N', $this->size1)); - fwrite($this->stream, $this->msg1); - fwrite($this->stream, pack('N', $this->size2)); - fwrite($this->stream, $this->msg2); - rewind($this->stream); - $this->obj = new Kafka_BoundedByteBuffer_Receive; - } - - public function tearDown() { - fclose($this->stream); - unset($this->obj); - } - - public function testReadFrom() { - $this->assertEquals($this->size1 + 4, $this->obj->readFrom($this->stream)); - $this->assertEquals($this->msg1, stream_get_contents($this->obj->buffer)); - //test that we don't go beyond the first message set - $this->assertEquals(0, $this->obj->readFrom($this->stream)); - $this->assertEquals($this->size1 + 4, ftell($this->stream)); - } - - public function testReadCompletely() { - $this->assertEquals($this->size1 + 4, $this->obj->readCompletely($this->stream)); - $this->assertEquals($this->msg1, stream_get_contents($this->obj->buffer)); - //test that we don't go beyond the first message 
set - $this->assertEquals(0, $this->obj->readCompletely($this->stream)); - $this->assertEquals($this->size1 + 4, ftell($this->stream)); - } - - public function testReadFromOffset() { - fseek($this->stream, $this->size1 + 4); - $this->obj = new Kafka_BoundedByteBuffer_Receive; - $this->assertEquals($this->size2 + 4, $this->obj->readFrom($this->stream)); - $this->assertEquals($this->msg2, stream_get_contents($this->obj->buffer)); - //test that we reached the end of the stream (2nd message set) - $this->assertEquals(0, $this->obj->readFrom($this->stream)); - $this->assertEquals($this->size1 + 4 + $this->size2 + 4, ftell($this->stream)); - } - - public function testReadCompletelyOffset() { - fseek($this->stream, $this->size1 + 4); - $this->obj = new Kafka_BoundedByteBuffer_Receive; - $this->assertEquals($this->size2 + 4, $this->obj->readCompletely($this->stream)); - $this->assertEquals($this->msg2, stream_get_contents($this->obj->buffer)); - //test that we reached the end of the stream (2nd message set) - $this->assertEquals(0, $this->obj->readCompletely($this->stream)); - $this->assertEquals($this->size1 + 4 + $this->size2 + 4, ftell($this->stream)); - } - - /** - * @expectedException RuntimeException - */ - public function testInvalidStream() { - $this->stream = fopen('php://temp', 'w+b'); - $this->obj->readFrom($this->stream); - $this->fail('The above call should throw an exception'); - } - - /** - * @expectedException RuntimeException - */ - public function testInvalidSizeTooBig() { - $maxSize = 10; - $this->obj = new Kafka_BoundedByteBuffer_Receive($maxSize); - $this->stream = fopen('php://temp', 'w+b'); - fwrite($this->stream, pack('N', $maxSize + 1)); - fwrite($this->stream, $this->msg1); - rewind($this->stream); - $this->obj->readFrom($this->stream); - $this->fail('The above call should throw an exception'); - } - - /** - * @expectedException RuntimeException - */ - public function testInvalidSizeNotPositive() { - $this->stream = fopen('php://temp', 'w+b'); - 
fwrite($this->stream, pack('N', 0)); - fwrite($this->stream, ''); - rewind($this->stream); - $this->obj->readFrom($this->stream); - $this->fail('The above call should throw an exception'); - } -} Index: clients/php/src/tests/Kafka/RequestTest.php =================================================================== --- clients/php/src/tests/Kafka/RequestTest.php (revision 0) +++ clients/php/src/tests/Kafka/RequestTest.php (revision 0) @@ -0,0 +1,77 @@ + + */ +class Kafka_RequestTest extends PHPUnit_Framework_TestCase +{ + public function testEncodeDecode64bitShortUnsigned() { + $short = 3; + $encoded = Kafka_Request::packLong64bigendian($short); + $this->assertEquals($short, Kafka_Request::unpackLong64bigendian($encoded)); + } + + public function testEncodeDecode64bitShortSigned() { + $short = -3; + $encoded = Kafka_Request::packLong64bigendian($short); + $this->assertEquals($short, Kafka_Request::unpackLong64bigendian($encoded)); + } + + public function testEncodeDecode64bitIntUnsigned() { + $int = 32767; + $encoded = Kafka_Request::packLong64bigendian($int); + $this->assertEquals($int, Kafka_Request::unpackLong64bigendian($encoded)); + + $int = 32768; + $encoded = Kafka_Request::packLong64bigendian($int); + $this->assertEquals($int, Kafka_Request::unpackLong64bigendian($encoded)); + } + + public function testEncodeDecode64bitIntSigned() { + $int = -32768; + $encoded = Kafka_Request::packLong64bigendian($int); + $this->assertEquals($int, Kafka_Request::unpackLong64bigendian($encoded)); + + $int = -32769; + $encoded = Kafka_Request::packLong64bigendian($int); + $this->assertEquals($int, Kafka_Request::unpackLong64bigendian($encoded)); + } + + public function testEncodeDecode64bitLongUnsigned() { + $long = 2147483647; + $encoded = Kafka_Request::packLong64bigendian($long); + $this->assertEquals($long, Kafka_Request::unpackLong64bigendian($encoded)); + + $long = 4294967295; + $encoded = Kafka_Request::packLong64bigendian($long); + $this->assertEquals($long, 
Kafka_Request::unpackLong64bigendian($encoded)); + } + + public function testEncodeDecode64bitLongSigned() { + $long = -2147483648; + $encoded = Kafka_Request::packLong64bigendian($long); + $this->assertEquals($long, Kafka_Request::unpackLong64bigendian($encoded)); + + $long = -2147483649; + $encoded = Kafka_Request::packLong64bigendian($long); + $this->assertEquals($long, Kafka_Request::unpackLong64bigendian($encoded)); + } +} Index: clients/php/src/tests/Kafka/SocketTest.php =================================================================== --- clients/php/src/tests/Kafka/SocketTest.php (revision 0) +++ clients/php/src/tests/Kafka/SocketTest.php (revision 0) @@ -0,0 +1,166 @@ + + */ +class Kafka_SocketTest extends PHPUnit_Framework_TestCase +{ + /** + * @expectedException Kafka_Exception_Socket_Connection + */ + public function testConnectNoHost() { + $socket = new Kafka_Socket(null, 80); + $socket->connect(); + $this->fail('The above connect() call should fail on a null host'); + } + + /** + * @expectedException Kafka_Exception_Socket_Connection + */ + public function testConnectInvalidPort() { + $socket = new Kafka_Socket('localhost', 80); + $socket->connect(); + $this->fail('The above connect() call should fail on an invalid port'); + } + + /** + * @expectedException Kafka_Exception_Socket_Connection + */ + public function testConnectInvalidHost() { + $socket = new Kafka_Socket('invalid-host', 80); + $socket->connect(); + $this->fail('The above connect() call should fail on an invalid host'); + } + + /** + * @expectedException Kafka_Exception_Socket + */ + public function testWriteNoStream() { + $socket = Kafka_Socket::createFromStream(null); + $socket->write('test'); + //$socket->rewind(); + //var_dump($socket->read(4)); + $this->fail('The above write() call should fail on a null socket'); + } + + /** + * @expectedException Kafka_Exception_Socket + */ + public function testWriteReadOnlySocket() { + $roStream = fopen('php://temp', 'r'); + $socket = 
Kafka_Socket::createFromStream($roStream); + $socket->write('test'); + //$socket->rewind(); + //var_dump($socket->read(4)); + $this->fail('The above write() call should fail on a read-only socket'); + } + + /** + * @expectedException Kafka_Exception_Socket_Timeout + */ + public function testWriteTimeout() { + $this->markTestSkipped('find a better way of testing socket timeouts'); + $stream = fopen('php://temp', 'w+b'); + $socket = new Kafka_Socket('localhost', 0, 0, 0, -1, -1); + $socket->setStream($stream); + $socket->write('short timeout'); + //$socket->rewind(); + //var_dump($socket->read(4)); + $this->fail('The above write() call should fail on a socket with timeout = -1'); + } + + public function testWrite() { + $socket = Kafka_Socket::createFromStream(fopen('php://temp', 'w+b')); + $written = $socket->write('test'); + $this->assertEquals(4, $written); + } + + public function testWriteAndRead() { + $socket = Kafka_Socket::createFromStream(fopen('php://temp', 'w+b')); + $written = $socket->write('test'); + $socket->rewind(); + $this->assertEquals('test', $socket->read(4)); + } + + public function testRead() { + $stream = fopen('php://temp', 'w+b'); + fwrite($stream, 'test'); + fseek($stream, 0, SEEK_SET); + $socket = Kafka_Socket::createFromStream($stream); + $this->assertEquals('test', $socket->read(4)); + } + + public function testReadFewerBytes() { + $stream = fopen('php://temp', 'w+b'); + fwrite($stream, 'tes'); + fseek($stream, 0, SEEK_SET); + $socket = Kafka_Socket::createFromStream($stream); + $this->assertEquals('tes', $socket->read(4)); + } + + /** + * + * @expectedException Kafka_Exception_Socket_EOF + */ + public function testReadFewerBytesVerifyLength() { + $stream = fopen('php://temp', 'w+b'); + fwrite($stream, 'tes'); + fseek($stream, 0, SEEK_SET); + $socket = Kafka_Socket::createFromStream($stream); + $this->assertEquals('tes', $socket->read(4, true)); + $this->fail('The above call shoud throw an exception because the socket had fewer bytes than 
requested'); + } + + public function testReadMultiple() { + $stream = fopen('php://temp', 'w+b'); + fwrite($stream, 'test1test2'); + fseek($stream, 0, SEEK_SET); + $socket = Kafka_Socket::createFromStream($stream); + $this->assertEquals('test1', $socket->read(5)); + $this->assertEquals('test2', $socket->read(5)); + } + + /** + * @expectedException Kafka_Exception_Socket + */ + public function testReadAfterClose() { + $stream = fopen('php://temp', 'w+b'); + fwrite($stream, 'test'); + fseek($stream, 0, SEEK_SET); + $socket = Kafka_Socket::createFromStream($stream); + $socket->close(); + $socket->read(4); + $this->fail('The above read() call should fail on a closed socket'); + } + + /** + * @expectedException Kafka_Exception_Socket + */ + public function testWriteAfterClose() { + $stream = fopen('php://temp', 'w+b'); + $socket = Kafka_Socket::createFromStream($stream); + $socket->close(); + $socket->write('test'); + $this->fail('The above write() call should fail on a closed socket'); + } +} Index: clients/php/src/lib/Kafka/Response.php =================================================================== --- clients/php/src/lib/Kafka/Response.php (revision 0) +++ clients/php/src/lib/Kafka/Response.php (revision 0) @@ -0,0 +1,43 @@ + + * @copyright 2012 Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @version $Revision: $ + * @link http://sna-projects.com/kafka/ + */ + +/** + * Response class + * + * @category Libraries + * @package Kafka + * @author Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class Kafka_Response +{ + /** + * Validate the error code from the response + * + * @param integer $errorCode Error code + * + * @return void + * @throws Kafka_Exception + */ + static public function validateErrorCode($errorCode) { + switch ($errorCode) { + case 0: break; //success + case 1: throw new 
Kafka_Exception_OffsetOutOfRange('OffsetOutOfRange reading response errorCode'); + case 2: throw new Kafka_Exception_InvalidMessage('InvalidMessage reading response errorCode'); + case 3: throw new Kafka_Exception_WrongPartition('WrongPartition reading response errorCode'); + case 4: throw new Kafka_Exception_InvalidFetchSize('InvalidFetchSize reading response errorCode'); + default: throw new Kafka_Exception('Unknown error reading response errorCode (' . $errorCode . ')'); + } + } +} Index: clients/php/src/lib/Kafka/Exception.php =================================================================== --- clients/php/src/lib/Kafka/Exception.php (revision 0) +++ clients/php/src/lib/Kafka/Exception.php (revision 0) @@ -0,0 +1,26 @@ + + * @copyright 2012 Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @version $Revision: $ + * @link http://sna-projects.com/kafka/ + */ + +/** + * Base exception class + * + * @category Libraries + * @package Kafka + * @author Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class Kafka_Exception extends RuntimeException +{ + +} \ No newline at end of file Index: clients/php/src/lib/Kafka/OffsetRequest.php =================================================================== --- clients/php/src/lib/Kafka/OffsetRequest.php (revision 0) +++ clients/php/src/lib/Kafka/OffsetRequest.php (revision 0) @@ -0,0 +1,130 @@ + + * @copyright 2012 Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @version $Revision: $ + * @link http://sna-projects.com/kafka/ + */ + +/** + * Represents a request object + * + * @category Libraries + * @package Kafka + * @author Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class Kafka_OffsetRequest extends 
Kafka_Request +{ + /** + * @var integer + */ + private $time; + + /** + * @var integer + */ + private $maxSize; + + /** + * @param string $topic Topic + * @param integer $partition Partition + * @param integer $time Time in millisecs + * (-1, from the latest offset available, + * -2 from the smallest offset available) + * @param integer $maxNumOffsets Max number of offsets to return + */ + public function __construct($topic, $partition, $time, $maxNumOffsets) { + $this->id = Kafka_RequestKeys::OFFSETS; + $this->topic = $topic; + $this->partition = $partition; + $this->time = $time; + $this->maxNumOffsets = $maxNumOffsets; + } + + /** + * Write the request to the output stream + * + * @param Kafka_Socket $socket Output stream + * + * @return void + */ + public function writeTo(Kafka_Socket $socket) { + $this->writeRequestHeader($socket); + + // TIMESTAMP (long) + $socket->write(self::packLong64bigendian($this->time)); + // N_OFFSETS (int) + $socket->write(pack('N', $this->maxNumOffsets)); + } + + /** + * Get request size in bytes + * + * @return integer + */ + public function sizeInBytes() { + return 2 + strlen($this->topic) + 4 + 8 + 4; + } + + /** + * Get time + * + * @return integer + */ + public function getTime() { + return $this->time; + } + + /** + * Get maxSize + * + * @return integer + */ + public function getMaxSize() { + return $this->maxSize; + } + + /** + * Get partition + * + * @return integer + */ + public function getPartition() { + return $this->partition; + } + + /** + * Parse the response and return the array of offsets + * + * @param Kafka_Socket $socket Socket handle + * + * @return array + */ + static public function deserializeOffsetArray(Kafka_Socket $socket) { + $nOffsets = array_shift(unpack('N', $socket->read(4))); + if ($nOffsets < 0) { + throw new Kafka_Exception_OutOfRange($nOffsets . 
' is not a valid number of offsets'); + } + $offsets = array(); + for ($i=0; $i < $nOffsets; ++$i) { + $offsets[] = self::unpackLong64bigendian($socket->read(8)); + } + return $offsets; + } + + /** + * String representation of the Fetch Request + * + * @return string + */ + public function __toString() { + return 'topic:' . $this->topic . ', part:' . $this->partition . ' offset:' . $this->offset . ' maxSize:' . $this->maxSize; + } +} Index: clients/php/src/lib/Kafka/MessageSet.php =================================================================== --- clients/php/src/lib/Kafka/MessageSet.php (revision 1385060) +++ clients/php/src/lib/Kafka/MessageSet.php (working copy) @@ -5,7 +5,7 @@ * @category Libraries * @package Kafka * @author Lorenzo Alberton - * @copyright 2011 Lorenzo Alberton + * @copyright 2012 Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @version $Revision: $ * @link http://sna-projects.com/kafka/ @@ -21,7 +21,17 @@ * @link http://sna-projects.com/kafka/ */ class Kafka_MessageSet implements Iterator -{ +{ + /** + * @var Kafka_Socket + */ + protected $socket = null; + + /** + * @var integer + */ + protected $initialOffset = 0; + /** * @var integer */ @@ -33,28 +43,58 @@ private $valid = false; /** - * @var array + * @var Kafka_Message */ - private $array = array(); + private $msg; + + /** + * @var Kafka_MessageSetInternalIterator + */ + private $internalIterator = null; /** * Constructor * - * @param resource $stream Stream resource - * @param integer $errorCode Error code + * @param Kafka_Socket $socket Stream resource + * @param integer $initialOffset Initial offset */ - public function __construct($stream, $errorCode = 0) { - $data = stream_get_contents($stream); - $len = strlen($data); - $ptr = 0; - while ($ptr <= ($len - 4)) { - $size = array_shift(unpack('N', substr($data, $ptr, 4))); - $ptr += 4; - $this->array[] = new Kafka_Message(substr($data, $ptr, $size)); - $ptr += $size; - 
$this->validByteCount += 4 + $size; + public function __construct(Kafka_Socket $socket, $initialOffset = 0) { + $this->socket = $socket; + $this->initialOffset = $initialOffset; + } + + /** + * Read the size of the next message (4 bytes) + * + * @return integer Size of the response buffer in bytes + * @throws Kafka_Exception when size is <=0 or >= $maxSize + */ + protected function getMessageSize() { + $size = array_shift(unpack('N', $this->socket->read(4, true))); + if ($size <= 0) { + throw new Kafka_Exception_OutOfRange($size . ' is not a valid message size'); } - fclose($stream); + // TODO check if $size is too large + return $size; + } + + /** + * Read the next message + * + * @return string Message (raw) + * @throws Kafka_Exception when the message cannot be read from the stream buffer + */ + protected function getMessage() { + try { + $size = $this->getMessageSize(); + $msg = $this->socket->read($size, true); + } catch (Kafka_Exception_Socket_EOF $e) { + $size = isset($size) ? $size : 'enough'; + $logMsg = 'Cannot read ' . $size . 
' bytes, the message is likely bigger than the buffer'; + throw new Kafka_Exception_OutOfRange($logMsg); + } + $this->validByteCount += 4 + $size; + return $msg; } /** @@ -81,8 +121,15 @@ * @return void */ public function next() { - $this->valid = (FALSE !== next($this->array)); - } + if (null !== $this->internalIterator) { + $this->internalIterator->next(); + if ($this->internalIterator->valid()) { + return; + } + } + $this->internalIterator = null; + $this->preloadNextMessage(); + } /** * valid @@ -99,7 +146,7 @@ * @return integer */ public function key() { - return key($this->array); + return $this->validByteCount; } /** @@ -108,15 +155,41 @@ * @return Kafka_Message */ public function current() { - return current($this->array); + if (null !== $this->internalIterator && $this->internalIterator->valid()) { + return $this->internalIterator->current(); + } + return $this->msg; } /** - * rewind + * rewind - Cannot use fseek() * * @return void */ public function rewind() { - $this->valid = (FALSE !== reset($this->array)); + $this->internalIterator = null; + $this->validByteCount = 0; + $this->preloadNextMessage(); + } + + /** + * Preload the next message + * + * @return void + */ + private function preloadNextMessage() { + try { + $this->msg = new Kafka_Message($this->getMessage()); + if ($this->msg->compression() != Kafka_Encoder::COMPRESSION_NONE) { + $this->internalIterator = $this->msg->payload(); + $this->internalIterator->rewind(); + $this->msg = null; + } else { + $this->internalIterator = null; + } + $this->valid = TRUE; + } catch (Kafka_Exception_OutOfRange $e) { + $this->valid = FALSE; + } } } Index: clients/php/src/lib/Kafka/FetchRequest.php =================================================================== --- clients/php/src/lib/Kafka/FetchRequest.php (revision 1385060) +++ clients/php/src/lib/Kafka/FetchRequest.php (working copy) @@ -5,7 +5,7 @@ * @category Libraries * @package Kafka * @author Lorenzo Alberton - * @copyright 2011 Lorenzo Alberton + * 
@copyright 2012 Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @version $Revision: $ * @link http://sna-projects.com/kafka/ @@ -23,16 +23,6 @@ class Kafka_FetchRequest extends Kafka_Request { /** - * @var string - */ - private $topic; - - /** - * @var integer - */ - private $partition; - - /** * @var integer */ private $offset; @@ -63,18 +53,13 @@ * * @return void */ - public function writeTo($stream) { - //echo "\nWriting request to stream: " . (string)$this; - // - fwrite($stream, pack('n', strlen($this->topic)) . $this->topic); - // - fwrite($stream, pack('N', $this->partition)); - -//TODO: need to store a 64bit integer (bigendian), but PHP only supports 32bit integers: -//setting first 32 bits to 0 - fwrite($stream, pack('N2', 0, $this->offset)); - fwrite($stream, pack('N', $this->maxSize)); - //echo "\nWritten request to stream: " .(string)$this; + public function writeTo(Kafka_Socket $socket) { + $this->writeRequestHeader($socket); + + // OFFSET (long) + $socket->write(self::packLong64bigendian($this->offset)); + // MAX_SIZE (int) + $socket->write(pack('N', $this->maxSize)); } /** @@ -83,6 +68,7 @@ * @return integer */ public function sizeInBytes() { + // + + + + return 2 + strlen($this->topic) + 4 + 8 + 4; } @@ -118,9 +104,7 @@ * * @return string */ - public function __toString() - { + public function __toString() { return 'topic:' . $this->topic . ', part:' . $this->partition . ' offset:' . $this->offset . ' maxSize:' . 
$this->maxSize; } } - Index: clients/php/src/lib/Kafka/SimpleConsumer.php =================================================================== --- clients/php/src/lib/Kafka/SimpleConsumer.php (revision 1385060) +++ clients/php/src/lib/Kafka/SimpleConsumer.php (working copy) @@ -5,7 +5,7 @@ * @category Libraries * @package Kafka * @author Lorenzo Alberton - * @copyright 2011 Lorenzo Alberton + * @copyright 2012 Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @version $Revision: $ * @link http://sna-projects.com/kafka/ @@ -23,33 +23,88 @@ class Kafka_SimpleConsumer { /** + * Latest offset available + * + * @const int + */ + const OFFSET_LAST = -1; + + /** + * Smallest offset available + * + * @const int + */ + const OFFSET_FIRST = -2; + + /** * @var string */ - protected $host = 'localhost'; - + protected $host = 'localhost'; + /** * @var integer */ - protected $port = 9092; - + protected $port = 9092; + + /** + * @var Kafka_Socket + */ + protected $socket = null; + + /** + * Send timeout in seconds. + * + * Combined with sendTimeoutUsec this is used for send timeouts. + * + * @var int + */ + private $sendTimeoutSec = 0; + + /** + * Send timeout in microseconds. + * + * Combined with sendTimeoutSec this is used for send timeouts. + * + * @var int + */ + private $sendTimeoutUsec = 100000; + + /** + * Recv timeout in seconds + * + * Combined with recvTimeoutUsec this is used for recv timeouts. + * + * @var int + */ + private $recvTimeoutSec = 0; + + /** + * Recv timeout in microseconds + * + * Combined with recvTimeoutSec this is used for recv timeouts. 
+ * + * @var int + */ + private $recvTimeoutUsec = 250000; + /** * @var integer */ - protected $socketTimeout = 10; - + protected $socketTimeout = 10; + /** * @var integer */ protected $socketBufferSize = 1000000; /** - * @var resource + * @var integer */ - protected $conn = null; - + protected $lastResponseSize = 0; + /** * Constructor - * + * * @param integer $host Kafka Hostname * @param integer $port Port * @param integer $socketTimeout Socket timeout @@ -58,85 +113,158 @@ public function __construct($host, $port, $socketTimeout, $socketBufferSize) { $this->host = $host; $this->port = $port; - $this->socketTimeout = $socketTimeout; + $this->recvTimeoutSec = $socketTimeout; + $this->sendTimeoutSec = $socketTimeout; $this->socketBufferSize = $socketBufferSize; } - + + /** + * Set recv/send socket timeouts (in seconds and microseconds) + * + * @param integer $recvTimeoutSec Recv timeout in seconds + * @param integer $recvTimeoutUsec Recv timeout in microseconds + * @param integer $sendTimeoutSec Send timeout in seconds + * @param integer $sendTimeoutUsec Send timeout in microseconds + * + * @return + */ + public function setSocketTimeouts($recvTimeoutSec = 0, $recvTimeoutUsec = 250000, $sendTimeoutSec = 0, $sendTimeoutUsec = 100000) { + $this->recvTimeoutSec = (int) $recvTimeoutSec; + $this->recvTimeoutUsec = (int) $recvTimeoutUsec; + $this->sendTimeoutSec = (int) $sendTimeoutSec; + $this->sendTimeoutUsec = (int) $sendTimeoutUsec; + } + /** * Connect to Kafka via socket - * + * * @return void */ public function connect() { - if (!is_resource($this->conn)) { - $this->conn = stream_socket_client('tcp://' . $this->host . ':' . 
$this->port, $errno, $errstr); - if (!$this->conn) { - throw new RuntimeException($errstr, $errno); - } - stream_set_timeout($this->conn, $this->socketTimeout); - stream_set_read_buffer($this->conn, $this->socketBufferSize); - stream_set_write_buffer($this->conn, $this->socketBufferSize); - //echo "\nConnected to ".$this->host.":".$this->port."\n"; + if (null === $this->socket) { + $this->socket = new Kafka_Socket( + $this->host, + $this->port, + $this->recvTimeoutSec, + $this->recvTimeoutUsec, + $this->sendTimeoutSec, + $this->sendTimeoutUsec + ); } + $this->socket->connect(); } /** * Close the connection - * + * * @return void */ public function close() { - if (is_resource($this->conn)) { - fclose($this->conn); + if (null !== $this->socket) { + $this->socket->close(); } } /** * Send a request and fetch the response - * - * @param Kafka_FetchRequest $req Request + * + * @param Kafka_Request $req Request * * @return Kafka_MessageSet $messages + * @throws Kafka_Exception */ - public function fetch(Kafka_FetchRequest $req) { + public function fetch(Kafka_Request $req) { $this->connect(); - $this->sendRequest($req); - //echo "\nRequest sent: ".(string)$req."\n"; - $response = $this->getResponse(); - //var_dump($response); - $this->close(); - return new Kafka_MessageSet($response['response']->buffer, $response['errorCode']); + // send request + $req->writeTo($this->socket); + + // get response + $this->lastResponseSize = $this->getResponseSize(); + $responseCode = $this->getResponseCode(); + $initialOffset = 6; + + // validate response + Kafka_Response::validateErrorCode($responseCode); + if ($this->lastResponseSize == 2) { + throw new Kafka_Exception_EmptyQueue(); + } + + return new Kafka_MessageSet($this->socket, $initialOffset); } - + /** - * Send the request - * - * @param Kafka_FetchRequest $req Request - * - * @return void + * Get the last response size + * + * @return integer + */ + public function getLastResponseSize() { + return $this->lastResponseSize; + } + 
+ /** + * Read the request size (4 bytes) if not read yet + * + * @param resource $stream Stream resource + * + * @return integer Size of the response buffer in bytes + * @throws Kafka_Exception_Socket_EOF + * @throws Kafka_Exception_Socket_Timeout + * @throws Kafka_Exception when size is <=0 or >= $maxSize */ - protected function sendRequest(Kafka_FetchRequest $req) { - $send = new Kafka_BoundedByteBuffer_Send($req); - $send->writeCompletely($this->conn); - } - - /** - * Get the response - * - * @return array - */ - protected function getResponse() { - $response = new Kafka_BoundedByteBuffer_Receive(); - $response->readCompletely($this->conn); - - rewind($response->buffer); - // this has the side effect of setting the initial position of buffer correctly - $errorCode = array_shift(unpack('n', fread($response->buffer, 2))); - //rewind($response->buffer); - return array( - 'response' => $response, - 'errorCode' => $errorCode, - ); + protected function getResponseSize() { + $this->connect(); + $size = $this->socket->read(4, true); + $size = array_shift(unpack('N', $size)); + if ($size <= 0) { + throw new Kafka_Exception_OutOfRange($size . ' is not a valid response size'); + } + return $size; + } + + /** + * Read the response error code + * + * @return integer Error code + */ + protected function getResponseCode() { + $this->connect(); + return array_shift(unpack('n', $this->socket->read(2, true))); + } + + /** + * Get a list of valid offsets (up to maxSize) before the given time. + * The result is a list of offsets, in descending order. 
+ * + * @param time: time in millisecs (-1 from the latest offset available, -2 from the smallest offset available) + * + * @return an array of offsets + */ + public function getOffsetsBefore($topic, $partition, $time, $maxNumOffsets) { + $req = new Kafka_OffsetRequest($topic, $partition, $time, $maxNumOffsets); + try { + $this->connect(); + // send request + $req->writeTo($this->socket); + //echo "\nRequest sent: ".(string)$req."\n"; + } catch (Kafka_Socket_Exception_EOF $e) { + //echo "\nReconnect in get offset request due to socket error: " . $e->getMessage(); + // retry once + $this->connect(); + $req->writeTo($this->socket); + } + $size = $this->getResponseSize(); + $errorCode = $this->getResponseCode(); + Kafka_Response::validateErrorCode($errorCode); + + return Kafka_OffsetRequest::deserializeOffsetArray($this->socket); + } + + /** + * Close the socket connection if still open + * + * @return void + */ + public function __destruct() { + $this->close(); } - } Index: clients/php/src/lib/Kafka/MessageSetInternalIterator.php =================================================================== --- clients/php/src/lib/Kafka/MessageSetInternalIterator.php (revision 0) +++ clients/php/src/lib/Kafka/MessageSetInternalIterator.php (revision 0) @@ -0,0 +1,38 @@ + + * @copyright 2012 Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @version $Revision: $ + * @link http://sna-projects.com/kafka/ + */ + +/** + * A sequence of messages stored in a byte buffer + * + * @category Libraries + * @package Kafka + * @author Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class Kafka_MessageSetInternalIterator extends Kafka_MessageSet +{ + /** + * Read the next message.
+ * Override the parent method: we don't want to increment the byte offset + * + * @return string Message (raw) + * @throws Kafka_Exception when the message cannot be read from the stream buffer + */ + protected function getMessage() { + $msg = parent::getMessage(); + // do not increment the offset for internal iterators + $this->validByteCount = 0; + return $msg; + } +} Index: clients/php/src/lib/Kafka/BoundedByteBuffer/Send.php =================================================================== --- clients/php/src/lib/Kafka/BoundedByteBuffer/Send.php (revision 1385060) +++ clients/php/src/lib/Kafka/BoundedByteBuffer/Send.php (working copy) @@ -1,118 +0,0 @@ - - * @copyright 2011 Lorenzo Alberton - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 - * @version $Revision: $ - * @link http://sna-projects.com/kafka/ - */ - -/** - * Send a request to Kafka - * - * @category Libraries - * @package Kafka - * @author Lorenzo Alberton - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 - * @link http://sna-projects.com/kafka/ - */ -class Kafka_BoundedByteBuffer_Send -{ - /** - * @var integer - */ - protected $size; - - /** - * @var boolean - */ - protected $sizeWritten = false; - - /** - * @var string resource - */ - protected $buffer; - - /** - * @var boolean - */ - protected $complete = false; - - /** - * Constructor - * - * @param Kafka_FetchRequest $req Request object - */ - public function __construct(Kafka_FetchRequest $req) { - $this->size = $req->sizeInBytes() + 2; - $this->buffer = fopen('php://temp', 'w+b'); - fwrite($this->buffer, pack('n', $req->id)); - $req->writeTo($this->buffer); - rewind($this->buffer); - //fseek($this->buffer, $req->getOffset(), SEEK_SET); - } - - /** - * Try to write the request size if we haven't already - * - * @param resource $stream Stream resource - * - * @return integer Number of bytes read - * @throws RuntimeException when size is <=0 or >= $maxSize - */ - private 
function writeRequestSize($stream) { - if (!$this->sizeWritten) { - if (!fwrite($stream, pack('N', $this->size))) { - throw new RuntimeException('Cannot write request to stream (' . error_get_last() . ')'); - } - $this->sizeWritten = true; - return 4; - } - return 0; - } - - /** - * Write a chunk of data to the stream - * - * @param resource $stream Stream resource - * - * @return integer number of written bytes - * @throws RuntimeException - */ - public function writeTo($stream) { - // have we written the request size yet? - $written = $this->writeRequestSize($stream); - - // try to write the actual buffer itself - if ($this->sizeWritten && !feof($this->buffer)) { - //TODO: check that fread returns something - $written += fwrite($stream, fread($this->buffer, 8192)); - } - // if we are done, mark it off - if (feof($this->buffer)) { - $this->complete = true; - fclose($this->buffer); - } - return $written; - } - - /** - * Write the entire request to the stream - * - * @param resource $stream Stream resource - * - * @return integer number of written bytes - */ - public function writeCompletely($stream) { - $written = 0; - while (!$this->complete) { - $written += $this->writeTo($stream); - } - //echo "\nWritten " . $written . 
' bytes '; - return $written; - } -} Index: clients/php/src/lib/Kafka/BoundedByteBuffer/Receive.php =================================================================== --- clients/php/src/lib/Kafka/BoundedByteBuffer/Receive.php (revision 1385060) +++ clients/php/src/lib/Kafka/BoundedByteBuffer/Receive.php (working copy) @@ -1,154 +0,0 @@ - - * @copyright 2011 Lorenzo Alberton - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 - * @version $Revision: $ - * @link http://sna-projects.com/kafka/ - */ - -/** - * Read an entire message set from a stream into an internal buffer - * - * @category Libraries - * @package Kafka - * @author Lorenzo Alberton - * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 - * @link http://sna-projects.com/kafka/ - */ -class Kafka_BoundedByteBuffer_Receive -{ - /** - * @var integer - */ - protected $size; - - /** - * @var boolean - */ - protected $sizeRead = false; - - /** - * @var integer - */ - protected $remainingBytes = 0; - - /** - * @var string resource - */ - public $buffer = null; - - /** - * @var boolean - */ - protected $complete = false; - - /** - * - * @var integer - */ - protected $maxSize = PHP_INT_MAX; - - /** - * Constructor - * - * @param integer $maxSize Max buffer size - */ - public function __construct($maxSize = PHP_INT_MAX) { - $this->maxSize = $maxSize; - } - - /** - * Destructor - * - * @return void - */ - public function __destruct() { - if (is_resource($this->buffer)) { - fclose($this->buffer); - } - } - - /** - * Read the request size (4 bytes) if not read yet - * - * @param resource $stream Stream resource - * - * @return integer Number of bytes read - * @throws RuntimeException when size is <=0 or >= $maxSize - */ - private function readRequestSize($stream) { - if (!$this->sizeRead) { - $this->size = fread($stream, 4); - if ((false === $this->size) || ('' === $this->size)) { - $errmsg = 'Received nothing when reading from channel, socket has likely 
been closed.'; - throw new RuntimeException($errmsg); - } - $this->size = array_shift(unpack('N', $this->size)); - if ($this->size <= 0 || $this->size > $this->maxSize) { - throw new RuntimeException($this->size . ' is not a valid message size'); - } - $this->remainingBytes = $this->size; - $this->sizeRead = true; - return 4; - } - return 0; - } - - /** - * Read a chunk of data from the stream - * - * @param resource $stream Stream resource - * - * @return integer number of read bytes - * @throws RuntimeException when size is <=0 or >= $maxSize - */ - public function readFrom($stream) { - // have we read the request size yet? - $read = $this->readRequestSize($stream); - // have we allocated the request buffer yet? - if (!$this->buffer) { - $this->buffer = fopen('php://temp', 'w+b'); - } - // if we have a buffer, read some stuff into it - if ($this->buffer && !$this->complete) { - $freadBufferSize = min(8192, $this->remainingBytes); - if ($freadBufferSize > 0) { - //TODO: check that fread returns something - $bytesRead = fwrite($this->buffer, fread($stream, $freadBufferSize)); - $this->remainingBytes -= $bytesRead; - $read += $bytesRead; - } - // did we get everything? 
- if ($this->remainingBytes <= 0) { - rewind($this->buffer); - $this->complete = true; - } - } - return $read; - } - - /** - * Read all the available bytes in the stream - * - * @param resource $stream Stream resource - * - * @return integer number of read bytes - * @throws RuntimeException when size is <=0 or >= $maxSize - */ - public function readCompletely($stream) { - $read = 0; - while (!$this->complete) { - $read += $this->readFrom($stream); - } - return $read; - } -} - - - - \ No newline at end of file Index: clients/php/src/lib/Kafka/Exception/OutOfRange.php =================================================================== --- clients/php/src/lib/Kafka/Exception/OutOfRange.php (revision 0) +++ clients/php/src/lib/Kafka/Exception/OutOfRange.php (revision 0) @@ -0,0 +1,25 @@ + + * @copyright 2012 Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ + +/** + * Base exception class + * + * @category Libraries + * @package Kafka + * @author Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class Kafka_Exception_OutOfRange extends Kafka_Exception +{ + +} Index: clients/php/src/lib/Kafka/Exception/InvalidFetchSize.php =================================================================== --- clients/php/src/lib/Kafka/Exception/InvalidFetchSize.php (revision 0) +++ clients/php/src/lib/Kafka/Exception/InvalidFetchSize.php (revision 0) @@ -0,0 +1,25 @@ + + * @copyright 2012 Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ + +/** + * Base exception class + * + * @category Libraries + * @package Kafka + * @author Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class 
Kafka_Exception_InvalidFetchSize extends Kafka_Exception +{ + +} Index: clients/php/src/lib/Kafka/Exception/InvalidMessage.php =================================================================== --- clients/php/src/lib/Kafka/Exception/InvalidMessage.php (revision 0) +++ clients/php/src/lib/Kafka/Exception/InvalidMessage.php (revision 0) @@ -0,0 +1,25 @@ + + * @copyright 2012 Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ + +/** + * Base exception class + * + * @category Libraries + * @package Kafka + * @author Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class Kafka_Exception_InvalidMessage extends Kafka_Exception +{ + +} Index: clients/php/src/lib/Kafka/Exception/OffsetOutOfRange.php =================================================================== --- clients/php/src/lib/Kafka/Exception/OffsetOutOfRange.php (revision 0) +++ clients/php/src/lib/Kafka/Exception/OffsetOutOfRange.php (revision 0) @@ -0,0 +1,25 @@ + + * @copyright 2012 Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ + +/** + * Base exception class + * + * @category Libraries + * @package Kafka + * @author Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class Kafka_Exception_OffsetOutOfRange extends Kafka_Exception +{ + +} \ No newline at end of file Index: clients/php/src/lib/Kafka/Exception/Socket/EOF.php =================================================================== --- clients/php/src/lib/Kafka/Exception/Socket/EOF.php (revision 0) +++ clients/php/src/lib/Kafka/Exception/Socket/EOF.php (revision 0) @@ -0,0 +1,25 @@ + + * @copyright 2012 Lorenzo Alberton + * @license 
http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ + +/** + * Socket EndOfFile exception class + * + * @category Libraries + * @package Kafka + * @author Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class Kafka_Exception_Socket_EOF extends Kafka_Exception_Socket +{ + +} Index: clients/php/src/lib/Kafka/Exception/Socket/Connection.php =================================================================== --- clients/php/src/lib/Kafka/Exception/Socket/Connection.php (revision 0) +++ clients/php/src/lib/Kafka/Exception/Socket/Connection.php (revision 0) @@ -0,0 +1,25 @@ + + * @copyright 2012 Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ + +/** + * Socket EndOfFile exception class + * + * @category Libraries + * @package Kafka + * @author Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class Kafka_Exception_Socket_Connection extends Kafka_Exception_Socket +{ + +} Index: clients/php/src/lib/Kafka/Exception/Socket/Timeout.php =================================================================== --- clients/php/src/lib/Kafka/Exception/Socket/Timeout.php (revision 0) +++ clients/php/src/lib/Kafka/Exception/Socket/Timeout.php (revision 0) @@ -0,0 +1,25 @@ + + * @copyright 2012 Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ + +/** + * Socket Timeout exception class + * + * @category Libraries + * @package Kafka + * @author Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class Kafka_Exception_Socket_Timeout extends 
Kafka_Exception_Socket +{ + +} Index: clients/php/src/lib/Kafka/Exception/EmptyQueue.php =================================================================== --- clients/php/src/lib/Kafka/Exception/EmptyQueue.php (revision 0) +++ clients/php/src/lib/Kafka/Exception/EmptyQueue.php (revision 0) @@ -0,0 +1,25 @@ + + * @copyright 2012 Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ + +/** + * Exception class. Thrown when there's no new data to read from the queue + * + * @category Libraries + * @package Kafka + * @author Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class Kafka_Exception_EmptyQueue extends Kafka_Exception +{ + +} Index: clients/php/src/lib/Kafka/Exception/InvalidTopic.php =================================================================== --- clients/php/src/lib/Kafka/Exception/InvalidTopic.php (revision 0) +++ clients/php/src/lib/Kafka/Exception/InvalidTopic.php (revision 0) @@ -0,0 +1,25 @@ + + * @copyright 2012 Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ + +/** + * Base exception class + * + * @category Libraries + * @package Kafka + * @author Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class Kafka_Exception_InvalidTopic extends Kafka_Exception +{ + +} Index: clients/php/src/lib/Kafka/Exception/NotSupported.php =================================================================== --- clients/php/src/lib/Kafka/Exception/NotSupported.php (revision 0) +++ clients/php/src/lib/Kafka/Exception/NotSupported.php (revision 0) @@ -0,0 +1,25 @@ + + * @copyright 2012 Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 
2.0 + * @link http://sna-projects.com/kafka/ + */ + +/** + * Base exception class + * + * @category Libraries + * @package Kafka + * @author Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class Kafka_Exception_NotSupported extends Kafka_Exception +{ + +} Index: clients/php/src/lib/Kafka/Exception/WrongPartition.php =================================================================== --- clients/php/src/lib/Kafka/Exception/WrongPartition.php (revision 0) +++ clients/php/src/lib/Kafka/Exception/WrongPartition.php (revision 0) @@ -0,0 +1,25 @@ + + * @copyright 2012 Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ + +/** + * Base exception class + * + * @category Libraries + * @package Kafka + * @author Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class Kafka_Exception_WrongPartition extends Kafka_Exception +{ + +} Index: clients/php/src/lib/Kafka/Exception/Socket.php =================================================================== --- clients/php/src/lib/Kafka/Exception/Socket.php (revision 0) +++ clients/php/src/lib/Kafka/Exception/Socket.php (revision 0) @@ -0,0 +1,25 @@ + + * @copyright 2012 Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ + +/** + * Base Socket exception class + * + * @category Libraries + * @package Kafka + * @author Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class Kafka_Exception_Socket extends Kafka_Exception +{ + +} Index: clients/php/src/lib/Kafka/ZookeeperConsumer.php =================================================================== --- 
clients/php/src/lib/Kafka/ZookeeperConsumer.php (revision 0) +++ clients/php/src/lib/Kafka/ZookeeperConsumer.php (revision 0) @@ -0,0 +1,305 @@ + + * @copyright 2012 Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ + +/** + * Zookeeper-based Kafka Consumer + * + * This is a sample implementation, there can be different strategies on how to consume + * data from different brokers/partitions. Here the strategy is to read up to MAX_BATCH_SIZE + * bytes from each partition before moving to the next. The order of brokers/partitions is + * randomised in each loop to consume data from all queues in a more-or-less fair way. + * An alternative strategy would be to round-robin the brokers/partitions, reading one message + * from each; this strategy would be fairer, but way less efficient. + * + * @category Libraries + * @package Kafka + * @author Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class Kafka_ZookeeperConsumer implements Iterator +{ + /** + * @var Kafka_Registry_Topic + */ + protected $topicRegistry; + + /** + * @var Kafka_Registry_Broker + */ + protected $brokerRegistry; + + /** + * @var Kafka_Registry_Offset + */ + protected $offsetRegistry; + + /** + * @var string + */ + protected $topic; + + /** + * @var integer + */ + protected $readBytes = 0; + + /** + * @var integer + */ + protected $socketTimeout = 0; + + /** + * @var integer + */ + protected $maxBatchSize = 20000000; + + /** + * @var array + */ + protected $iterators = array(); + + /** + * @var integer + */ + protected $idx = 0; + + /** + * @var integer + */ + protected $nIterators = 0; + + /** + * @var boolean + */ + protected $hasMore = false; + + /** + * Create a new BatchedConsumer for a topic using the given TopicReigstry and OffsetRegistry. 
+ * + * @param Kafka_Registry_Topic $topicRegistry a registry for the discovery of topic partitions + * @param Kafka_Registry_Broker $brokerRegistry a registry for the tracking of brokers + * @param Kafka_Registry_Offset $offsetRegistry a registry for the tracking of the consumer offsets + * @param string $topic the topic to consume from + * @param integer $maxBatchSize maximum batch size (in bytes) + */ + public function __construct( + Kafka_Registry_Topic $topicRegistry, + Kafka_Registry_Broker $brokerRegistry, + Kafka_Registry_Offset $offsetRegistry, + $topic, + $maxBatchSize = 20000000 + ) { + $this->topicRegistry = $topicRegistry; + $this->brokerRegistry = $brokerRegistry; + $this->offsetRegistry = $offsetRegistry; + $this->topic = $topic; + $this->maxBatchSize = $maxBatchSize; + } + + /** + * Shuffle the internal iterators for each broker/partition + * + * @return void + */ + public function shuffle() { + shuffle($this->iterators); + } + + /** + * Advance the iterator's pointer + * + * @return void + */ + public function next() { + return $this->iterators[$this->idx]->messages->next(); + } + + /** + * Get the key for this item + * + * @return integer + */ + public function key() { + return $this->iterators[$this->idx]->messages->key(); + } + + /** + * Get the current message + * + * @return mixed + */ + public function current() { + return $this->iterators[$this->idx]->messages->current()->payload(); + } + + /** + * Check whether we have a valid iterator + * + * @return boolean + */ + public function valid() { + while ($this->idx < $this->nIterators) { + $it = $this->iterators[$this->idx]; + try { + if (null === $it->messages) { + $it->consumer = new Kafka_SimpleConsumer($it->host, $it->port, $this->socketTimeout, $this->maxBatchSize); + $newOffset = $it->offset + $it->uncommittedOffset; + $request = new Kafka_FetchRequest($this->topic, $it->partition, $newOffset, $this->maxBatchSize); + $it->messages = $it->consumer->fetch($request); + 
$it->messages->rewind(); + } + if ($it->messages->valid()) { + $this->hasMore = true; + return true; + } + // we're done with the current broker/partition, count how much we've read so far and update the offsets + $this->readBytes += $it->messages->validBytes(); + $it->uncommittedOffset += $it->messages->validBytes(); + } catch (Kafka_Exception_EmptyQueue $e) { + // no new data from this broker/partition + } + // reset the MessageSet iterator and move to the next + $it->messages = null; + $it->consumer->close(); + ++$this->idx; + if ($this->idx === $this->nIterators) { + // if we looped through all brokers/partitions and we did get data + // from at least one of them, reset the iterator and do another loop + if ($this->hasMore) { + $this->hasMore = false; + $this->idx = 0; + } + } + } + return false; + } + + /** + * Return the number of bytes read so far + * + * @return integer + */ + public function getReadBytes() { + if (0 == $this->nIterators) { + return 0; + } + $it = $this->iterators[$this->idx]; + $readInCurrentPartition = isset($it->messages) ? 
$it->messages->validBytes() : 0; + return $this->readBytes + $readInCurrentPartition; + } + + /** + * Commit the kafka offsets for each broker/partition in ZooKeeper + * + * @return integer + */ + public function commitOffsets() { + foreach ($this->iterators as $it) { + $readBytes = $it->uncommittedOffset; + if (null !== $it->messages) { + $readBytes += $it->messages->validBytes(); + } + if ($readBytes > 0) { + $this->offsetRegistry->commit($this->topic, $it->broker, $it->partition, $it->offset + $readBytes); + $it->uncommittedOffset = 0; + $it->offset += $readBytes; + } + } + } + + /** + * Resync invalid offsets to the first valid position + * + * @return integer Number of partitions/broker resync'ed + */ + public function resyncOffsets() { + $nReset = 0; + foreach ($this->iterators as $it) { + $consumer = new Kafka_SimpleConsumer($it->host, $it->port, $this->socketTimeout, $this->maxBatchSize); + try { + $newOffset = $it->offset + $it->uncommittedOffset; + $request = new Kafka_FetchRequest($this->topic, $it->partition, $newOffset, $this->maxBatchSize); + $it->messages = $it->consumer->fetch($request); + } catch (Kafka_Exception_OffsetOutOfRange $e) { + $offsets = $consumer->getOffsetsBefore($this->topic, $it->partition, Kafka_SimpleConsumer::OFFSET_FIRST, 1); + if (count($offsets) > 0) { + $newOffset = $offsets[0]; + $this->offsetRegistry->commit($this->topic, $it->broker, $it->partition, $newOffset); + $it->uncommittedOffset = 0; + $it->offset = $newOffset; + ++$nReset; + } + } + } + return $nReset; + } + + /** + * Get an approximate measure of the amount of data still to be consumed + * + * @return integer + */ + public function getRemainingSize() { + if (0 == $this->nIterators) { + $this->rewind(); // initialise simple consumers + } + $totalSize = 0; + foreach ($this->iterators as $it) { + $readBytes = $it->uncommittedOffset; + if (null !== $it->messages) { + $readBytes += $it->messages->validBytes(); + } + $consumer = new Kafka_SimpleConsumer($it->host, 
$it->port, $this->socketTimeout, $this->maxBatchSize); + $offsets = $consumer->getOffsetsBefore($this->topic, $it->partition, Kafka_SimpleConsumer::OFFSET_LAST, 1); + if (count($offsets) > 0) { + $remaining = $offsets[0] - $readBytes; // remaining bytes for this broker/partition + if ($remaining > 0) { + $totalSize += $remaining; + } + } + $consumer->close(); + } + return $totalSize; + } + + /** + * Rewind the iterator + * + * @return void + */ + public function rewind() { + $this->iterators = array(); + foreach ($this->topicRegistry->partitions($this->topic) as $broker => $nPartitions) { + for ($partition = 0; $partition < $nPartitions; ++$partition) { + list($host, $port) = explode(':', $this->brokerRegistry->address($broker)); + $offset = $this->offsetRegistry->offset($this->topic, $broker, $partition); + $this->iterators[] = (object) array( + 'consumer' => null, + 'host' => $host, + 'port' => $port, + 'broker' => $broker, + 'partition' => $partition, + 'offset' => $offset, + 'uncommittedOffset' => 0, + 'messages' => null, + ); + ++$this->nIterators; + } + } + if (0 == count($this->iterators)) { + throw new Kafka_Exception_InvalidTopic('Cannot find topic ' . 
$this->topic); + } + // get a random broker/partition every time + $this->shuffle(); + } +} Index: clients/php/src/lib/Kafka/Encoder.php =================================================================== --- clients/php/src/lib/Kafka/Encoder.php (revision 1385060) +++ clients/php/src/lib/Kafka/Encoder.php (working copy) @@ -5,7 +5,7 @@ * @category Libraries * @package Kafka * @author Lorenzo Alberton - * @copyright 2011 Lorenzo Alberton + * @copyright 2012 Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @version $Revision: $ * @link http://sna-projects.com/kafka/ @@ -25,44 +25,111 @@ /** * 1 byte "magic" identifier to allow format changes * - * @var integer + * @const integer */ const CURRENT_MAGIC_VALUE = 1; + + const COMPRESSION_NONE = 0; + const COMPRESSION_GZIP = 1; + const COMPRESSION_SNAPPY = 2; /** * Encode a message. The format of an N byte message is the following: - * - 1 byte: "magic" identifier to allow format changes - * - 1 byte: "compression-attributes" for compression alogrithm - * - 4 bytes: CRC32 of the payload - * - (N - 5) bytes: payload + * - 1 byte: "magic" identifier to allow format changes + * - 1 byte: "compression-attributes" for compression alogrithm + * - 4 bytes: CRC32 of the payload + * - (N - 6) bytes: payload * * @param string $msg Message to encode * * @return string + * @throws Kafka_Exception */ - static public function encode_message($msg, $compression) { + static public function encode_message($msg, $compression = self::COMPRESSION_NONE) { + $compressed = self::compress($msg, $compression); // - return pack('CCN', self::CURRENT_MAGIC_VALUE, $compression, crc32($msg)) - . $msg; + return pack('CCN', self::CURRENT_MAGIC_VALUE, $compression, crc32($compressed)) + . 
$compressed; + } + + /** + * Compress a message + * + * @param string $msg Message + * @param integer $compression 0=none, 1=gzip, 2=snappy + * + * @return string + * @throws Kafka_Exception + */ + static public function compress($msg, $compression) { + switch ($compression) { + case self::COMPRESSION_NONE: + return $msg; + case self::COMPRESSION_GZIP: + return gzencode($msg); + case self::COMPRESSION_SNAPPY: + throw new Kafka_Exception_NotSupported('SNAPPY compression not yet implemented'); + default: + throw new Kafka_Exception_NotSupported('Unknown compression flag: ' . $compression); + } + } + + /** + * Decompress a message + * + * @param string $msg Message + * @param integer $compression 0=none, 1=gzip, 2=snappy + * + * @return string + * @throws Kafka_Exception + */ + static public function decompress($msg, $compression) { + switch ($compression) { + case self::COMPRESSION_NONE: + return $msg; + case self::COMPRESSION_GZIP: + // NB: this is really a MessageSet, not just a single message + // although I'm not sure this is the best way to handle the inner offsets, + // as the symmetry with the outer collection iteration is broken. + // @see https://issues.apache.org/jira/browse/KAFKA-406 + $stream = fopen('php://temp', 'w+b'); + fwrite($stream, gzinflate(substr($msg, 10))); + rewind($stream); + $socket = Kafka_Socket::createFromStream($stream); + return new Kafka_MessageSetInternalIterator($socket, 0, 0); + case self::COMPRESSION_SNAPPY: + throw new Kafka_Exception_NotSupported('SNAPPY decompression not yet implemented'); + default: + throw new Kafka_Exception_NotSupported('Unknown compression flag: ' . 
$compression); + } } /** * Encode a complete request * - * @param string $topic Topic - * @param integer $partition Partition number - * @param array $messages Array of messages to send - * @param compression $compression flag for type of compression + * @param string $topic Topic + * @param integer $partition Partition number + * @param array $messages Array of messages to send + * @param integer $compression flag for type of compression * * @return string + * @throws Kafka_Exception */ - static public function encode_produce_request($topic, $partition, array $messages, $compression) { - // encode messages as + static public function encode_produce_request($topic, $partition, array $messages, $compression = self::COMPRESSION_NONE) { + // not sure I agree this is the best design for compressed messages + // @see https://issues.apache.org/jira/browse/KAFKA-406 + $compress = ($compression !== self::COMPRESSION_NONE); $message_set = ''; foreach ($messages as $message) { - $encoded = self::encode_message($message, $compression); + $encoded = self::encode_message($message, self::COMPRESSION_NONE); + // encode messages as $message_set .= pack('N', strlen($encoded)) . $encoded; } + if ($compress) { + $encoded = self::encode_message($message_set, $compression); + $message_set = pack('N', strlen($encoded)) . $encoded; + } + // create the request as $data = pack('n', PRODUCE_REQUEST_ID) . pack('n', strlen($topic)) . $topic . Index: clients/php/src/lib/Kafka/Registry/Topic.php =================================================================== --- clients/php/src/lib/Kafka/Registry/Topic.php (revision 0) +++ clients/php/src/lib/Kafka/Registry/Topic.php (revision 0) @@ -0,0 +1,100 @@ + + * @copyright 2012 Nick Telford + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ + +/** + * A Registry for Kafka brokers and the partitions they manage. 
+ * + * The primary use of this is a facade API on top of ZooKeeper, providing a + * more friendly interface to some common operations. + * + * @category Libraries + * @package Kafka + * @author Nick Telford + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class Kafka_Registry_Topic +{ + const TOPIC_PATH = "/brokers/topics/%s"; + const BROKER_PATH = "/brokers/topics/%s/%d"; + + /** + * @var Zookeeper + */ + private $zookeeper; + + /** + * Create a new Topic Reigstry. + * + * @param Zookeeper $zookeeper the Zookeeper instance to back this TopicRegistry with. + */ + public function __construct(Zookeeper $zookeeper) + { + $this->zookeeper = $zookeeper; + } + + /** + * Get the partitions on a particular broker for a specific topic. + * + * @param string $topic the topic to get the partitions of + * @param int $broker the broker to get the partitions from + * + * @return int the number of the topics partitions on the broker + */ + public function partitionsForBroker($topic, $broker) + { + $data = sprintf(self::BROKER_PATH, $topic, (int) $broker); + $result = $this->zookeeper->get($data); + return empty($result) ? 0 : (int) $result; + } + + /** + * Get the partitions for a particular topic, grouped by broker. + * + * @param string $topic the topic to get the partitions of + * + * @return array the partitions as a map of broker to number of partitions (int -> int) + */ + public function partitions($topic) + { + $results = array(); + foreach ($this->brokers($topic) as $broker) { + $results[$broker] = $this->partitionsForBroker($topic, $broker); + } + return $results; + } + + /** + * Get the currently active brokers participating in a particular topic. 
+ * + * @param string $topic the topic to get the brokers for + * + * @return array an array of brokers (int) that are participating in the topic + */ + public function brokers($topic) + { + $topicPath = sprintf(self::TOPIC_PATH, $topic); + if (!$this->zookeeper->exists($topicPath)) { + return array(); + } + $children = $this->zookeeper->getChildren($topicPath); + if (empty($children)) { + return array(); + } + + $results = array(); + foreach ($children as $child) { + $results[] = intval(str_replace($topicPath, '', $child)); + } + return $results; + } +} Index: clients/php/src/lib/Kafka/Registry/Broker.php =================================================================== --- clients/php/src/lib/Kafka/Registry/Broker.php (revision 0) +++ clients/php/src/lib/Kafka/Registry/Broker.php (revision 0) @@ -0,0 +1,65 @@ + + * @copyright 2012 Nick Telford + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ + +/** + * A Registry for Kafka brokers and the partitions they manage. + * + * The primary use of this is a facade API on top of ZooKeeper, providing a + * more friendly interface to some common operations. + * + * @category Libraries + * @package Kafka + * @author Nick Telford + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class Kafka_Registry_Broker +{ + const BROKER_PATH = '/brokers/ids/%d'; + + /** + * Zookeeper client + * + * @var Zookeeper + */ + private $zookeeper; + + /** + * Create a Broker Registry instance backed by the given Zookeeper quorum. + * + * @param Zookeeper a client for contacting the backing Zookeeper quorum + */ + public function __construct(Zookeeper $zookeeper) { + $this->zookeeper = $zookeeper; + } + + /** + * Get the hostname and port of a broker. 
+ * + * @param int the id of the brother to get the address of + * + * @return string the hostname and port of the broker, separated by a colon: host:port + */ + public function address($broker) { + $data = sprintf(self::BROKER_PATH, (int) $broker); + $result = $this->zookeeper->get($data); + + if (empty($result)) { + $result = null; + } else { + $parts = explode(":", $result); + $result = $parts[1] . ':' . $parts[2]; + } + + return $result; + } +} Index: clients/php/src/lib/Kafka/Registry/Offset.php =================================================================== --- clients/php/src/lib/Kafka/Registry/Offset.php (revision 0) +++ clients/php/src/lib/Kafka/Registry/Offset.php (revision 0) @@ -0,0 +1,167 @@ + + * @copyright 2012 Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ + +/** + * A Registry for Kafka Consumer offsets. + * + * The primary use of this is a facade API on top of ZooKeeper, providing a + * more friendly interface to some common operations. + * + * @category Libraries + * @package Kafka + * @author Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class Kafka_Registry_Offset +{ + const OFFSETS_PATH = '/consumers/%s/offsets/%s'; + const OFFSET_PATH = '/consumers/%s/offsets/%s/%d-%d'; + + /** + * @var Zookeeper + */ + private $zookeeper; + + /** + * @var string + */ + private $group; + + /** + * Create a new Offset Registry for the given group. 
+ * + * @param Zookeeper $zookeeper a Zookeeper instance to back this OffsetRegistry + * @param string $group the consumer group to create this OffsetRegistry for + */ + public function __construct(Zookeeper $zookeeper, $group) { + $this->zookeeper = $zookeeper; + $this->group = (string) $group; + } + + /** + * Commits the given offset for the given partition + * + * @param string $topic the topic the partition belongs to + * @param int $broker the broker holding the partition + * @param int $partition the partition on the broker + * @param int $offset the offset to commit + */ + public function commit($topic, $broker, $partition, $offset) { + $path = sprintf(self::OFFSET_PATH, $this->group, $topic, (int) $broker, (int) $partition); + if (!$this->zookeeper->exists($path)) { + $this->makeZkPath($path); + $this->makeZkNode($path, (int)$offset); + } else { + $this->zookeeper->set($path, (int) $offset); + } + } + + /** + * Equivalent of "mkdir -p" on ZooKeeper + * + * @param string $path The path to the node + * @param mixed $value The value to assign to each new node along the path + * + * @return bool + */ + protected function makeZkPath($path, $value = 0) { + $parts = explode('/', $path); + $parts = array_filter($parts); + $subpath = ''; + while (count($parts) > 1) { + $subpath .= '/' . array_shift($parts); + if (!$this->zookeeper->exists($subpath)) { + $this->makeZkNode($subpath, $value); + } + } + } + + /** + * Create a node on ZooKeeper at the given path + * + * @param string $path The path to the node + * @param mixed $value The value to assign to the new node + * + * @return bool + */ + protected function makeZkNode($path, $value) { + $params = array( + array( + 'perms' => Zookeeper::PERM_ALL, + 'scheme' => 'world', + 'id' => 'anyone', + ) + ); + return $this->zookeeper->create($path, $value, $params); + } + + /** + * Get the current offset for the specified partition of a topic on a broker. 
+ * + * @param string $topic the topic the partition belongs to + * @param int $broker the broker holding the partition + * @param int $partition the partition on the broker + * + * @return int the byte offset for the cursor in the partition + */ + public function offset($topic, $broker, $partition) { + $path = sprintf(self::OFFSET_PATH, $this->group, $topic, (int) $broker, (int) $partition); + if (!$this->zookeeper->exists($path)) { + return 0; + } + + $result = $this->zookeeper->get($path); + return empty($result) ? 0 : $result; + } + + /** + * Gets the current offsets for all partitions of a topic. + * + * @param string $topic the topic to get the offsets for + * + * @return array a map of partition (broker + partition ID) to the byte offset offset (int). + */ + public function offsets($topic) { //: Map[(Int, Int), Long] == Map[(Broker, Partition), Offset] + $offsets = array(); + foreach ($this->partitions($topic) as $broker => $partition) { + if (!isset($offsets[$broker])) { + $offsets[$broker] = array(); + } + + $offsets[$broker][$partition] = $this->offset($topic, $broker, $partition); + } + return $offsets; + } + + /** + * Gets all the partitions for a given topic. 
+ * + * @param string $topic the topic to get the partitions for + * + * @return array an associative array of the broker (int) to the number of partitions (int) + */ + public function partitions($topic) { + $offsetsPath = sprintf(self::OFFSETS_PATH, $this->group, $topic); + if (!$this->zookeeper->exists($offsetsPath)) { + return array(); + } + + $children = $this->zookeeper->getChildren($offsetsPath); + $partitions = array(); + foreach ($children as $child) { + list($broker, $partition) = explode('-', str_replace($offsetsPath, '', $child), 2); + $partitions[intval($broker)] = intval($partition); + } + return $partitions; + } +} Index: clients/php/src/lib/Kafka/Producer.php =================================================================== --- clients/php/src/lib/Kafka/Producer.php (revision 1385060) +++ clients/php/src/lib/Kafka/Producer.php (working copy) @@ -5,7 +5,7 @@ * @category Libraries * @package Kafka * @author Lorenzo Alberton - * @copyright 2011 Lorenzo Alberton + * @copyright 2012 Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @version $Revision: $ * @link http://sna-projects.com/kafka/ @@ -28,9 +28,9 @@ protected $request_key; /** - * @var resource + * @var Kafka_Socket */ - protected $conn; + protected $socket; /** * @var string @@ -43,7 +43,9 @@ protected $port; /** - * @var integer + * Compression: 0=none; 1=gzip; 2=snappy + * + * @var integer */ protected $compression; @@ -53,26 +55,24 @@ * @param integer $host Host * @param integer $port Port */ - public function __construct($host, $port) { - $this->request_key = 0; - $this->host = $host; - $this->port = $port; - $this->compression = 0; + public function __construct($host, $port, $compression = Kafka_Encoder::COMPRESSION_GZIP) { + $this->request_key = Kafka_RequestKeys::PRODUCE; + $this->host = $host; + $this->port = $port; + $this->compression = $compression; } /** * Connect to Kafka via a socket * * @return void - * @throws RuntimeException 
+ * @throws Kafka_Exception */ public function connect() { - if (!is_resource($this->conn)) { - $this->conn = stream_socket_client('tcp://' . $this->host . ':' . $this->port, $errno, $errstr); - } - if (!is_resource($this->conn)) { - throw new RuntimeException('Cannot connect to Kafka: ' . $errstr, $errno); + if (null === $this->socket) { + $this->socket = new Kafka_Socket($this->host, $this->port); } + $this->socket->connect(); } /** @@ -81,8 +81,8 @@ * @return void */ public function close() { - if (is_resource($this->conn)) { - fclose($this->conn); + if (null !== $this->socket) { + $this->socket->close(); } } @@ -97,7 +97,7 @@ */ public function send(array $messages, $topic, $partition = 0xFFFFFFFF) { $this->connect(); - return fwrite($this->conn, Kafka_Encoder::encode_produce_request($topic, $partition, $messages, $this->compression)); + return $this->socket->write(Kafka_Encoder::encode_produce_request($topic, $partition, $messages, $this->compression)); } /** @@ -108,7 +108,7 @@ */ public function __sleep() { $this->close(); - return array('request_key', 'host', 'port'); + return array('request_key', 'host', 'port', 'compression'); } /** Index: clients/php/src/lib/Kafka/RequestKeys.php =================================================================== --- clients/php/src/lib/Kafka/RequestKeys.php (revision 1385060) +++ clients/php/src/lib/Kafka/RequestKeys.php (working copy) @@ -5,7 +5,7 @@ * @category Libraries * @package Kafka * @author Lorenzo Alberton - * @copyright 2011 Lorenzo Alberton + * @copyright 2012 Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @version $Revision: $ * @link http://sna-projects.com/kafka/ Index: clients/php/src/lib/Kafka/Message.php =================================================================== --- clients/php/src/lib/Kafka/Message.php (revision 1385060) +++ clients/php/src/lib/Kafka/Message.php (working copy) @@ -5,7 +5,7 @@ * @category Libraries * @package Kafka * 
@author Lorenzo Alberton - * @copyright 2011 Lorenzo Alberton + * @copyright 2012 Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @version $Revision: $ * @link http://sna-projects.com/kafka/ @@ -14,9 +14,9 @@ /** * A message. The format of an N byte message is the following: * 1 byte "magic" identifier to allow format changes - * 1 byte compression-attribute + * 1 byte compression-attribute (missing if magic=0) * 4 byte CRC32 of the payload - * N - 5 byte payload + * N - 6 byte payload (N-5 if magic=0) * * @category Libraries * @package Kafka @@ -26,7 +26,12 @@ */ class Kafka_Message { - + /** + * Wire format (0=without compression attribute, 1=with) + * @var integer + */ + private $magic = Kafka_Encoder::CURRENT_MAGIC_VALUE; + /** * @var string */ @@ -35,17 +40,17 @@ /** * @var integer */ - private $size = 0; + private $size = 0; /** * @var integer */ - private $compression = 0; + private $compression = Kafka_Encoder::COMPRESSION_NONE; /** * @var string */ - private $crc = false; + private $crc = false; /** * Constructor @@ -53,10 +58,26 @@ * @param string $data Message payload */ public function __construct($data) { - $this->payload = substr($data, 6); - $this->compression = substr($data,1,1); - $this->crc = crc32($this->payload); - $this->size = strlen($this->payload); + $this->magic = array_shift(unpack('C', substr($data, 0, 1))); + if ($this->magic == 0) { + $this->crc = array_shift(unpack('N', substr($data, 1, 4))); + $this->payload = substr($data, 5); + } else { + $this->compression = array_shift(unpack('C', substr($data, 1, 1))); + $this->crc = array_shift(unpack('N', substr($data, 2, 4))); + $this->payload = substr($data, 6); + } + $this->size = strlen($this->payload); + } + + + /** + * Return the compression flag + * + * @return integer + */ + public function compression() { + return $this->compression; } @@ -84,7 +105,7 @@ * @return integer */ public function magic() { - return 
Kafka_Encoder::CURRENT_MAGIC_VALUE; + return $this->magic; } /** @@ -99,10 +120,10 @@ /** * Get the message payload * - * @return string + * @return string|Kafka_MessageSetInternalIterator */ public function payload() { - return $this->payload; + return Kafka_Encoder::decompress($this->payload, $this->compression); } /** @@ -120,7 +141,21 @@ * @return string */ public function __toString() { - return 'message(magic = ' . Kafka_Encoder::CURRENT_MAGIC_VALUE . ', compression = ' . $this->compression . - ', crc = ' . $this->crc . ', payload = ' . $this->payload . ')'; + try { + $payload = $this->payload(); + } catch (Exception $e) { + $payload = 'ERROR decoding payload: ' . $e->getMessage(); + } + if (!is_string($payload)) { + $payload = 'COMPRESSED-CONTENT'; + } + return 'message(' + . 'magic = ' . $this->magic + . ', compression = ' . $this->compression + . ', size = ' . $this->size() + . ', crc = ' . $this->crc + . ', valid = ' . ($this->isValid() ? 'true' : 'false') + . ', payload = ' . $payload + . ')'; } } Index: clients/php/src/lib/Kafka/Socket.php =================================================================== --- clients/php/src/lib/Kafka/Socket.php (revision 0) +++ clients/php/src/lib/Kafka/Socket.php (revision 0) @@ -0,0 +1,259 @@ + + * @copyright 2012 Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @version $Revision: $ + * @link http://sna-projects.com/kafka/ + */ + +/** + * Class to read/write to a socket + * + * @category Libraries + * @package Kafka + * @author Lorenzo Alberton + * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 + * @link http://sna-projects.com/kafka/ + */ +class Kafka_Socket +{ + /** + * Send timeout in seconds. + * + * Combined with sendTimeoutUsec this is used for send timeouts. + * + * @var int + */ + private $sendTimeoutSec = 0; + + /** + * Send timeout in microseconds. + * + * Combined with sendTimeoutSec this is used for send timeouts. 
+ * + * @var int + */ + private $sendTimeoutUsec = 100000; + + /** + * Recv timeout in seconds + * + * Combined with recvTimeoutUsec this is used for recv timeouts. + * + * @var int + */ + private $recvTimeoutSec = 0; + + /** + * Recv timeout in microseconds + * + * Combined with recvTimeoutSec this is used for recv timeouts. + * + * @var int + */ + private $recvTimeoutUsec = 750000; + + /** + * Stream resource + * + * @var resource + */ + private $stream = null; + + /** + * Socket host + * + * @var string + */ + private $host = null; + + /** + * Socket port + * + * @var integer + */ + private $port = -1; + + /** + * Constructor + * + * @param string $host Host + * @param integer $port Port + * @param integer $recvTimeoutSec Recv timeout in seconds + * @param integer $recvTimeoutUsec Recv timeout in microseconds + * @param integer $sendTimeoutSec Send timeout in seconds + * @param integer $sendTimeoutUsec Send timeout in microseconds + */ + public function __construct($host, $port, $recvTimeoutSec = 0, $recvTimeoutUsec = 750000, $sendTimeoutSec = 0, $sendTimeoutUsec = 100000) { + $this->host = $host; + $this->port = (int)$port; + $this->recvTimeoutSec = $recvTimeoutSec; + $this->recvTimeoutUsec = $recvTimeoutUsec; + $this->sendTimeoutSec = $sendTimeoutSec; + $this->sendTimeoutUsec = $sendTimeoutUsec; + } + + /** + * Optional method to set the internal stream handle + * + * @param resource $stream File handle + * + * @return void + */ + static public function createFromStream($stream) { + $socket = new self('localhost', 0); + $socket->setStream($stream); + return $socket; + } + + /** + * Optional method to set the internal stream handle + * + * @param resource $stream File handle + * + * @return void + */ + public function setStream($stream) { + $this->stream = $stream; + } + + /** + * Connects the socket + * + * @return void + * @throws Kafka_Exception_Socket_Connection + */ + public function connect() { + if (is_resource($this->stream)) { + return true; + } + + if 
(empty($this->host)) { + throw new Kafka_Exception_Socket_Connection('Cannot open null host'); + } + if ($this->port <= 0) { + throw new Kafka_Exception_Socket_Connection('Cannot open without port'); + } + + $this->stream = @fsockopen( + $this->host, + $this->port, + $errno, + $errstr, + $this->sendTimeoutSec + ($this->sendTimeoutUsec / 1000000) + ); + @stream_set_blocking($this->stream, 0); + //socket_set_option($this->stream, SOL_TCP, TCP_NODELAY, 1); + //socket_set_option($this->stream, SOL_TCP, SO_KEEPALIVE, 1); + + // Connect failed? + if ($this->stream === FALSE) { + $error = 'Could not connect to '.$this->host.':'.$this->port.' ('.$errstr.' ['.$errno.'])'; + throw new Kafka_Exception_Socket_Connection($error); + } + } + + /** + * Closes the socket + * + * @return void + */ + public function close() { + if (is_resource($this->stream)) { + fclose($this->stream); + } + } + + /** + * Read from the socket at most $len bytes. + * + * This method will not wait for all the requested data, it will return as + * soon as any data is received. + * + * @param integer $len Maximum number of bytes to read. + * @param boolean $verifyExactLength Throw an exception if the number of read bytes is less than $len + * + * @return string Binary data + * @throws Kafka_Exception_Socket + */ + public function read($len, $verifyExactLength = false) { + $null = null; + $read = array($this->stream); + $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec); + if ($readable > 0) { + $remainingBytes = $len; + $data = $chunk = ''; + while ($remainingBytes > 0) { + $chunk = fread($this->stream, $remainingBytes); + if ($chunk === false) { + throw new Kafka_Exception_Socket_EOF('Could not read '.$len.' 
bytes from stream (no data)'); + } + if ($chunk == '' && feof($this->stream)) { + break; // unexpected EOF + } + $data .= $chunk; + $remainingBytes -= strlen($chunk); + } + if ($len === $remainingBytes || ($verifyExactLength && $len !== strlen($data))) { + // couldn't read anything at all OR reached EOF sooner than expected + throw new Kafka_Exception_Socket_EOF('Read ' . strlen($data) . ' bytes instead of the requested ' . $len . ' bytes'); + } + + return $data; + } + if (false !== $readable) { + $res = stream_get_meta_data($this->stream); + if (!empty($res['timed_out'])) { + throw new Kafka_Exception_Socket_Timeout('Timed out reading '.$len.' bytes from stream'); + } + } + throw new Kafka_Exception_Socket_EOF('Could not read '.$len.' bytes from stream (not readable)'); + } + + /** + * Write to the socket. + * + * @param string $buf The data to write + * + * @return integer + * @throws Kafka_Exception_Socket + */ + public function write($buf) { + $null = null; + $write = array($this->stream); + + // wait for stream to become available for writing + $writable = @stream_select($null, $write, $null, $this->sendTimeoutSec, $this->sendTimeoutUsec); + if ($writable > 0) { + // write buffer to stream + $written = fwrite($this->stream, $buf); + if ($written === -1 || $written === false) { + throw new Kafka_Exception_Socket('Could not write ' . strlen($buf) . ' bytes to stream'); + } + return $written; + } + if (false !== $writable) { + $res = stream_get_meta_data($this->stream); + if (!empty($res['timed_out'])) { + throw new Kafka_Exception_Socket_Timeout('Timed out writing ' . strlen($buf) . ' bytes to stream'); + } + } + throw new Kafka_Exception_Socket('Could not write ' . strlen($buf) . 
' bytes to stream'); + } + + /** + * Rewind the stream + * + * @return void + */ + public function rewind() { + if (is_resource($this->stream)) { + rewind($this->stream); + } + } +} Index: clients/php/src/lib/Kafka/Request.php =================================================================== --- clients/php/src/lib/Kafka/Request.php (revision 1385060) +++ clients/php/src/lib/Kafka/Request.php (working copy) @@ -5,7 +5,7 @@ * @category Libraries * @package Kafka * @author Lorenzo Alberton - * @copyright 2011 Lorenzo Alberton + * @copyright 2012 Lorenzo Alberton * @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0 * @version $Revision: $ * @link http://sna-projects.com/kafka/ @@ -26,5 +26,78 @@ * @var integer */ public $id; -} + /** + * @var string + */ + protected $topic; + + /** + * @var integer + */ + protected $partition; + + /** + * Write the request to the output stream + * + * @param Kafka_Socket $socket Output stream + * + * @return void + */ + abstract public function writeTo(Kafka_Socket $socket); + + /** + * Get request size in bytes + * + * @return integer + */ + abstract public function sizeInBytes(); + + /** + * Write the Request Header + * + + + + + * + * @param Kafka_Socket $socket Socket + * + * @return void + */ + protected function writeRequestHeader(Kafka_Socket $socket) { + // REQUEST_LENGTH (int) + REQUEST_TYPE (short) + $socket->write(pack('N', $this->sizeInBytes() + 2)); + $socket->write(pack('n', $this->id)); + + // TOPIC_SIZE (short) + TOPIC (bytes) + $socket->write(pack('n', strlen($this->topic)) . 
$this->topic); + // PARTITION (int) + $socket->write(pack('N', $this->partition)); + } + + /** + * Pack a 64bit integer as big endian long + * + * @param integer $big Big int + * + * @return bytes + */ + static public function packLong64bigendian($big) { + $left = 0xffffffff00000000; + $right = 0x00000000ffffffff; + + $l = ($big & $left) >> 32; + $r = $big & $right; + + return pack('NN', $l, $r); + } + + /** + * Pack a 64bit integer as big endian long + * + * @param integer $big Big int + * + * @return integer + */ + static public function unpackLong64bigendian($bytes) { + $set = unpack('N2', $bytes); + return $original = $set[1] << 32 | $set[2]; + } +} Index: clients/php/src/examples/consume.php =================================================================== --- clients/php/src/examples/consume.php (revision 1385060) +++ clients/php/src/examples/consume.php (working copy) @@ -1,4 +1,5 @@ #!/usr/bin/php +fetch($fetchRequest); foreach ($messages as $msg) { - echo "\nconsumed[$offset]: " . $msg->payload(); + echo "\nconsumed[$offset][$partialOffset]: " . $msg->payload(); + $partialOffset = $messages->validBytes(); } //advance the offset after consuming each message $offset += $messages->validBytes(); Index: clients/php/src/examples/autoloader.php =================================================================== --- clients/php/src/examples/autoloader.php (revision 1385060) +++ clients/php/src/examples/autoloader.php (working copy) @@ -1,3 +1,4 @@ +getReadBytes() >= $maxBatchSize) { + break; + } + } +} catch (Kafka_Exception_OffsetOutOfRange $exception) { + // if we haven't received any messages, resync the offsets for the next time, then bomb out + if ($zkconsumer->getReadBytes() == 0) { + $zkconsumer->resyncOffsets(); + die($exception->getMessage()); + } + // if we did receive some messages before the exception, carry on. 
+} catch (Kafka_Exception_Socket_Connection $exception) { + // deal with it below +} catch (Kafka_Exception $exception) { + // deal with it below +} + +if (null !== $exception) { + // if we haven't received any messages, bomb out + if ($zkconsumer->getReadBytes() == 0) { + die($exception->getMessage()); + } + // otherwise log the error, commit the offsets for the messages read so far and return the data +} + +// process the data in batches, wait for ACK + +$success = doSomethingWithTheMessages($messages); + +// Once the data is processed successfully, commit the byte offsets. +if ($success) { + $zkconsumer->commitOffsets(); +} + +// get an approximate figure on the size of the queue +try { + echo "\nRemaining bytes in queue: " . $consumer->getRemainingSize(); +} catch (Kafka_Exception_Socket_Connection $exception) { + die($exception->getMessage()); +} catch (Kafka_Exception $exception) { + die($exception->getMessage()); +} + Index: clients/php/src/examples/produce.php =================================================================== --- clients/php/src/examples/produce.php (revision 1385060) +++ clients/php/src/examples/produce.php (working copy) @@ -1,4 +1,5 @@ #!/usr/bin/php +