From e79ebdfe222288f2c5d997ba7e7c5e18c8c514ed Mon Sep 17 00:00:00 2001 From: Jaikiran Pai Date: Tue, 13 Jan 2015 09:39:20 -0800 Subject: [PATCH 1/5] KAFKA-1854 Allow JIRA username and password to be prompted in the absence of a jira.ini file, during patch submission; reviewed by Neha Narkhede --- kafka-patch-review.py | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/kafka-patch-review.py b/kafka-patch-review.py index b7f132f..9592680 100644 --- a/kafka-patch-review.py +++ b/kafka-patch-review.py @@ -7,22 +7,31 @@ import time import datetime import tempfile import commands +import getpass from jira.client import JIRA def get_jira_config(): # read the config file home=jira_home=os.getenv('HOME') home=home.rstrip('/') - jira_config = dict(line.strip().split('=') for line in open(home + '/jira.ini')) - return jira_config + if not (os.path.isfile(home + '/jira.ini')): + jira_user=raw_input('JIRA user :') + jira_pass=getpass.getpass('JIRA password :') + jira_config = {'user':jira_user, 'password':jira_pass} + return jira_config + else: + jira_config = dict(line.strip().split('=') for line in open(home + '/jira.ini')) + return jira_config -def get_jira(): +def get_jira(jira_config): options = { 'server': 'https://issues.apache.org/jira' } - - jira_config = get_jira_config() jira = JIRA(options=options,basic_auth=(jira_config['user'], jira_config['password'])) + # (Force) verify the auth was really done + jira_session=jira.session() + if (jira_session is None): + raise Exception("Failed to login to the JIRA instance") return jira def cmd_exists(cmd): @@ -81,6 +90,15 @@ def main(): p=os.popen(git_remote_update) p.close() + # Get JIRA configuration and login to JIRA to ensure the credentials work, before publishing the patch to the review board + print "Verifying JIRA connection configurations" + try: + jira_config=get_jira_config() + jira=get_jira(jira_config) + except: + print "Failed to login to the JIRA instance", sys.exc_info()[0], sys.exc_info()[1] + sys.exit(1) + rb_command= post_review_tool + " --publish --tracking-branch " + opt.branch + " --target-groups=kafka --bugs-closed=" + opt.jira if opt.debug: rb_command=rb_command + " --debug" @@ -123,7 +141,6 @@ def main(): p.close() print 'Creating diff against', opt.branch, 'and uploading patch to JIRA',opt.jira - jira=get_jira() issue = jira.issue(opt.jira) attachment=open(patch_file) jira.add_attachment(issue,attachment) @@ -146,8 +163,6 @@ def main(): for t in transitions: transitionsMap[t['name']] = t['id'] - jira_config = get_jira_config() - if('Submit Patch' in transitionsMap): jira.transition_issue(issue, transitionsMap['Submit Patch'] , assignee={'name': jira_config['user']} ) -- 1.9.3 (Apple Git-50) From bfb2da3c82ab01f5914d5559ee196d5aa977b17c Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Tue, 13 Jan 2015 09:54:32 -0800 Subject: [PATCH 2/5] trivial change to add byte serializer to ProducerPerformance; patched by Jun Rao --- .../java/org/apache/kafka/clients/tools/ProducerPerformance.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java index 1b82800..689bae9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java +++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java @@ -15,10 +15,7 @@ package org.apache.kafka.clients.tools; import 
java.util.Arrays; import java.util.Properties; -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.clients.producer.*; public class ProducerPerformance { @@ -46,6 +43,8 @@ public class ProducerPerformance { throw new IllegalArgumentException("Invalid property: " + args[i]); props.put(pieces[0], pieces[1]); } + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); KafkaProducer producer = new KafkaProducer(props); /* setup perf test */ -- 1.9.3 (Apple Git-50) From a611178408cf8497054ff015caba18cfcff70a60 Mon Sep 17 00:00:00 2001 From: Manikumar Reddy Date: Wed, 14 Jan 2015 12:02:50 -0800 Subject: [PATCH 3/5] KAFKA-1723 (delta patch to fix javadoc); make the metrics name in new producer more standard; patched by Manikumar Reddy; reviewed by Jun Rao --- .../kafka/clients/producer/KafkaProducer.java | 22 ++++++++++---------- .../java/org/apache/kafka/common/MetricName.java | 24 ++++++++++++++-------- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index c79149a..fc71710 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -265,32 +265,32 @@ public class KafkaProducer implements Producer { *

     * If you want to simulate a simple blocking call you can do the following:
      * 
-     * <pre>
-     *   producer.send(new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes())).get();
-     * </pre>
+     * <pre>{@code
+     * producer.send(new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes())).get();
+     * }</pre>
      * <p>
      * Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
      * will be invoked when the request is complete.
      * 
-     * <pre>
-     *   ProducerRecord record = new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes());
+     * <pre>{@code
+     * ProducerRecord record = new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes());
      *   producer.send(myRecord,
-     *                 new Callback() {
+     *                new Callback() {
      *                     public void onCompletion(RecordMetadata metadata, Exception e) {
      *                         if(e != null)
      *                             e.printStackTrace();
      *                         System.out.println("The offset of the record we just sent is: " + metadata.offset());
      *                     }
-     *                 });
-     * </pre>
+     *                });
+     * }</pre>
      * 
      * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the
      * following example callback1 is guaranteed to execute before callback2:
      * 
-     * <pre>
-     * producer.send(new ProducerRecord(topic, partition, key, value), callback1);
+     * <pre>{@code
+     * producer.send(new ProducerRecord(topic, partition, key1, value1), callback1);
      * producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
-     * </pre>
+     * }</pre>
      * <p>
      * Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or
      * they will delay the sending of messages from other threads. If you want to execute blocking or computationally
diff --git a/clients/src/main/java/org/apache/kafka/common/MetricName.java b/clients/src/main/java/org/apache/kafka/common/MetricName.java
index 4e810d5..7e977e9 100644
--- a/clients/src/main/java/org/apache/kafka/common/MetricName.java
+++ b/clients/src/main/java/org/apache/kafka/common/MetricName.java
@@ -19,34 +19,40 @@ import org.apache.kafka.common.utils.Utils;
 /**
  * The MetricName class encapsulates a metric's name, logical group and its related attributes
- *
+ * <p>
  * This class captures the following parameters
  * <pre>
  *  <b>name</b> The name of the metric
  *  <b>group</b> logical group name of the metrics to which this metric belongs.
  *  <b>description</b> A human-readable description to include in the metric. This is optional.
  *  <b>tags</b> additional key/value attributes of the metric. This is optional.
- *   </pre>
+ * </pre>
  * group, tags parameters can be used to create unique metric names while reporting in JMX or any custom reporting.
- *
+ * <p>
  * Ex: standard JMX MBean can be constructed like <b>domainName:type=group,key1=val1,key2=val2</b>
- *
+ * <p>
  * Usage looks something like this:
- * <pre>
+ * <pre>{@code
  * // set up metrics:
  * Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
- * Sensor sensor = metrics.sensor(&quot;message-sizes&quot;);
+ * Sensor sensor = metrics.sensor("message-sizes");
+ *
  * Map<String, String> metricTags = new LinkedHashMap<String, String>();
  * metricTags.put("client-id", "producer-1");
  * metricTags.put("topic", "topic");
- * MetricName metricName = new MetricName(&quot;message-size-avg&quot;, &quot;producer-metrics&quot;, &quot;average message size&quot;, metricTags);
+ *
+ * MetricName metricName = new MetricName("message-size-avg", "producer-metrics", "average message size", metricTags);
  * sensor.add(metricName, new Avg());
- * metricName = new MetricName("message-size-max", "producer-metrics",metricTags);
+ *
+ * metricName = new MetricName("message-size-max", "producer-metrics", metricTags);
  * sensor.add(metricName, new Max());
  *
+ * metricName = new MetricName("message-size-min", "producer-metrics", "message minimum size", "client-id", "my-client", "topic", "my-topic");
+ * sensor.add(metricName, new Min());
+ *
  * // as messages are sent we record the sizes
  * sensor.record(messageSize);
- * </pre>
+ * }</pre>
*/ public final class MetricName { -- 1.9.3 (Apple Git-50) From d2380250903b5f927ff33fec4f65c403fad09dcb Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Tue, 6 Jan 2015 16:19:29 -0800 Subject: [PATCH 4/5] removed broker code for handling acks>1 and made NotEnoughReplicasAfterAppendException non-retriable --- .../kafka/common/errors/NotEnoughReplicasAfterAppendException.java | 2 +- core/src/main/scala/kafka/cluster/Partition.scala | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java index 75c80a9..23d53f9 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java @@ -21,7 +21,7 @@ package org.apache.kafka.common.errors; * This exception is raised when the low ISR size is discovered *after* the message * was already appended to the log. Producer retries will cause duplicates. */ -public class NotEnoughReplicasAfterAppendException extends RetriableException { +public class NotEnoughReplicasAfterAppendException extends ApiException { private static final long serialVersionUID = 1L; public NotEnoughReplicasAfterAppendException() { diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index b230e9a..5a669bf 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -317,8 +317,6 @@ class Partition(val topic: String, } else { (true, ErrorMapping.NotEnoughReplicasAfterAppendCode) } - } else if (requiredAcks > 0 && numAcks >= requiredAcks) { - (true, ErrorMapping.NoError) } else (false, ErrorMapping.NoError) case None => -- 1.9.3 (Apple Git-50) From f14b9a3695e894b56a91dadd748e43f7852f2d5b Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Wed, 14 Jan 2015 15:41:00 -0800 Subject: [PATCH 5/5] added early handling of invalid number of acks to handler and a test --- .../kafka/clients/producer/KafkaProducer.java | 22 ++--- .../kafka/clients/tools/ProducerPerformance.java | 7 +- .../java/org/apache/kafka/common/MetricName.java | 24 ++--- .../errors/InvalidRequiredAcksException.java | 25 +++++ .../org/apache/kafka/common/protocol/Errors.java | 3 +- core/src/main/scala/kafka/server/KafkaApis.scala | 99 ++++++++++++-------- .../api/RequestResponseSerializationTest.scala | 2 +- .../test/scala/unit/kafka/api/testKafkaApis.scala | 104 +++++++++++++++++++++ kafka-patch-review.py | 31 ++---- 9 files changed, 222 insertions(+), 95 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java create mode 100644 core/src/test/scala/unit/kafka/api/testKafkaApis.scala diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index fc71710..c79149a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -265,32 +265,32 @@ public class KafkaProducer implements Producer { *

      * If you want to simulate a simple blocking call you can do the following:
      * 
-     * <pre>{@code
-     * producer.send(new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes())).get();
-     * }</pre>
+     * <pre>
+     *   producer.send(new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes())).get();
+     * </pre>
      * <p>
      * Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
      * will be invoked when the request is complete.
      * 
-     * <pre>{@code
-     * ProducerRecord record = new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes());
+     * <pre>
+     *   ProducerRecord record = new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes());
      *   producer.send(myRecord,
-     *                new Callback() {
+     *                 new Callback() {
      *                     public void onCompletion(RecordMetadata metadata, Exception e) {
      *                         if(e != null)
      *                             e.printStackTrace();
      *                         System.out.println("The offset of the record we just sent is: " + metadata.offset());
      *                     }
-     *                });
-     * }</pre>
+     *                 });
+     * </pre>
      * 
      * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the
      * following example callback1 is guaranteed to execute before callback2:
      * 
-     * <pre>{@code
-     * producer.send(new ProducerRecord(topic, partition, key1, value1), callback1);
+     * <pre>
+     * producer.send(new ProducerRecord(topic, partition, key, value), callback1);
      * producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
-     * }</pre>
+     * </pre>
      * <p>
* Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or * they will delay the sending of messages from other threads. If you want to execute blocking or computationally diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java index 689bae9..1b82800 100644 --- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java +++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java @@ -15,7 +15,10 @@ package org.apache.kafka.clients.tools; import java.util.Arrays; import java.util.Properties; -import org.apache.kafka.clients.producer.*; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; public class ProducerPerformance { @@ -43,8 +46,6 @@ public class ProducerPerformance { throw new IllegalArgumentException("Invalid property: " + args[i]); props.put(pieces[0], pieces[1]); } - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); KafkaProducer producer = new KafkaProducer(props); /* setup perf test */ diff --git a/clients/src/main/java/org/apache/kafka/common/MetricName.java b/clients/src/main/java/org/apache/kafka/common/MetricName.java index 7e977e9..4e810d5 100644 --- a/clients/src/main/java/org/apache/kafka/common/MetricName.java +++ b/clients/src/main/java/org/apache/kafka/common/MetricName.java @@ -19,40 +19,34 @@ import org.apache.kafka.common.utils.Utils; /** * The MetricName class encapsulates a metric's name, logical group and its related attributes - *

+ *
  * This class captures the following parameters
  * <pre>
  *  <b>name</b> The name of the metric
  *  <b>group</b> logical group name of the metrics to which this metric belongs.
  *  <b>description</b> A human-readable description to include in the metric. This is optional.
  *  <b>tags</b> additional key/value attributes of the metric. This is optional.
- * </pre>
+ *   </pre>
  * group, tags parameters can be used to create unique metric names while reporting in JMX or any custom reporting.
- * <p>
+ *
  * Ex: standard JMX MBean can be constructed like <b>domainName:type=group,key1=val1,key2=val2</b>
- * <p>
+ *
  * Usage looks something like this:
- * <pre>{@code
+ * <pre>
  * // set up metrics:
  * Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
- * Sensor sensor = metrics.sensor("message-sizes");
- *
+ * Sensor sensor = metrics.sensor(&quot;message-sizes&quot;);
  * Map<String, String> metricTags = new LinkedHashMap<String, String>();
  * metricTags.put("client-id", "producer-1");
  * metricTags.put("topic", "topic");
- *
- * MetricName metricName = new MetricName("message-size-avg", "producer-metrics", "average message size", metricTags);
+ * MetricName metricName = new MetricName(&quot;message-size-avg&quot;, &quot;producer-metrics&quot;, &quot;average message size&quot;, metricTags);
  * sensor.add(metricName, new Avg());
- *
- * metricName = new MetricName("message-size-max", "producer-metrics", metricTags);
+ * metricName = new MetricName("message-size-max", "producer-metrics",metricTags);
  * sensor.add(metricName, new Max());
  *
- * metricName = new MetricName("message-size-min", "producer-metrics", "message minimum size", "client-id", "my-client", "topic", "my-topic");
- * sensor.add(metricName, new Min());
- *
  * // as messages are sent we record the sizes
  * sensor.record(messageSize);
- * }</pre>
+ * </pre>
*/ public final class MetricName { diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java new file mode 100644 index 0000000..2e0a3c6 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class InvalidRequiredAcksException extends ApiException { + private static final long serialVersionUID = 1L; + + public InvalidRequiredAcksException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 3316b6a..03937cb 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -46,7 +46,8 @@ public enum Errors { INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")), RECORD_LIST_TOO_LARGE(18, new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")), NOT_ENOUGH_REPLICAS(19, new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")), - NOT_ENOUGH_REPLICAS_AFTER_APPEND(20, new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")); + NOT_ENOUGH_REPLICAS_AFTER_APPEND(20, new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")), + INVALID_REQUIRED_ACKS(21, new InvalidRequiredAcksException("Produce request contained request.required.acks with invalid value")); private static Map, Errors> classToError = new HashMap, Errors>(); private static Map codeToError = new HashMap(); static { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c011a1b..18116eb 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -25,6 +25,8 @@ import kafka.admin.AdminUtils import kafka.network.RequestChannel.Response import kafka.controller.KafkaController import kafka.utils.{SystemTime, Logging} +import org.apache.kafka.common.errors.InvalidRequiredAcksException +import org.apache.kafka.common.protocol.Errors import scala.collection._ @@ -157,53 +159,68 @@ class KafkaApis(val requestChannel: RequestChannel, def handleProducerRequest(request: RequestChannel.Request) { val produceRequest = 
request.requestObj.asInstanceOf[ProducerRequest] - // the callback for sending the response - def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { - var errorInResponse = false - responseStatus.foreach { case (topicAndPartition, status) => - // we only print warnings for known errors here; if it is unknown, it will cause - // an error message in the replica manager - if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) { - debug("Produce request with correlation id %d from client %s on partition %s failed due to %s" - .format(produceRequest.correlationId, produceRequest.clientId, - topicAndPartition, ErrorMapping.exceptionNameFor(status.error))) - errorInResponse = true - } + if (produceRequest.requiredAcks > 1 || produceRequest.requiredAcks < -1) { + // If required.acks is outside accepted range, something is wrong with the client + // Just return an error and don't handle the request at all + + val responseStatus = produceRequest.data.map { + case(topicAndPartition, messageSet) => + (topicAndPartition -> + ProducerResponseStatus( Errors.forException(new InvalidRequiredAcksException("")).code(), + LogAppendInfo.UnknownLogAppendInfo.firstOffset)) } + val response = ProducerResponse(produceRequest.correlationId, responseStatus) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + } else { + + // the callback for sending the response + def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { + var errorInResponse = false + responseStatus.foreach { case (topicAndPartition, status) => + // we only print warnings for known errors here; if it is unknown, it will cause + // an error message in the replica manager + if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) { + debug("Produce request with correlation id %d from client %s on partition %s failed due to %s" + .format(produceRequest.correlationId, produceRequest.clientId, + topicAndPartition, ErrorMapping.exceptionNameFor(status.error))) + errorInResponse = true + } + } - if (produceRequest.requiredAcks == 0) { - // no operation needed if producer request.required.acks = 0; however, if there is any error in handling - // the request, since no response is expected by the producer, the server will close socket server so that - // the producer client will know that some error has happened and will refresh its metadata - if (errorInResponse) { - info("Close connection due to error handling produce request with correlation id %d from client id %s with ack=0" - .format(produceRequest.correlationId, produceRequest.clientId)) - requestChannel.closeConnection(request.processor, request) + if (produceRequest.requiredAcks == 0) { + // no operation needed if producer request.required.acks = 0; however, if there is any error in handling + // the request, since no response is expected by the producer, the server will close socket server so that + // the producer client will know that some error has happened and will refresh its metadata + if (errorInResponse) { + info("Close connection due to error handling produce request with correlation id %d from client id %s with ack=0" + .format(produceRequest.correlationId, produceRequest.clientId)) + requestChannel.closeConnection(request.processor, request) + } else { + requestChannel.noOperation(request.processor, request) + } } else { - requestChannel.noOperation(request.processor, request) + val response = 
ProducerResponse(produceRequest.correlationId, responseStatus) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } - } else { - val response = ProducerResponse(produceRequest.correlationId, responseStatus) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } - } - - // only allow appending to internal topic partitions - // if the client is not from admin - val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId - // call the replica manager to append messages to the replicas - replicaManager.appendMessages( - produceRequest.ackTimeoutMs.toLong, - produceRequest.requiredAcks, - internalTopicsAllowed, - produceRequest.data, - sendResponseCallback) - - // if the request is put into the purgatory, it will have a held reference - // and hence cannot be garbage collected; hence we clear its data here in - // order to let GC re-claim its memory since it is already appended to log - produceRequest.emptyData() + // only allow appending to internal topic partitions + // if the client is not from admin + val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId + + // call the replica manager to append messages to the replicas + replicaManager.appendMessages( + produceRequest.ackTimeoutMs.toLong, + produceRequest.requiredAcks, + internalTopicsAllowed, + produceRequest.data, + sendResponseCallback) + + // if the request is put into the purgatory, it will have a held reference + // and hence cannot be garbage collected; hence we clear its data here in + // order to let GC re-claim its memory since it is already appended to log + produceRequest.emptyData() + } } /** diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index cd16ced..8d0fa53 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -59,7 +59,7 @@ object SerializationTestUtils { private val partitionDataMessage3 = new ByteBufferMessageSet(new Message("fourth message".getBytes)) private val partitionDataProducerRequestArray = Array(partitionDataMessage0, partitionDataMessage1, partitionDataMessage2, partitionDataMessage3) - private val topicDataProducerRequest = { + val topicDataProducerRequest = { val groupedData = Array(topic1, topic2).flatMap(topic => partitionDataProducerRequestArray.zipWithIndex.map { diff --git a/core/src/test/scala/unit/kafka/api/testKafkaApis.scala b/core/src/test/scala/unit/kafka/api/testKafkaApis.scala new file mode 100644 index 0000000..75b3c2f --- /dev/null +++ b/core/src/test/scala/unit/kafka/api/testKafkaApis.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.io.{DataInputStream, DataOutputStream} +import java.net.Socket +import java.nio.ByteBuffer + +import kafka.network.{RequestChannel, SocketServer} +import org.apache.kafka.common.errors.InvalidRequiredAcksException +import org.apache.kafka.common.protocol.Errors +import org.junit.{After, Test} +import kafka.server.KafkaApis +import org.scalatest.junit.JUnitSuite + +import scala.collection.Map + +class testKafkaApis extends JUnitSuite{ + + val server: SocketServer = new SocketServer(0, + host = null, + port = kafka.utils.TestUtils.choosePort, + numProcessorThreads = 1, + maxQueuedRequests = 50, + sendBufferSize = 300000, + recvBufferSize = 300000, + maxRequestSize = 5000, + maxConnectionsPerIp = 5, + connectionsMaxIdleMs = 60*1000, + maxConnectionsPerIpOverrides = Map.empty[String,Int]) + server.startup() + + def sendRequest(socket: Socket, id: Short, request: Array[Byte]) { + val outgoing = new DataOutputStream(socket.getOutputStream) + outgoing.writeInt(request.length + 2) + outgoing.writeShort(id) + outgoing.write(request) + outgoing.flush() + } + + def receiveResponse(socket: Socket): Array[Byte] = { + val incoming = new DataInputStream(socket.getInputStream) + val len = incoming.readInt() + val response = new Array[Byte](len) + incoming.readFully(response) + response + } + + def processProduceRequest(channel: RequestChannel) { + val request = channel.receiveRequest + + val apis = new KafkaApis(server.requestChannel, null, null, null, 0, null, null) + + apis.handleProducerRequest(request) + } + + def connect(s:SocketServer = server) = new Socket("localhost", s.port) + + @After + def cleanup() { + server.shutdown() + } + + @Test + def testProduceRequestRequiredAcks() { + + val socket = connect() + + // creating request with illegal number of required.acks + + val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest) + + val byteBuffer = ByteBuffer.allocate(produceRequest.sizeInBytes) + produceRequest.writeTo(byteBuffer) + byteBuffer.rewind() + val serializedBytes = new Array[Byte](byteBuffer.remaining) + byteBuffer.get(serializedBytes) + + sendRequest(socket, 0, serializedBytes) + + processProduceRequest(server.requestChannel) + + val response = ProducerResponse.readFrom(ByteBuffer.wrap(receiveResponse(socket))) + + assert(response.status.values.head.error == + Errors.forException(new InvalidRequiredAcksException("")).code()) + } + +} diff --git a/kafka-patch-review.py b/kafka-patch-review.py index 9592680..b7f132f 100644 --- a/kafka-patch-review.py +++ b/kafka-patch-review.py @@ -7,31 +7,22 @@ import time import datetime import tempfile import commands -import getpass from jira.client import JIRA def get_jira_config(): # read the config file home=jira_home=os.getenv('HOME') home=home.rstrip('/') - if not (os.path.isfile(home + '/jira.ini')): - jira_user=raw_input('JIRA user :') - jira_pass=getpass.getpass('JIRA password :') - jira_config = {'user':jira_user, 'password':jira_pass} - return jira_config - else: - jira_config = dict(line.strip().split('=') for line in open(home + '/jira.ini')) - return jira_config + jira_config = dict(line.strip().split('=') for line in open(home + '/jira.ini')) + return jira_config -def get_jira(jira_config): +def get_jira(): options = { 'server': 'https://issues.apache.org/jira' } + + jira_config = get_jira_config() jira = 
JIRA(options=options,basic_auth=(jira_config['user'], jira_config['password'])) - # (Force) verify the auth was really done - jira_session=jira.session() - if (jira_session is None): - raise Exception("Failed to login to the JIRA instance") return jira def cmd_exists(cmd): @@ -90,15 +81,6 @@ def main(): p=os.popen(git_remote_update) p.close() - # Get JIRA configuration and login to JIRA to ensure the credentials work, before publishing the patch to the review board - print "Verifying JIRA connection configurations" - try: - jira_config=get_jira_config() - jira=get_jira(jira_config) - except: - print "Failed to login to the JIRA instance", sys.exc_info()[0], sys.exc_info()[1] - sys.exit(1) - rb_command= post_review_tool + " --publish --tracking-branch " + opt.branch + " --target-groups=kafka --bugs-closed=" + opt.jira if opt.debug: rb_command=rb_command + " --debug" @@ -141,6 +123,7 @@ def main(): p.close() print 'Creating diff against', opt.branch, 'and uploading patch to JIRA',opt.jira + jira=get_jira() issue = jira.issue(opt.jira) attachment=open(patch_file) jira.add_attachment(issue,attachment) @@ -163,6 +146,8 @@ def main(): for t in transitions: transitionsMap[t['name']] = t['id'] + jira_config = get_jira_config() + if('Submit Patch' in transitionsMap): jira.transition_issue(issue, transitionsMap['Submit Patch'] , assignee={'name': jira_config['user']} ) -- 1.9.3 (Apple Git-50)
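
For context on PATCH 5/5: a minimal sketch (not part of the patch series) of how the new INVALID_REQUIRED_ACKS error surfaces to a producer client once the broker-side check in KafkaApis is in place. The broker address and topic name are placeholders, and the exception unwrapping shown assumes the client maps error code 21 through the Errors entry added above; depending on the client version, an out-of-range acks value may already be rejected locally before the request ever reaches the broker.

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.InvalidRequiredAcksException;

public class InvalidAcksDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("acks", "3"); // out of range: with this patch the broker accepts only -1, 0 and 1

        KafkaProducer producer = new KafkaProducer(props);
        try {
            // send() is asynchronous; the broker's rejection surfaces when the future is resolved
            producer.send(new ProducerRecord("test-topic", "value".getBytes())).get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof InvalidRequiredAcksException)
                System.out.println("Broker rejected produce request: " + e.getCause().getMessage());
        } finally {
            producer.close();
        }
    }
}

Because InvalidRequiredAcksException extends ApiException rather than RetriableException, the producer fails fast instead of retrying a request the broker will never accept.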