diff --git a/README.md b/README.md index ec80083..e3fea22 100644 --- a/README.md +++ b/README.md @@ -1,78 +1,80 @@ -Apache Kafka -================= +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Apache Kafka # See our [web site](http://kafka.apache.org) for details on the project. -### Building a jar and running it ### - ./gradlew jar +## Building a jar and running it ## +1. ./gradlew jar +2. 
Follow instructions in http://kafka.apache.org/documentation.html#quickstart -Follow instuctions in http://kafka.apache.org/documentation.html#quickstart +## Running unit tests ## +./gradlew test -### Running unit tests ### - ./gradlew test +## Forcing re-running unit tests w/o code change ## +./gradlew cleanTest test -### Forcing re-running unit tests w/o code change ### - ./gradlew cleanTest test - -### Running a particular unit test ### - ./gradlew -Dtest.single=RequestResponseSerializationTest core:test - -### Building a binary release gzipped tar ball ### - ./gradlew clean - ./gradlew releaseTarGz +## Running a particular unit test ## +./gradlew -Dtest.single=RequestResponseSerializationTest core:test +## Building a binary release gzipped tar ball ## +./gradlew clean +./gradlew releaseTarGz The release file can be found inside ./core/build/distributions/. -### Cleaning the build ### - ./gradlew clean - -### Running a task on a particular version of Scala #### -either 2.8.0, 2.8.2, 2.9.1, 2.9.2 or 2.10.1) (If building a jar with a version other than 2.8.0, the scala version variable in bin/kafka-run-class.sh needs to be changed to run quick start.) - ./gradlew -PscalaVersion=2.9.1 jar - ./gradlew -PscalaVersion=2.9.1 test - ./gradlew -PscalaVersion=2.9.1 releaseTarGz - -### Running a task for a specific project ### -This is for 'core', 'perf', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples' and 'clients' - ./gradlew core:jar - ./gradlew core:test +## Cleaning the build ## +./gradlew clean -### Listing all gradle tasks ### - ./gradlew tasks +## Running a task on a particular version of Scala (either 2.8.0, 2.8.2, 2.9.1, 2.9.2 or 2.10.1) ## +## (If building a jar with a version other than 2.8.0, the scala version variable in bin/kafka-run-class.sh needs to be changed to run quick start.) 
## +./gradlew -PscalaVersion=2.9.1 jar +./gradlew -PscalaVersion=2.9.1 test +./gradlew -PscalaVersion=2.9.1 releaseTarGz -### Building IDE project #### - ./gradlew eclipse - ./gradlew idea +## Running a task for a specific project in 'core', 'perf', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients' ## +./gradlew core:jar +./gradlew core:test -### Building the jar for all scala versions and for all projects ### - ./gradlew jarAll +## Listing all gradle tasks ## +./gradlew tasks -### Running unit tests for all scala versions and for all projects ### - ./gradlew testAll +## Building IDE project ## +./gradlew eclipse +./gradlew idea -### Building a binary release gzipped tar ball for all scala versions ### - ./gradlew releaseTarGzAll +## Building the jar for all scala versions and for all projects ## +./gradlew jarAll -### Publishing the jar for all version of Scala and for all projects to maven ### - ./gradlew uploadArchivesAll +## Running unit tests for all scala versions and for all projects ## +./gradlew testAll -Please note for this to work you need to create/update `~/.gradle/gradle.properties` and assign the following variables +## Building a binary release gzipped tar ball for all scala versions ## +./gradlew releaseTarGzAll - mavenUrl= - mavenUsername= - mavenPassword= - signing.keyId= - signing.password= - signing.secretKeyRingFile= +## Publishing the jar for all versions of Scala and for all projects to maven (To test locally, change mavenUrl in gradle.properties to a local dir.) 
## +./gradlew uploadArchivesAll -### Building the test jar ### - ./gradlew testJar +## Building the test jar ## +./gradlew testJar -### Determining how transitive dependencies are added ### - ./gradlew core:dependencies --configuration runtime +## Determining how transitive dependencies are added ## +./gradlew core:dependencies --configuration runtime -### Contribution ### +## Contribution ## -Apache Kafka interested in building the community; we would welcome any thoughts or [patches](https://issues.apache.org/jira/browse/KAFKA). You can reach us [on the Apache mailing lists](http://kafka.apache.org/contact.html). +Kafka is a new project, and we are interested in building the community; we would welcome any thoughts or [patches](https://issues.apache.org/jira/browse/KAFKA). You can reach us [on the Apache mailing lists](http://kafka.apache.org/contact.html). To contribute follow the instructions here: * http://kafka.apache.org/contributing.html diff --git a/build.gradle b/build.gradle index 58a6396..9489914 100644 --- a/build.gradle +++ b/build.gradle @@ -34,31 +34,25 @@ subprojects { apply plugin: 'java' apply plugin: 'eclipse' apply plugin: 'maven' - apply plugin: 'signing' - + uploadArchives { repositories { - signing { - sign configurations.archives - - // To test locally, replace mavenUrl in ~/.gradle/gradle.properties to file://localhost/tmp/myRepo/ - mavenDeployer { - beforeDeployment { MavenDeployment deployment -> signing.signPom(deployment) } - repository(url: "${mavenUrl}") { - authentication(userName: "${mavenUsername}", password: "${mavenPassword}") - } - afterEvaluate { - pom.artifactId = "${archivesBaseName}" - pom.project { - name 'Apache Kafka' - packaging 'jar' - url 'http://kafka.apache.org' - licenses { - license { - name 'The Apache Software License, Version 2.0' - url 'http://www.apache.org/licenses/LICENSE-2.0.txt' - distribution 'repo' - } + // To test locally, replace mavenUrl in gradle.properties to file://localhost/tmp/myRepo/ + 
mavenDeployer { + repository(url: "${mavenUrl}") { + authentication(userName: "${mavenUsername}", password: "${mavenPassword}") + } + afterEvaluate { + pom.artifactId = "${archivesBaseName}" + pom.project { + name 'Apache Kafka' + packaging 'jar' + url 'http://kafka.apache.org' + licenses { + license { + name 'The Apache Software License, Version 2.0' + url 'http://www.apache.org/licenses/LICENSE-2.0.txt' + distribution 'repo' } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 3d180e8..20f9549 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; @@ -215,9 +216,20 @@ public class KafkaProducer implements Producer { this.sender.wakeup(); return future; } catch (Exception e) { - if (callback != null) - callback.onCompletion(null, e); - return new FutureFailure(e); + // For API exceptions return them in the future; + // for other exceptions throw directly + if (e instanceof ApiException) { + System.out.println("Caught Exception-1 !!!!!!!"); + if (callback != null) + callback.onCompletion(null, e); + return new FutureFailure(e); + } else if (e instanceof KafkaException) { + System.out.println("Caught Exception-2 !!!!!!!"); + throw (KafkaException) e; + } else { + System.out.println("Caught Exception-3 !!!!!!!"); + throw new KafkaException(e); + } } } @@ -252,7 +264,6 @@ public class KafkaProducer implements 
Producer { */ @Override public void close() { - this.accumulator.close(); this.sender.initiateClose(); try { this.ioThread.join(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java index 8c77698..e752997 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java @@ -32,6 +32,12 @@ public final class RecordMetadata { this.topicPartition = topicPartition; } + public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset) { + // ignore the relativeOffset if the base offset is -1, + // since this indicates the offset is unknown + this(topicPartition, baseOffset == -1 ? baseOffset : baseOffset + relativeOffset); + } + /** * The offset of the record in the topic/partition. */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java index 22d4c79..47b6673 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java @@ -20,6 +20,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.producer.RecordMetadata; @@ -31,10 +32,12 @@ public final class FutureRecordMetadata implements Future { private final ProduceRequestResult result; private final long relativeOffset; + private AtomicReference value; public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset) { this.result = result; this.relativeOffset = relativeOffset; + 
this.value = new AtomicReference(null); } @Override @@ -57,10 +60,12 @@ public final class FutureRecordMetadata implements Future { } private RecordMetadata valueOrError() throws ExecutionException { - if (this.result.error() != null) + if (this.result.error() != null) { throw new ExecutionException(this.result.error()); - else - return new RecordMetadata(result.topicPartition(), this.result.baseOffset() + this.relativeOffset); + } else { + value.compareAndSet(null, new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset)); + return value.get(); + } } public long relativeOffset() { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index 52d30a8..de484ae 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -79,10 +79,10 @@ public final class Metadata { */ public synchronized Cluster fetch(String topic, long maxWaitMs) { List partitions = null; + long begin = System.currentTimeMillis(); do { partitions = cluster.partitionsFor(topic); if (partitions == null) { - long begin = System.currentTimeMillis(); topics.add(topic); forceUpdate = true; try { @@ -90,7 +90,8 @@ public final class Metadata { } catch (InterruptedException e) { /* this is fine, just try again */ } long ellapsed = System.currentTimeMillis() - begin; - if (ellapsed > maxWaitMs) + System.out.println("Loop " + ellapsed + " - " + maxWaitMs); + if (ellapsed >= maxWaitMs) throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); } else { return cluster; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index 7a440a3..839e3bc 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -59,7 +59,7 @@ public final class RecordBatch { this.records.append(0L, key, value, compression); FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount); if (callback != null) - thunks.add(new Thunk(callback, this.recordCount)); + thunks.add(new Thunk(callback, future)); this.recordCount++; return future; } @@ -78,7 +78,8 @@ public final class RecordBatch { try { Thunk thunk = this.thunks.get(i); if (exception == null) - thunk.callback.onCompletion(new RecordMetadata(topicPartition, this.produceFuture.baseOffset() + thunk.relativeOffset), + // get() should not throw any exception here + thunk.callback.onCompletion(thunk.future.get(), null); else thunk.callback.onCompletion(null, exception); @@ -89,15 +90,15 @@ public final class RecordBatch { } /** - * A callback and the associated RecordSend argument to pass to it. + * A callback and the associated FutureRecordMetadata argument to pass to it. */ final private static class Thunk { final Callback callback; - final long relativeOffset; + final FutureRecordMetadata future; - public Thunk(Callback callback, long relativeOffset) { + public Thunk(Callback callback, FutureRecordMetadata future) { this.callback = callback; - this.relativeOffset = relativeOffset; + this.future = future; } } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index d93a455..dc6280d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -106,6 +106,8 @@ public class Sender implements Runnable { // wait until these are completed. 
int unsent = 0; do { + System.out.println("unsent is " + unsent + ", inFlightRequest is " + this.inFlightRequests.totalInFlightRequests()); + try { unsent = run(time.milliseconds()); } catch (Exception e) { @@ -133,6 +135,7 @@ public class Sender implements Runnable { InFlightRequest metadataReq = maybeMetadataRequest(cluster, now); if (metadataReq != null) { sends.add(metadataReq.request); + System.out.println("Send metadata request !!!!!!!"); this.inFlightRequests.add(metadataReq); } @@ -315,6 +318,7 @@ public class Sender implements Runnable { } private void handleMetadataResponse(Struct body, long now) { + System.out.println("Receive metadata response !!!!!!!"); this.metadataFetchInProgress = false; MetadataResponse response = new MetadataResponse(body); Cluster cluster = response.cluster(); diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java b/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java index ce95ca0..94099f9 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.errors; -public class RecordTooLargeException extends ApiException { +public class RecordTooLargeException extends FatalException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index f88992a..3374bd9 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -41,17 +41,15 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; public enum Errors { UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")), NONE(0, null), - 
OFFSET_OUT_OF_RANGE(1, - new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")), - CORRUPT_MESSAGE(2, - new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")), + OFFSET_OUT_OF_RANGE(1, new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")), + CORRUPT_MESSAGE(2, new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")), UNKNOWN_TOPIC_OR_PARTITION(3, new UnknownTopicOrPartitionException("This server does not host this topic-partition.")), - LEADER_NOT_AVAILABLE(5, - new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")), + // TODO: errorCode 4 for InvalidFetchSize + LEADER_NOT_AVAILABLE(5, new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")), NOT_LEADER_FOR_PARTITION(6, new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")), REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")), - MESSAGE_TOO_LARGE(10, - new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")), + // TODO: errorCode 8, 9, 11 + MESSAGE_TOO_LARGE(10, new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")), OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")), NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")); diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 36cfc0f..d8edd01 100644 --- 
a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -109,4 +109,7 @@ public class TestUtils { return b.toString(); } + /** + * Create a producer with the given specs + */ } diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala new file mode 100644 index 0000000..6155945 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -0,0 +1,345 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.test + +import org.scalatest.junit.JUnit3Suite +import org.junit.Test +import org.junit.Assert._ + +import java.util.Properties +import java.lang.Integer +import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException} + +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.utils.{Utils, TestUtils} +import kafka.zk.ZooKeeperTestHarness +import kafka.consumer.SimpleConsumer + +import org.apache.kafka.common.KafkaException +import org.apache.kafka.clients.producer._ + +class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness { + private val brokerId1 = 0 + private val brokerId2 = 1 + private val ports = TestUtils.choosePorts(2) + private val (port1, port2) = (ports(0), ports(1)) + private var server1: KafkaServer = null + private var server2: KafkaServer = null + private var servers = List.empty[KafkaServer] + + private var consumer1: SimpleConsumer = null + private var consumer2: SimpleConsumer = null + + private val props1 = TestUtils.createBrokerConfig(brokerId1, port1) + private val props2 = TestUtils.createBrokerConfig(brokerId2, port2) + props1.put("auto.create.topics.enable", "false") + props2.put("auto.create.topics.enable", "false") + private val config1 = new KafkaConfig(props1) + private val config2 = new KafkaConfig(props2) + private val brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)) + + private val bufferSize = 2 * config1.messageMaxBytes + + private val topic1 = "topic-1" + private val topic2 = "topic-2" + private val numRecords = 100 + + override def setUp() { + super.setUp() + server1 = TestUtils.createServer(config1) + server2 = TestUtils.createServer(config2) + servers = List(server1,server2) + + // TODO: we need to migrate to new consumers when 0.9 is final + consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "") + consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "") + } + + override def tearDown() { + server1.shutdown + 
server2.shutdown + Utils.rm(server1.config.logDirs) + Utils.rm(server2.config.logDirs) + consumer1.close + consumer2.close + super.tearDown() + } + + /** + * testErrorInResponse + * + * 1. With ack == 0 the future metadata will have no exceptions with offset -1 + * 2. With ack != 0 the future metadata will throw ExecutionException caused by RecordTooLargeException + * 3. With not-exist-topic the future metadata should return ExecutionException cased by TimeoutException + * 4. With incorrect broker-list the future metadata should return ExecutionException cased by TimeoutException + * + * TODO: exceptions that can throw in ExecutionException: + * UnknownTopicOrPartitionException + * NotLeaderForPartitionException + * LeaderNotAvailableException + * CorruptRecordException + * TimeoutException + */ + @Test + def testErrorInResponse() { + // create topic + TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + + // TODO: create a CreateProducer function in TestUtil after it has been migrated to clients + // TODO: expose producer configs after creating them + // produce with ack=0 + val producerProps1 = new Properties() + producerProps1.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList) + producerProps1.put(ProducerConfig.REQUIRED_ACKS_CONFIG, "0") + producerProps1.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, (3000).toString) + val producer1 = new KafkaProducer(producerProps1) + + // produce with ack=1 + val producerProps2 = new Properties() + producerProps2.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList) + producerProps2.put(ProducerConfig.REQUIRED_ACKS_CONFIG, "1") + producerProps2.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, (3000).toString) + val producer2 = new KafkaProducer(producerProps2) + + // produce with incorrect broker-list + val producerProps3 = new Properties() + producerProps3.put(ProducerConfig.BROKER_LIST_CONFIG, "localhost:8686,localhost:4242") + producerProps3.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, (3000).toString) + val 
producer3 = new KafkaProducer(producerProps3) + + try { + // send a too-large record + val record1 = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1)) + try { + assertEquals("Returned metadata should have offset -1", producer1.send(record1).get.offset, -1L) + } catch { + case e: Throwable => fail("Returned metadata should not contain errors", e) + } + + try { + producer2.send(record1).get + fail("This message response should throw an exception") + } catch { + case ee: ExecutionException => // this is OK + case e: Throwable => fail("Returned metadata only expect ExecutionException", e) + } + + // send a record with non-exist topic + val record2 = new ProducerRecord(topic2, null, "key".getBytes, "value".getBytes) + try { + producer1.send(record2).get + fail("This message response should throw an exception") + } catch { + case ke: ExecutionException => // this is OK + case e: Throwable => { + fail("Only expect ExecutionException", e) + } + } + + // send a record with incorrect broker-list + try { + producer3.send(record1).get + fail("This message response should throw an exception") + } catch { + case ke: ExecutionException => // this is OK + case e: Throwable => fail("Only expect ExecutionException", e) + } + } finally { + producer1.close + + System.out.println("1 Finish!!!!!!!!!!!!!!!!!!!!!1") + + producer2.close + + System.out.println("2 Finish!!!!!!!!!!!!!!!!!!!!!1") + + producer3.close + + System.out.println("3 Finish!!!!!!!!!!!!!!!!!!!!!1") + } + } + + /** + * testNoResponse + * + * 1. With ack=0, the future metadata should not be blocked. + * 2. 
With ack=1, the future metadata should block, + * and subsequent calls will eventually cause buffer full + */ + @Test + def testNoResponse() { + // create topic + TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + + val producerProps1 = new Properties() + producerProps1.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList) + producerProps1.put(ProducerConfig.REQUIRED_ACKS_CONFIG, "0") + val producer1 = new KafkaProducer(producerProps1) + + val producerProps2 = new Properties() + producerProps2.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList) + producerProps2.put(ProducerConfig.REQUIRED_ACKS_CONFIG, "1") + val producer2 = new KafkaProducer(producerProps1) + + try { + // first send a message to make sure the metadata is refreshed + val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes) + + try { + producer1.send(record).get + producer2.send(record).get + } catch { + case e: Throwable => fail("Should not throw any exceptions", e) + } + + // stop IO threads and request handling, but leave networking operational + // any requests should be accepted and queue up, but not handled + server1.requestHandlerPool.shutdown() + server2.requestHandlerPool.shutdown() + + try { + producer1.send(record).get(5000, TimeUnit.MILLISECONDS) + } catch { + case e: Throwable => fail("Should not throw any exceptions", e) + } + + try { + producer2.send(record).get(5000, TimeUnit.MILLISECONDS) + fail("TimeoutException should be thrown") + } catch { + case ee: TimeoutException => // this is OK + case e: Throwable => fail("Only expect TimeoutException", e) + } + + // send enough messages to get buffer full + val tooManyRecords = bufferSize / ("key".getBytes.length + "value".getBytes.length) + + try { + for (i <- 1 to tooManyRecords) + producer2.send(record) + fail("An exception should be thrown") + } catch { + case ke: KafkaException => assertTrue("Only expect BufferExhaustedException", ke.isInstanceOf[BufferExhaustedException]) + case e: Throwable => fail("Only 
expect BufferExhaustedException", e) + } + } finally { + producer1.close + // do not close produce2 since it will block + // TODO: can we do better? + } + } + + /** + * testSendException + * + * 1. The send call with invalid partition id should throw KafkaException cased by IllegalArgumentException + * 2. The send call with too large message will throw BufferExhaustedException + * 3. The send call after producer closed should throw KafkaException cased by IllegalStateException + */ + @Test + def testSendException() { + // create topic + TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + + val producerProps = new Properties() + producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, (3000).toString) + producerProps.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList) + producerProps.put(ProducerConfig.REQUIRED_ACKS_CONFIG, "0") + val producer = new KafkaProducer(producerProps) + + try { + // create a record with incorrect partition id, send should fail + val record1 = new ProducerRecord(topic1, new Integer(1), "key".getBytes, "value".getBytes) + + try { + producer.send(record1) + fail("This message response should throw an exception") + } catch { + case ke: KafkaException => // this is OK + case e: Throwable => fail("Only expect KafkaException", e) + } + + System.out.println("Two Finish!!!!!!!!!!!!!!!!!!!!!1") + + // create a very large record which will exhaust the buffer, send should fail + val record2 = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](bufferSize)) + + try { + producer.send(record2) + fail("This message response should throw an exception") + } catch { + case ke: KafkaException => assertTrue("Only expect BufferExhaustedException", ke.isInstanceOf[BufferExhaustedException]) + case e: Throwable => fail("Only expect BufferExhaustedException", e) + } + + System.out.println("Three Finish!!!!!!!!!!!!!!!!!!!!!1") + + // close the producer + producer.close() + + System.out.println("CloseFive Finish!!!!!!!!!!!!!!!!!!!!!1") + + 
try { + producer.send(record1) + fail("Send after closed should throw an exception directly") + } catch { + case ke: KafkaException => { + val cause = ke.getCause + assertTrue("Only expecting IllegalStateException", cause != null) + assertTrue("Only expecting IllegalStateException", cause.isInstanceOf[IllegalStateException]) + } + case e: Throwable => fail("Only expecting KafkaException", e) + } + } + + + } +/* + /** + * testIncorrectBrokerList + * + * The send call should block + */ + // TODO: this test needs to be modified with retry + @Test + def testDeadBroker() { + // create topic + val leaders = TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + val leader = leaders.get(0) + assertTrue("Leader of partition 0 of the topic should exist", leader.isDefined) + + val record1 = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes) + val response1 = producer2.send(record1) + + // shutdown broker + val serverToShutdown = if(leader.get == server1.config.brokerId) server1 else server2 + serverToShutdown.shutdown() + serverToShutdown.awaitShutdown() + + assertEquals("yah", response1.get.offset, 0L) + + // send to another topic to enforce metadata refresh + TestUtils.createTopic(zkClient, topic2, 1, 2, servers) + val record2 = new ProducerRecord(topic2, null, "key".getBytes, "value".getBytes) + assertEquals("yah", producer2.send(record2).get.offset, 0L) + + // re-send to the first topic, this time it should succeed + assertEquals("yah", producer2.send(record1).get.offset, 0L) + } +*/ +} \ No newline at end of file diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 34baa8c..9a8815e 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -18,9 +18,8 @@ package kafka.test import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.{ZkUtils, Utils, 
TestUtils, Logging} +import kafka.utils.{Utils, TestUtils} import kafka.zk.ZooKeeperTestHarness -import kafka.admin.AdminUtils import kafka.consumer.SimpleConsumer import kafka.api.FetchRequestBuilder import kafka.message.Message @@ -33,7 +32,6 @@ import org.junit.Assert._ import java.util.Properties import java.lang.{Integer, IllegalArgumentException} -import org.apache.log4j.Logger class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -110,29 +108,25 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { // send a normal record val record0 = new ProducerRecord(topic, new Integer(0), "key".getBytes, "value".getBytes) - val response0 = producer.send(record0, callback) - assertEquals("Should have offset 0", 0L, response0.get.offset) + assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset) // send a record with null value should be ok val record1 = new ProducerRecord(topic, new Integer(0), "key".getBytes, null) - val response1 = producer.send(record1, callback) - assertEquals("Should have offset 1", 1L, response1.get.offset) + assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset) // send a record with null key should be ok val record2 = new ProducerRecord(topic, new Integer(0), null, "value".getBytes) - val response2 = producer.send(record2, callback) - assertEquals("Should have offset 2", 2L, response2.get.offset) + assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset) // send a record with null part id should be ok val record3 = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes) - val response3 = producer.send(record3, callback) - assertEquals("Should have offset 3", 3L, response3.get.offset) + assertEquals("Should have offset 3", 3L, producer.send(record3, callback).get.offset) // send a record with null topic should fail try { val record4 = new ProducerRecord(null, new Integer(0), "key".getBytes, "value".getBytes) - 
val response4 = producer.send(record4, callback) - response4.wait + producer.send(record4, callback) + fail("Should not allow sending a record without topic") } catch { case iae: IllegalArgumentException => // this is ok case e: Throwable => fail("Only expecting IllegalArgumentException", e) @@ -143,8 +137,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { producer.send(record0) // check that all messages have been acked via offset - val response5 = producer.send(record0, callback) - assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, response5.get.offset) + assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, producer.send(record0, callback).get.offset) } finally { if (producer != null) { @@ -158,6 +151,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { * testClose checks the closing behavior * * 1. After close() returns, all messages should be sent with correct returned offset metadata + * 2. After close() returns, send() should throw an exception immediately */ @Test def testClose() { @@ -195,7 +189,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { /** * testSendToPartition checks the partitioning behavior * - * 1. 
The specified partition-id should be respected + * The specified partition-id should be respected */ @Test def testSendToPartition() { @@ -217,7 +211,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { for (i <- 0 until numRecords) yield producer.send(new ProducerRecord(topic, partition, null, ("value" + i).getBytes)) val futures = responses.toList - futures.map(_.wait) + futures.map(_.get) for (future <- futures) assertTrue("Request should have completed", future.isDone) @@ -250,6 +244,11 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { } } + /** + * testAutoCreateTopic + * + * The topic should be created upon sending the first message + */ @Test def testAutoCreateTopic() { val props = new Properties() @@ -259,8 +258,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { try { // Send a message to auto-create the topic val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes) - val response = producer.send(record) - assertEquals("Should have offset 0", 0L, response.get.offset) + assertEquals("Should have offset 0", 0L, producer.send(record).get.offset) // double check that the topic is created with leader elected assertTrue("Topic should already be created with leader", TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 0).isDefined) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 1c7a450..772d214 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -23,24 +23,27 @@ import java.nio._ import java.nio.channels._ import java.util.Random import java.util.Properties -import junit.framework.AssertionFailedError -import junit.framework.Assert._ +import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.TimeUnit + +import collection.mutable.Map +import collection.mutable.ListBuffer + +import 
org.I0Itec.zkclient.ZkClient + import kafka.server._ import kafka.producer._ import kafka.message._ -import org.I0Itec.zkclient.ZkClient +import kafka.api._ import kafka.cluster.Broker -import collection.mutable.ListBuffer import kafka.consumer.ConsumerConfig -import java.util.concurrent.locks.ReentrantLock -import java.util.concurrent.TimeUnit -import kafka.api._ -import collection.mutable.Map import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder} import kafka.common.TopicAndPartition -import junit.framework.Assert import kafka.admin.AdminUtils +import kafka.producer.ProducerConfig +import junit.framework.AssertionFailedError +import junit.framework.Assert._ /** * Utility functions to help with testing @@ -526,7 +529,7 @@ object TestUtils extends Logging { } def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long) = { - Assert.assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition), + assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition), TestUtils.waitUntilTrue(() => servers.foldLeft(true)(_ && _.apis.metadataCache.keySet.contains(TopicAndPartition(topic, partition))), timeout)) } diff --git a/gradle.properties b/gradle.properties index 447ee44..ad7a2f0 100644 --- a/gradle.properties +++ b/gradle.properties @@ -18,3 +18,7 @@ version=0.8.1 scalaVersion=2.8.0 task=build +#mavenUrl=file://localhost/tmp/maven +mavenUrl=http://your.maven.repository +mavenUsername=your.username +mavenPassword=your.password