From d2380250903b5f927ff33fec4f65c403fad09dcb Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Tue, 6 Jan 2015 16:19:29 -0800 Subject: [PATCH 1/8] removed broker code for handling acks>1 and made NotEnoughReplicasAfterAppendException non-retriable --- .../kafka/common/errors/NotEnoughReplicasAfterAppendException.java | 2 +- core/src/main/scala/kafka/cluster/Partition.scala | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java index 75c80a9..23d53f9 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java @@ -21,7 +21,7 @@ package org.apache.kafka.common.errors; * This exception is raised when the low ISR size is discovered *after* the message * was already appended to the log. Producer retries will cause duplicates. */ -public class NotEnoughReplicasAfterAppendException extends RetriableException { +public class NotEnoughReplicasAfterAppendException extends ApiException { private static final long serialVersionUID = 1L; public NotEnoughReplicasAfterAppendException() { diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index b230e9a..5a669bf 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -317,8 +317,6 @@ class Partition(val topic: String, } else { (true, ErrorMapping.NotEnoughReplicasAfterAppendCode) } - } else if (requiredAcks > 0 && numAcks >= requiredAcks) { - (true, ErrorMapping.NoError) } else (false, ErrorMapping.NoError) case None => -- 1.9.3 (Apple Git-50) From f14b9a3695e894b56a91dadd748e43f7852f2d5b Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Wed, 14 Jan 2015 15:41:00 -0800 Subject: [PATCH 2/8] added early handling of invalid number of acks to handler and a test --- .../kafka/clients/producer/KafkaProducer.java | 22 ++--- .../kafka/clients/tools/ProducerPerformance.java | 7 +- .../java/org/apache/kafka/common/MetricName.java | 24 ++--- .../errors/InvalidRequiredAcksException.java | 25 +++++ .../org/apache/kafka/common/protocol/Errors.java | 3 +- core/src/main/scala/kafka/server/KafkaApis.scala | 99 ++++++++++++-------- .../api/RequestResponseSerializationTest.scala | 2 +- .../test/scala/unit/kafka/api/testKafkaApis.scala | 104 +++++++++++++++++++++ kafka-patch-review.py | 31 ++---- 9 files changed, 222 insertions(+), 95 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java create mode 100644 core/src/test/scala/unit/kafka/api/testKafkaApis.scala diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index fc71710..c79149a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -265,32 +265,32 @@ public class KafkaProducer implements Producer { *

      * If you want to simulate a simple blocking call you can do the following:
      *
-     * <pre>{@code
-     * producer.send(new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes())).get();
-     * }</pre>
+     * <pre>
+     *   producer.send(new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes())).get();
+     * </pre>
      * <p>
      * Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
      * will be invoked when the request is complete.
      *
-     * <pre>{@code
-     * ProducerRecord record = new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes());
+     * <pre>
+     *   ProducerRecord record = new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes());
      *   producer.send(myRecord,
-     *                new Callback() {
+     *                 new Callback() {
      *                     public void onCompletion(RecordMetadata metadata, Exception e) {
      *                         if(e != null)
      *                             e.printStackTrace();
      *                         System.out.println("The offset of the record we just sent is: " + metadata.offset());
      *                     }
-     *                });
-     * }</pre>
+     *                 });
+     * </pre>
      * <p>
      * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the
      * following example callback1 is guaranteed to execute before callback2:
      *
-     * <pre>{@code
-     * producer.send(new ProducerRecord(topic, partition, key1, value1), callback1);
+     * <pre>
+     * producer.send(new ProducerRecord(topic, partition, key, value), callback1);
      * producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
-     * }</pre>
+     * </pre>
      * <p>
* Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or * they will delay the sending of messages from other threads. If you want to execute blocking or computationally diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java index 689bae9..1b82800 100644 --- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java +++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java @@ -15,7 +15,10 @@ package org.apache.kafka.clients.tools; import java.util.Arrays; import java.util.Properties; -import org.apache.kafka.clients.producer.*; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; public class ProducerPerformance { @@ -43,8 +46,6 @@ public class ProducerPerformance { throw new IllegalArgumentException("Invalid property: " + args[i]); props.put(pieces[0], pieces[1]); } - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); KafkaProducer producer = new KafkaProducer(props); /* setup perf test */ diff --git a/clients/src/main/java/org/apache/kafka/common/MetricName.java b/clients/src/main/java/org/apache/kafka/common/MetricName.java index 7e977e9..4e810d5 100644 --- a/clients/src/main/java/org/apache/kafka/common/MetricName.java +++ b/clients/src/main/java/org/apache/kafka/common/MetricName.java @@ -19,40 +19,34 @@ import org.apache.kafka.common.utils.Utils; /** * The MetricName class encapsulates a metric's name, logical group and its related attributes - *

+ *
  * This class captures the following parameters
  *
  *  name The name of the metric
  *  group logical group name of the metrics to which this metric belongs.
  *  description A human-readable description to include in the metric. This is optional.
  *  tags additional key/value attributes of the metric. This is optional.
- *
+ *
  * group, tags parameters can be used to create unique metric names while reporting in JMX or any custom reporting.
- *
+ *
  * Ex: standard JMX MBean can be constructed like domainName:type=group,key1=val1,key2=val2
- *
+ *
  * Usage looks something like this:
- * <pre>{@code
+ * <pre>
  * // set up metrics:
  * Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
- * Sensor sensor = metrics.sensor("message-sizes");
- *
+ * Sensor sensor = metrics.sensor("message-sizes");
  * Map metricTags = new LinkedHashMap();
  * metricTags.put("client-id", "producer-1");
  * metricTags.put("topic", "topic");
- *
- * MetricName metricName = new MetricName("message-size-avg", "producer-metrics", "average message size", metricTags);
+ * MetricName metricName = new MetricName("message-size-avg", "producer-metrics", "average message size", metricTags);
  * sensor.add(metricName, new Avg());
- *
- * metricName = new MetricName("message-size-max", "producer-metrics", metricTags);
+ * metricName = new MetricName("message-size-max", "producer-metrics",metricTags);
  * sensor.add(metricName, new Max());
  *
- * metricName = new MetricName("message-size-min", "producer-metrics", "message minimum size", "client-id", "my-client", "topic", "my-topic");
- * sensor.add(metricName, new Min());
- *
  * // as messages are sent we record the sizes
  * sensor.record(messageSize);
- * }</pre>
+ * </pre>
*/ public final class MetricName { diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java new file mode 100644 index 0000000..2e0a3c6 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class InvalidRequiredAcksException extends ApiException { + private static final long serialVersionUID = 1L; + + public InvalidRequiredAcksException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 3316b6a..03937cb 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -46,7 +46,8 @@ public enum Errors { INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")), RECORD_LIST_TOO_LARGE(18, new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")), NOT_ENOUGH_REPLICAS(19, new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")), - NOT_ENOUGH_REPLICAS_AFTER_APPEND(20, new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")); + NOT_ENOUGH_REPLICAS_AFTER_APPEND(20, new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")), + INVALID_REQUIRED_ACKS(21, new InvalidRequiredAcksException("Produce request contained request.required.acks with invalid value")); private static Map, Errors> classToError = new HashMap, Errors>(); private static Map codeToError = new HashMap(); static { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c011a1b..18116eb 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -25,6 +25,8 @@ import kafka.admin.AdminUtils import kafka.network.RequestChannel.Response import kafka.controller.KafkaController import kafka.utils.{SystemTime, Logging} +import org.apache.kafka.common.errors.InvalidRequiredAcksException +import org.apache.kafka.common.protocol.Errors import scala.collection._ @@ -157,53 +159,68 @@ class KafkaApis(val requestChannel: RequestChannel, def handleProducerRequest(request: RequestChannel.Request) { val produceRequest = 
request.requestObj.asInstanceOf[ProducerRequest] - // the callback for sending the response - def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { - var errorInResponse = false - responseStatus.foreach { case (topicAndPartition, status) => - // we only print warnings for known errors here; if it is unknown, it will cause - // an error message in the replica manager - if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) { - debug("Produce request with correlation id %d from client %s on partition %s failed due to %s" - .format(produceRequest.correlationId, produceRequest.clientId, - topicAndPartition, ErrorMapping.exceptionNameFor(status.error))) - errorInResponse = true - } + if (produceRequest.requiredAcks > 1 || produceRequest.requiredAcks < -1) { + // If required.acks is outside accepted range, something is wrong with the client + // Just return an error and don't handle the request at all + + val responseStatus = produceRequest.data.map { + case(topicAndPartition, messageSet) => + (topicAndPartition -> + ProducerResponseStatus( Errors.forException(new InvalidRequiredAcksException("")).code(), + LogAppendInfo.UnknownLogAppendInfo.firstOffset)) } + val response = ProducerResponse(produceRequest.correlationId, responseStatus) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + } else { + + // the callback for sending the response + def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { + var errorInResponse = false + responseStatus.foreach { case (topicAndPartition, status) => + // we only print warnings for known errors here; if it is unknown, it will cause + // an error message in the replica manager + if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) { + debug("Produce request with correlation id %d from client %s on partition %s failed due to %s" + .format(produceRequest.correlationId, produceRequest.clientId, + topicAndPartition, ErrorMapping.exceptionNameFor(status.error))) + errorInResponse = true + } + } - if (produceRequest.requiredAcks == 0) { - // no operation needed if producer request.required.acks = 0; however, if there is any error in handling - // the request, since no response is expected by the producer, the server will close socket server so that - // the producer client will know that some error has happened and will refresh its metadata - if (errorInResponse) { - info("Close connection due to error handling produce request with correlation id %d from client id %s with ack=0" - .format(produceRequest.correlationId, produceRequest.clientId)) - requestChannel.closeConnection(request.processor, request) + if (produceRequest.requiredAcks == 0) { + // no operation needed if producer request.required.acks = 0; however, if there is any error in handling + // the request, since no response is expected by the producer, the server will close socket server so that + // the producer client will know that some error has happened and will refresh its metadata + if (errorInResponse) { + info("Close connection due to error handling produce request with correlation id %d from client id %s with ack=0" + .format(produceRequest.correlationId, produceRequest.clientId)) + requestChannel.closeConnection(request.processor, request) + } else { + requestChannel.noOperation(request.processor, request) + } } else { - requestChannel.noOperation(request.processor, request) + val response = 
ProducerResponse(produceRequest.correlationId, responseStatus) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } - } else { - val response = ProducerResponse(produceRequest.correlationId, responseStatus) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } - } - - // only allow appending to internal topic partitions - // if the client is not from admin - val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId - // call the replica manager to append messages to the replicas - replicaManager.appendMessages( - produceRequest.ackTimeoutMs.toLong, - produceRequest.requiredAcks, - internalTopicsAllowed, - produceRequest.data, - sendResponseCallback) - - // if the request is put into the purgatory, it will have a held reference - // and hence cannot be garbage collected; hence we clear its data here in - // order to let GC re-claim its memory since it is already appended to log - produceRequest.emptyData() + // only allow appending to internal topic partitions + // if the client is not from admin + val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId + + // call the replica manager to append messages to the replicas + replicaManager.appendMessages( + produceRequest.ackTimeoutMs.toLong, + produceRequest.requiredAcks, + internalTopicsAllowed, + produceRequest.data, + sendResponseCallback) + + // if the request is put into the purgatory, it will have a held reference + // and hence cannot be garbage collected; hence we clear its data here in + // order to let GC re-claim its memory since it is already appended to log + produceRequest.emptyData() + } } /** diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index cd16ced..8d0fa53 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -59,7 +59,7 @@ object SerializationTestUtils { private val partitionDataMessage3 = new ByteBufferMessageSet(new Message("fourth message".getBytes)) private val partitionDataProducerRequestArray = Array(partitionDataMessage0, partitionDataMessage1, partitionDataMessage2, partitionDataMessage3) - private val topicDataProducerRequest = { + val topicDataProducerRequest = { val groupedData = Array(topic1, topic2).flatMap(topic => partitionDataProducerRequestArray.zipWithIndex.map { diff --git a/core/src/test/scala/unit/kafka/api/testKafkaApis.scala b/core/src/test/scala/unit/kafka/api/testKafkaApis.scala new file mode 100644 index 0000000..75b3c2f --- /dev/null +++ b/core/src/test/scala/unit/kafka/api/testKafkaApis.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.io.{DataInputStream, DataOutputStream} +import java.net.Socket +import java.nio.ByteBuffer + +import kafka.network.{RequestChannel, SocketServer} +import org.apache.kafka.common.errors.InvalidRequiredAcksException +import org.apache.kafka.common.protocol.Errors +import org.junit.{After, Test} +import kafka.server.KafkaApis +import org.scalatest.junit.JUnitSuite + +import scala.collection.Map + +class testKafkaApis extends JUnitSuite{ + + val server: SocketServer = new SocketServer(0, + host = null, + port = kafka.utils.TestUtils.choosePort, + numProcessorThreads = 1, + maxQueuedRequests = 50, + sendBufferSize = 300000, + recvBufferSize = 300000, + maxRequestSize = 5000, + maxConnectionsPerIp = 5, + connectionsMaxIdleMs = 60*1000, + maxConnectionsPerIpOverrides = Map.empty[String,Int]) + server.startup() + + def sendRequest(socket: Socket, id: Short, request: Array[Byte]) { + val outgoing = new DataOutputStream(socket.getOutputStream) + outgoing.writeInt(request.length + 2) + outgoing.writeShort(id) + outgoing.write(request) + outgoing.flush() + } + + def receiveResponse(socket: Socket): Array[Byte] = { + val incoming = new DataInputStream(socket.getInputStream) + val len = incoming.readInt() + val response = new Array[Byte](len) + incoming.readFully(response) + response + } + + def processProduceRequest(channel: RequestChannel) { + val request = channel.receiveRequest + + val apis = new KafkaApis(server.requestChannel, null, null, null, 0, null, null) + + apis.handleProducerRequest(request) + } + + def connect(s:SocketServer = server) = new Socket("localhost", s.port) + + @After + def cleanup() { + server.shutdown() + } + + @Test + def testProduceRequestRequiredAcks() { + + val socket = connect() + + // creating request with illegal number of required.acks + + val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest) + + val byteBuffer = ByteBuffer.allocate(produceRequest.sizeInBytes) + produceRequest.writeTo(byteBuffer) + byteBuffer.rewind() + val serializedBytes = new Array[Byte](byteBuffer.remaining) + byteBuffer.get(serializedBytes) + + sendRequest(socket, 0, serializedBytes) + + processProduceRequest(server.requestChannel) + + val response = ProducerResponse.readFrom(ByteBuffer.wrap(receiveResponse(socket))) + + assert(response.status.values.head.error == + Errors.forException(new InvalidRequiredAcksException("")).code()) + } + +} diff --git a/kafka-patch-review.py b/kafka-patch-review.py index 9592680..b7f132f 100644 --- a/kafka-patch-review.py +++ b/kafka-patch-review.py @@ -7,31 +7,22 @@ import time import datetime import tempfile import commands -import getpass from jira.client import JIRA def get_jira_config(): # read the config file home=jira_home=os.getenv('HOME') home=home.rstrip('/') - if not (os.path.isfile(home + '/jira.ini')): - jira_user=raw_input('JIRA user :') - jira_pass=getpass.getpass('JIRA password :') - jira_config = {'user':jira_user, 'password':jira_pass} - return jira_config - else: - jira_config = dict(line.strip().split('=') for line in open(home + '/jira.ini')) - return jira_config + jira_config = dict(line.strip().split('=') for line in open(home + '/jira.ini')) + return jira_config -def get_jira(jira_config): +def get_jira(): options = { 'server': 'https://issues.apache.org/jira' } + + jira_config = get_jira_config() jira = 
JIRA(options=options,basic_auth=(jira_config['user'], jira_config['password'])) - # (Force) verify the auth was really done - jira_session=jira.session() - if (jira_session is None): - raise Exception("Failed to login to the JIRA instance") return jira def cmd_exists(cmd): @@ -90,15 +81,6 @@ def main(): p=os.popen(git_remote_update) p.close() - # Get JIRA configuration and login to JIRA to ensure the credentials work, before publishing the patch to the review board - print "Verifying JIRA connection configurations" - try: - jira_config=get_jira_config() - jira=get_jira(jira_config) - except: - print "Failed to login to the JIRA instance", sys.exc_info()[0], sys.exc_info()[1] - sys.exit(1) - rb_command= post_review_tool + " --publish --tracking-branch " + opt.branch + " --target-groups=kafka --bugs-closed=" + opt.jira if opt.debug: rb_command=rb_command + " --debug" @@ -141,6 +123,7 @@ def main(): p.close() print 'Creating diff against', opt.branch, 'and uploading patch to JIRA',opt.jira + jira=get_jira() issue = jira.issue(opt.jira) attachment=open(patch_file) jira.add_attachment(issue,attachment) @@ -163,6 +146,8 @@ def main(): for t in transitions: transitionsMap[t['name']] = t['id'] + jira_config = get_jira_config() + if('Submit Patch' in transitionsMap): jira.transition_issue(issue, transitionsMap['Submit Patch'] , assignee={'name': jira_config['user']} ) -- 1.9.3 (Apple Git-50) From 8d4049ca6e09ae1431ae2bf6411e42e682039244 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Tue, 10 Feb 2015 17:00:41 -0800 Subject: [PATCH 3/8] moved check for legal requiredAcks to append and fixed the tests accordingly --- .../kafka/clients/producer/KafkaProducer.java | 22 ++--- .../kafka/clients/tools/ProducerPerformance.java | 1 - .../java/org/apache/kafka/common/MetricName.java | 24 +++-- .../errors/InvalidRequiredAcksException.java | 2 +- .../NotEnoughReplicasAfterAppendException.java | 2 +- core/src/main/scala/kafka/server/KafkaApis.scala | 97 ++++++++----------- .../main/scala/kafka/server/ReplicaManager.scala | 77 +++++++++------ .../api/RequestResponseSerializationTest.scala | 2 +- .../test/scala/unit/kafka/api/testKafkaApis.scala | 104 --------------------- .../unit/kafka/server/ReplicaManagerTest.scala | 33 +++++++ kafka-patch-review.py | 31 ++++-- 11 files changed, 173 insertions(+), 222 deletions(-) delete mode 100644 core/src/test/scala/unit/kafka/api/testKafkaApis.scala diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 3c51f46..1fd6917 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -266,32 +266,32 @@ public class KafkaProducer implements Producer { *

      * If you want to simulate a simple blocking call you can do the following:
      *
-     * <pre>
-     *   producer.send(new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes())).get();
-     * </pre>
+     * <pre>{@code
+     * producer.send(new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes())).get();
+     * }</pre>
      * <p>
      * Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
      * will be invoked when the request is complete.
      *
-     * <pre>
-     *   ProducerRecord record = new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes());
+     * <pre>{@code
+     * ProducerRecord record = new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes());
      *   producer.send(myRecord,
-     *                 new Callback() {
+     *                new Callback() {
      *                     public void onCompletion(RecordMetadata metadata, Exception e) {
      *                         if(e != null)
      *                             e.printStackTrace();
      *                         System.out.println("The offset of the record we just sent is: " + metadata.offset());
      *                     }
-     *                 });
-     * </pre>
+     *                });
+     * }</pre>
      * <p>
      * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the
      * following example callback1 is guaranteed to execute before callback2:
      *
-     * <pre>
-     * producer.send(new ProducerRecord(topic, partition, key, value), callback1);
+     * <pre>{@code
+     * producer.send(new ProducerRecord(topic, partition, key1, value1), callback1);
      * producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
-     * </pre>
+     * }</pre>
      * <p>
* Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or * they will delay the sending of messages from other threads. If you want to execute blocking or computationally diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java index b4ef71f..13f4d59 100644 --- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java +++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java @@ -43,7 +43,6 @@ public class ProducerPerformance { throw new IllegalArgumentException("Invalid property: " + args[i]); props.put(pieces[0], pieces[1]); } - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); KafkaProducer producer = new KafkaProducer(props); diff --git a/clients/src/main/java/org/apache/kafka/common/MetricName.java b/clients/src/main/java/org/apache/kafka/common/MetricName.java index 5ed0a8b..04b4a09 100644 --- a/clients/src/main/java/org/apache/kafka/common/MetricName.java +++ b/clients/src/main/java/org/apache/kafka/common/MetricName.java @@ -19,34 +19,40 @@ import org.apache.kafka.common.utils.Utils; /** * The MetricName class encapsulates a metric's name, logical group and its related attributes - *

+ *
  * This class captures the following parameters
  *
  *  name The name of the metric
  *  group logical group name of the metrics to which this metric belongs.
  *  description A human-readable description to include in the metric. This is optional.
  *  tags additional key/value attributes of the metric. This is optional.
- *
+ *
  * group, tags parameters can be used to create unique metric names while reporting in JMX or any custom reporting.
- *
+ *
  * Ex: standard JMX MBean can be constructed like domainName:type=group,key1=val1,key2=val2
- *
+ *
  * Usage looks something like this:
- * <pre>
+ * <pre>{@code
  * // set up metrics:
  * Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
- * Sensor sensor = metrics.sensor("message-sizes");
+ * Sensor sensor = metrics.sensor("message-sizes");
+ *
  * Map metricTags = new LinkedHashMap();
  * metricTags.put("client-id", "producer-1");
  * metricTags.put("topic", "topic");
- * MetricName metricName = new MetricName("message-size-avg", "producer-metrics", "average message size", metricTags);
+ *
+ * MetricName metricName = new MetricName("message-size-avg", "producer-metrics", "average message size", metricTags);
  * sensor.add(metricName, new Avg());
- * metricName = new MetricName("message-size-max", "producer-metrics",metricTags);
+ *
+ * metricName = new MetricName("message-size-max", "producer-metrics", metricTags);
  * sensor.add(metricName, new Max());
  *
+ * metricName = new MetricName("message-size-min", "producer-metrics", "message minimum size", "client-id", "my-client", "topic", "my-topic");
+ * sensor.add(metricName, new Min());
+ *
  * // as messages are sent we record the sizes
  * sensor.record(messageSize);
- * </pre>
+ * }</pre>
*/ public final class MetricName { diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java index 2e0a3c6..9d19b28 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java index 8174b13..ea1eab3 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java @@ -21,7 +21,7 @@ package org.apache.kafka.common.errors; * ISR size is discovered *after* the message was already appended to the log. Producer retries will cause duplicates. */ public class NotEnoughReplicasAfterAppendException extends ApiException { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 1L; public NotEnoughReplicasAfterAppendException() { super(); diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index dedf691..de24cd9 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -167,68 +167,53 @@ class KafkaApis(val requestChannel: RequestChannel, def handleProducerRequest(request: RequestChannel.Request) { val produceRequest = request.requestObj.asInstanceOf[ProducerRequest] - if (produceRequest.requiredAcks > 1 || produceRequest.requiredAcks < -1) { - // If required.acks is outside accepted range, something is wrong with the client - // Just return an error and don't handle the request at all - - val responseStatus = produceRequest.data.map { - case (topicAndPartition, messageSet) => - (topicAndPartition -> - ProducerResponseStatus(Errors.forException(new InvalidRequiredAcksException("")).code(), - LogAppendInfo.UnknownLogAppendInfo.firstOffset)) - } - val response = ProducerResponse(produceRequest.correlationId, responseStatus) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) - } else { - - // the callback for sending a produce response - def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { - var errorInResponse = false - responseStatus.foreach { case (topicAndPartition, status) => - // we only print warnings for known errors here; if it is unknown, it will cause - // an error message in the replica manager - if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) { - debug("Produce request with correlation id %d from client %s on partition %s failed due to %s" - .format(produceRequest.correlationId, produceRequest.clientId, - topicAndPartition, ErrorMapping.exceptionNameFor(status.error))) - errorInResponse = true - } + // the callback for sending a produce response + def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { + var errorInResponse = false 
+ responseStatus.foreach { case (topicAndPartition, status) => + // we only print warnings for known errors here; if it is unknown, it will cause + // an error message in the replica manager + if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) { + debug("Produce request with correlation id %d from client %s on partition %s failed due to %s" + .format(produceRequest.correlationId, produceRequest.clientId, + topicAndPartition, ErrorMapping.exceptionNameFor(status.error))) + errorInResponse = true } + } - if (produceRequest.requiredAcks == 0) { - // no operation needed if producer request.required.acks = 0; however, if there is any error in handling - // the request, since no response is expected by the producer, the server will close socket server so that - // the producer client will know that some error has happened and will refresh its metadata - if (errorInResponse) { - info("Close connection due to error handling produce request with correlation id %d from client id %s with ack=0" - .format(produceRequest.correlationId, produceRequest.clientId)) - requestChannel.closeConnection(request.processor, request) - } else { - requestChannel.noOperation(request.processor, request) - } + if (produceRequest.requiredAcks == 0) { + // no operation needed if producer request.required.acks = 0; however, if there is any error in handling + // the request, since no response is expected by the producer, the server will close socket server so that + // the producer client will know that some error has happened and will refresh its metadata + if (errorInResponse) { + info("Close connection due to error handling produce request with correlation id %d from client id %s with ack=0" + .format(produceRequest.correlationId, produceRequest.clientId)) + requestChannel.closeConnection(request.processor, request) } else { - val response = ProducerResponse(produceRequest.correlationId, responseStatus) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + requestChannel.noOperation(request.processor, request) } + } else { + val response = ProducerResponse(produceRequest.correlationId, responseStatus) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } - - // only allow appending to internal topic partitions - // if the client is not from admin - val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId - - // call the replica manager to append messages to the replicas - replicaManager.appendMessages( - produceRequest.ackTimeoutMs.toLong, - produceRequest.requiredAcks, - internalTopicsAllowed, - produceRequest.data, - sendResponseCallback) - - // if the request is put into the purgatory, it will have a held reference - // and hence cannot be garbage collected; hence we clear its data here in - // order to let GC re-claim its memory since it is already appended to log - produceRequest.emptyData() } + + // only allow appending to internal topic partitions + // if the client is not from admin + val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId + + // call the replica manager to append messages to the replicas + replicaManager.appendMessages( + produceRequest.ackTimeoutMs.toLong, + produceRequest.requiredAcks, + internalTopicsAllowed, + produceRequest.data, + sendResponseCallback) + + // if the request is put into the purgatory, it will have a held reference + // and hence cannot be garbage collected; hence we clear its data here 
in + // order to let GC re-claim its memory since it is already appended to log + produceRequest.emptyData() } /** diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index fb948b9..1de3f85 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -29,6 +29,8 @@ import kafka.message.{ByteBufferMessageSet, MessageSet} import java.util.concurrent.atomic.AtomicBoolean import java.io.{IOException, File} import java.util.concurrent.TimeUnit +import org.apache.kafka.common.protocol.Errors + import scala.Predef._ import scala.collection._ import scala.collection.mutable.HashMap @@ -254,39 +256,54 @@ class ReplicaManager(val config: KafkaConfig, responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) { val sTime = SystemTime.milliseconds - val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks) - debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) - - val produceStatus = localProduceResults.map{ case (topicAndPartition, result) => - topicAndPartition -> - ProducePartitionStatus( - result.info.lastOffset + 1, // required offset - ProducerResponseStatus(result.errorCode, result.info.firstOffset)) // response status - } - if(requiredAcks == 0 || - requiredAcks == 1 || - messagesPerPartition.size <= 0 || - localProduceResults.values.count(_.error.isDefined) == messagesPerPartition.size) { - // in case of the following we can respond immediately: - // - // 1. required acks = 0 or 1 - // 2. there is no data to append - // 3. all partition appends have failed - val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus) - responseCallback(produceResponseStatus) - } else { - // create delayed produce operation - val produceMetadata = ProduceMetadata(requiredAcks, produceStatus) - val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback) + // requiredAcks has legal values - append normally + if (requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0) { - // create a list of (topic, partition) pairs to use as keys for this delayed produce operation - val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq + val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks) + debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) - // try to complete the request immediately, otherwise put it into the purgatory - // this is because while the delayed produce operation is being created, new - // requests may arrive and hence make this operation completable. - delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys) + val produceStatus = localProduceResults.map { case (topicAndPartition, result) => + topicAndPartition -> + ProducePartitionStatus( + result.info.lastOffset + 1, // required offset + ProducerResponseStatus(result.errorCode, result.info.firstOffset)) // response status + } + + if (requiredAcks == 0 || + requiredAcks == 1 || + messagesPerPartition.size <= 0 || + localProduceResults.values.count(_.error.isDefined) == messagesPerPartition.size) { + // in case of the following we can respond immediately: + // + // 1. required acks = 0 or 1 + // 2. there is no data to append + // 3. 
all partition appends have failed + val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus) + responseCallback(produceResponseStatus) + } else { + // create delayed produce operation + val produceMetadata = ProduceMetadata(requiredAcks, produceStatus) + val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback) + + // create a list of (topic, partition) pairs to use as keys for this delayed produce operation + val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq + + // try to complete the request immediately, otherwise put it into the purgatory + // this is because while the delayed produce operation is being created, new + // requests may arrive and hence make this operation completable. + delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys) + } + } else { + // If required.acks is outside accepted range, something is wrong with the client + // Just return an error and don't handle the request at all + val responseStatus = messagesPerPartition.map { + case (topicAndPartition, messageSet) => + (topicAndPartition -> + ProducerResponseStatus(Errors.INVALID_REQUIRED_ACKS.code, + LogAppendInfo.UnknownLogAppendInfo.firstOffset)) + } + responseCallback(responseStatus) } } diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index fba852a..a1f72f8 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -59,7 +59,7 @@ object SerializationTestUtils { private val partitionDataMessage3 = new ByteBufferMessageSet(new Message("fourth message".getBytes)) private val partitionDataProducerRequestArray = Array(partitionDataMessage0, partitionDataMessage1, partitionDataMessage2, partitionDataMessage3) - val topicDataProducerRequest = { + private val topicDataProducerRequest = { val groupedData = Array(topic1, topic2).flatMap(topic => partitionDataProducerRequestArray.zipWithIndex.map { diff --git a/core/src/test/scala/unit/kafka/api/testKafkaApis.scala b/core/src/test/scala/unit/kafka/api/testKafkaApis.scala deleted file mode 100644 index 75b3c2f..0000000 --- a/core/src/test/scala/unit/kafka/api/testKafkaApis.scala +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.api - -import java.io.{DataInputStream, DataOutputStream} -import java.net.Socket -import java.nio.ByteBuffer - -import kafka.network.{RequestChannel, SocketServer} -import org.apache.kafka.common.errors.InvalidRequiredAcksException -import org.apache.kafka.common.protocol.Errors -import org.junit.{After, Test} -import kafka.server.KafkaApis -import org.scalatest.junit.JUnitSuite - -import scala.collection.Map - -class testKafkaApis extends JUnitSuite{ - - val server: SocketServer = new SocketServer(0, - host = null, - port = kafka.utils.TestUtils.choosePort, - numProcessorThreads = 1, - maxQueuedRequests = 50, - sendBufferSize = 300000, - recvBufferSize = 300000, - maxRequestSize = 5000, - maxConnectionsPerIp = 5, - connectionsMaxIdleMs = 60*1000, - maxConnectionsPerIpOverrides = Map.empty[String,Int]) - server.startup() - - def sendRequest(socket: Socket, id: Short, request: Array[Byte]) { - val outgoing = new DataOutputStream(socket.getOutputStream) - outgoing.writeInt(request.length + 2) - outgoing.writeShort(id) - outgoing.write(request) - outgoing.flush() - } - - def receiveResponse(socket: Socket): Array[Byte] = { - val incoming = new DataInputStream(socket.getInputStream) - val len = incoming.readInt() - val response = new Array[Byte](len) - incoming.readFully(response) - response - } - - def processProduceRequest(channel: RequestChannel) { - val request = channel.receiveRequest - - val apis = new KafkaApis(server.requestChannel, null, null, null, 0, null, null) - - apis.handleProducerRequest(request) - } - - def connect(s:SocketServer = server) = new Socket("localhost", s.port) - - @After - def cleanup() { - server.shutdown() - } - - @Test - def testProduceRequestRequiredAcks() { - - val socket = connect() - - // creating request with illegal number of required.acks - - val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest) - - val byteBuffer = ByteBuffer.allocate(produceRequest.sizeInBytes) - produceRequest.writeTo(byteBuffer) - byteBuffer.rewind() - val serializedBytes = new Array[Byte](byteBuffer.remaining) - byteBuffer.get(serializedBytes) - - sendRequest(socket, 0, serializedBytes) - - processProduceRequest(server.requestChannel) - - val response = ProducerResponse.readFrom(ByteBuffer.wrap(receiveResponse(socket))) - - assert(response.status.values.head.error == - Errors.forException(new InvalidRequiredAcksException("")).code()) - } - -} diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index faa9071..878551f 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -17,16 +17,22 @@ package kafka.server +import kafka.api.{ProducerResponseStatus, SerializationTestUtils, ProducerRequest} +import kafka.common.TopicAndPartition import kafka.utils.{MockScheduler, MockTime, TestUtils} import java.util.concurrent.atomic.AtomicBoolean import java.io.File +import org.apache.kafka.common.errors.InvalidRequiredAcksException +import org.apache.kafka.common.protocol.Errors import org.easymock.EasyMock import org.I0Itec.zkclient.ZkClient import org.scalatest.junit.JUnit3Suite import org.junit.Test +import scala.collection.Map + class ReplicaManagerTest extends JUnit3Suite { val topic = "test-topic" @@ -63,4 +69,31 @@ class ReplicaManagerTest extends JUnit3Suite { // shutdown the replica manager upon test completion 
rm.shutdown(false) } + + @Test + def testIllegalRequiredAcks() { + val props = TestUtils.createBrokerConfig(1) + val config = new KafkaConfig(props) + val zkClient = EasyMock.createMock(classOf[ZkClient]) + val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) + val time: MockTime = new MockTime() + val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) + val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest) + def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) = { + if (responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS.code) { + throw new InvalidRequiredAcksException("") + } + } + + try { + rm.appendMessages(0, 3, false, produceRequest.data, callback) + fail("Expected InvalidRequiredAcksException") + } catch { + case e: InvalidRequiredAcksException => // expected + } + + rm.shutdown(false) + TestUtils.verifyNonDaemonThreadsStatus + + } } diff --git a/kafka-patch-review.py b/kafka-patch-review.py index b7f132f..9592680 100644 --- a/kafka-patch-review.py +++ b/kafka-patch-review.py @@ -7,22 +7,31 @@ import time import datetime import tempfile import commands +import getpass from jira.client import JIRA def get_jira_config(): # read the config file home=jira_home=os.getenv('HOME') home=home.rstrip('/') - jira_config = dict(line.strip().split('=') for line in open(home + '/jira.ini')) - return jira_config + if not (os.path.isfile(home + '/jira.ini')): + jira_user=raw_input('JIRA user :') + jira_pass=getpass.getpass('JIRA password :') + jira_config = {'user':jira_user, 'password':jira_pass} + return jira_config + else: + jira_config = dict(line.strip().split('=') for line in open(home + '/jira.ini')) + return jira_config -def get_jira(): +def get_jira(jira_config): options = { 'server': 'https://issues.apache.org/jira' } - - jira_config = get_jira_config() jira = JIRA(options=options,basic_auth=(jira_config['user'], jira_config['password'])) + # (Force) verify the auth was really done + jira_session=jira.session() + if (jira_session is None): + raise Exception("Failed to login to the JIRA instance") return jira def cmd_exists(cmd): @@ -81,6 +90,15 @@ def main(): p=os.popen(git_remote_update) p.close() + # Get JIRA configuration and login to JIRA to ensure the credentials work, before publishing the patch to the review board + print "Verifying JIRA connection configurations" + try: + jira_config=get_jira_config() + jira=get_jira(jira_config) + except: + print "Failed to login to the JIRA instance", sys.exc_info()[0], sys.exc_info()[1] + sys.exit(1) + rb_command= post_review_tool + " --publish --tracking-branch " + opt.branch + " --target-groups=kafka --bugs-closed=" + opt.jira if opt.debug: rb_command=rb_command + " --debug" @@ -123,7 +141,6 @@ def main(): p.close() print 'Creating diff against', opt.branch, 'and uploading patch to JIRA',opt.jira - jira=get_jira() issue = jira.issue(opt.jira) attachment=open(patch_file) jira.add_attachment(issue,attachment) @@ -146,8 +163,6 @@ def main(): for t in transitions: transitionsMap[t['name']] = t['id'] - jira_config = get_jira_config() - if('Submit Patch' in transitionsMap): jira.transition_issue(issue, transitionsMap['Submit Patch'] , assignee={'name': jira_config['user']} ) -- 1.9.3 (Apple Git-50) From 8c5167f480830ee9752afe4b00ed55593da45de7 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Wed, 11 Feb 2015 18:42:27 -0800 Subject: [PATCH 4/8] changing exception 
back to retriable --- .../errors/NotEnoughReplicasAfterAppendException.java | 14 +------------- .../unit/kafka/api/RequestResponseSerializationTest.scala | 2 +- .../test/scala/unit/kafka/server/ReplicaManagerTest.scala | 14 ++++---------- 3 files changed, 6 insertions(+), 24 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java index ea1eab3..fd7f6d8 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java @@ -20,23 +20,11 @@ package org.apache.kafka.common.errors; * Number of insync replicas for the partition is lower than min.insync.replicas This exception is raised when the low * ISR size is discovered *after* the message was already appended to the log. Producer retries will cause duplicates. */ -public class NotEnoughReplicasAfterAppendException extends ApiException { +public class NotEnoughReplicasAfterAppendException extends RetriableException { private static final long serialVersionUID = 1L; - public NotEnoughReplicasAfterAppendException() { - super(); - } - - public NotEnoughReplicasAfterAppendException(String message, Throwable cause) { - super(message, cause); - } - public NotEnoughReplicasAfterAppendException(String message) { super(message); } - public NotEnoughReplicasAfterAppendException(Throwable cause) { - super(cause); - } - } diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index a1f72f8..fba852a 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -59,7 +59,7 @@ object SerializationTestUtils { private val partitionDataMessage3 = new ByteBufferMessageSet(new Message("fourth message".getBytes)) private val partitionDataProducerRequestArray = Array(partitionDataMessage0, partitionDataMessage1, partitionDataMessage2, partitionDataMessage3) - private val topicDataProducerRequest = { + val topicDataProducerRequest = { val groupedData = Array(topic1, topic2).flatMap(topic => partitionDataProducerRequestArray.zipWithIndex.map { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 878551f..28250d9 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -80,19 +80,13 @@ class ReplicaManagerTest extends JUnit3Suite { val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest) def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) = { - if (responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS.code) { - throw new InvalidRequiredAcksException("") - } + assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS.code) } - try { - rm.appendMessages(0, 3, false, produceRequest.data, callback) - fail("Expected InvalidRequiredAcksException") - } catch { - case e: InvalidRequiredAcksException => // expected - } + rm.appendMessages(timeout = 0, 
requiredAcks = 3, internalTopicsAllowed = false, messagesPerPartition = produceRequest.data, responseCallback = callback) + + rm.shutdown(false); - rm.shutdown(false) TestUtils.verifyNonDaemonThreadsStatus } -- 1.9.3 (Apple Git-50) From 60d3a660fd23fdc4d0287de708b0699708561455 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Wed, 11 Feb 2015 18:47:49 -0800 Subject: [PATCH 5/8] cleaning unused exceptions --- core/src/main/scala/kafka/server/KafkaApis.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index de24cd9..703886a 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -30,8 +30,6 @@ import kafka.log._ import kafka.network._ import kafka.network.RequestChannel.Response import kafka.utils.{SystemTime, Logging} -import org.apache.kafka.common.errors.InvalidRequiredAcksException -import org.apache.kafka.common.protocol.Errors import scala.collection._ -- 1.9.3 (Apple Git-50) From e0e935ef4c8de0485c2e1cd322232677292998aa Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Wed, 11 Feb 2015 23:13:24 -0800 Subject: [PATCH 6/8] refactored appendToLog for clarity --- .../main/scala/kafka/server/ReplicaManager.scala | 34 +++++++++++++--------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 1de3f85..57fad22 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -255,11 +255,9 @@ class ReplicaManager(val config: KafkaConfig, messagesPerPartition: Map[TopicAndPartition, MessageSet], responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) { - val sTime = SystemTime.milliseconds - - // requiredAcks has legal values - append normally - if (requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0) { + if (isValidRequiredAcks(requiredAcks)) { + val sTime = SystemTime.milliseconds val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks) debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) @@ -270,15 +268,7 @@ class ReplicaManager(val config: KafkaConfig, ProducerResponseStatus(result.errorCode, result.info.firstOffset)) // response status } - if (requiredAcks == 0 || - requiredAcks == 1 || - messagesPerPartition.size <= 0 || - localProduceResults.values.count(_.error.isDefined) == messagesPerPartition.size) { - // in case of the following we can respond immediately: - // - // 1. required acks = 0 or 1 - // 2. there is no data to append - // 3. all partition appends have failed + if (respondBeforeReplication(requiredAcks, messagesPerPartition, localProduceResults)) { val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus) responseCallback(produceResponseStatus) } else { @@ -307,6 +297,24 @@ class ReplicaManager(val config: KafkaConfig, } } + // in case of the following we can respond immediately: + // + // 1. required acks = 0 or 1 + // 2. there is no data to append + // 3. 
all partition appends have failed + private def respondBeforeReplication(requiredAcks: Short, messagesPerPartition: Map[TopicAndPartition, MessageSet], + localProduceResults: Map[TopicAndPartition, LogAppendResult]): Boolean = { + + requiredAcks == 0 || + requiredAcks == 1 || + messagesPerPartition.size <= 0 || + localProduceResults.values.count(_.error.isDefined) == messagesPerPartition.size + } + + private def isValidRequiredAcks(requiredAcks: Short): Boolean = { + requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0 + } + /** * Append the messages to the local replica logs */ -- 1.9.3 (Apple Git-50) From a3d6dcaf1bcc6189c3ef17b63719ae4d12dcb789 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 12 Feb 2015 17:09:42 -0800 Subject: [PATCH 7/8] KAFKA-1948; Fix ConsumerTest.testPartitionReassignmentCallback handling issue; reviewed by Gwen Shapira --- core/src/main/scala/kafka/controller/KafkaController.scala | 2 ++ core/src/test/scala/integration/kafka/api/ConsumerTest.scala | 5 ++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 66df6d2..e9b4dc6 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -365,6 +365,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt controllerContext.epoch=0 controllerContext.epochZkVersion=0 brokerState.newState(RunningAsBroker) + + info("Broker %d resigned as the controller".format(config.brokerId)) } } diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index 798f035..2802a39 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -230,7 +230,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { assertEquals(1, parts.size) assertNotNull(parts(0).leader()) - // shutdown the co-ordinator + // shutdown the coordinator val coordinator = parts(0).leader().id() this.servers(coordinator).shutdown() @@ -239,6 +239,9 @@ class ConsumerTest extends IntegrationTestHarness with Logging { consumer0.poll(50) assertEquals(2, callback.callsToAssigned) assertEquals(2, callback.callsToRevoked) + + // restart the coordinator since it may also be hosting "test" topic + this.servers(coordinator).startup() consumer0.close() } -- 1.9.3 (Apple Git-50) From 6e23b17ba08fa61ed453d6663d205e5cd22d340e Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Thu, 12 Feb 2015 18:57:22 -0800 Subject: [PATCH 8/8] improved readability of append rules --- .../main/scala/kafka/server/ReplicaManager.scala | 28 +++++++++++----------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 57fad22..ce36cc7 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -268,10 +268,7 @@ class ReplicaManager(val config: KafkaConfig, ProducerResponseStatus(result.errorCode, result.info.firstOffset)) // response status } - if (respondBeforeReplication(requiredAcks, messagesPerPartition, localProduceResults)) { - val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus) - responseCallback(produceResponseStatus) - } else { + if 
(delayedRequestRequired(requiredAcks, messagesPerPartition, localProduceResults)) { // create delayed produce operation val produceMetadata = ProduceMetadata(requiredAcks, produceStatus) val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback) @@ -283,6 +280,11 @@ class ReplicaManager(val config: KafkaConfig, // this is because while the delayed produce operation is being created, new // requests may arrive and hence make this operation completable. delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys) + + } else { + // we can respond immediately + val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus) + responseCallback(produceResponseStatus) } } else { // If required.acks is outside accepted range, something is wrong with the client @@ -297,18 +299,16 @@ class ReplicaManager(val config: KafkaConfig, } } - // in case of the following we can respond immediately: + // If all the following conditions are true, we need to put a delayed produce request and wait for replication to complete // - // 1. required acks = 0 or 1 - // 2. there is no data to append - // 3. all partition appends have failed - private def respondBeforeReplication(requiredAcks: Short, messagesPerPartition: Map[TopicAndPartition, MessageSet], + // 1. required acks = -1 + // 2. there is data to append + // 3. at least one partition append was successful (fewer errors than partitions) + private def delayedRequestRequired(requiredAcks: Short, messagesPerPartition: Map[TopicAndPartition, MessageSet], localProduceResults: Map[TopicAndPartition, LogAppendResult]): Boolean = { - - requiredAcks == 0 || - requiredAcks == 1 || - messagesPerPartition.size <= 0 || - localProduceResults.values.count(_.error.isDefined) == messagesPerPartition.size + requiredAcks == -1 && + messagesPerPartition.size > 0 && + localProduceResults.values.count(_.error.isDefined) < messagesPerPartition.size } private def isValidRequiredAcks(requiredAcks: Short): Boolean = { -- 1.9.3 (Apple Git-50)
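Taken together, the series leaves the broker accepting only request.required.acks values of -1, 0 and 1: anything else is rejected up front with Errors.INVALID_REQUIRED_ACKS (error code 21) instead of being appended and then waited on. A minimal standalone Scala sketch of that end state follows; only isValidRequiredAcks and the INVALID_REQUIRED_ACKS name are taken from the patches, while the surrounding object and the println driver are illustrative.

object RequiredAcksSketch {
  // Mirrors ReplicaManager.isValidRequiredAcks after patch 8:
  // -1 = wait for all in-sync replicas, 1 = leader ack only, 0 = no acknowledgement.
  def isValidRequiredAcks(requiredAcks: Short): Boolean =
    requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0

  def main(args: Array[String]): Unit = {
    // acks = 2 (or any value outside -1/0/1) is now answered with INVALID_REQUIRED_ACKS
    // rather than being treated as "wait for 2 replicas" as the removed broker code did.
    Seq[Short](-1, 0, 1, 2, 5).foreach { acks =>
      val verdict = if (isValidRequiredAcks(acks)) "append" else "reject with INVALID_REQUIRED_ACKS"
      println(s"requiredAcks=$acks => $verdict")
    }
  }
}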