diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index fc9e084..4b0824f 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -60,7 +60,7 @@ object ClientUtils extends Logging{
     catch {
       case e: Throwable =>
         warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
-          .format(correlationId, topics, shuffledBrokers(i).toString), e)
+          .format(correlationId, topics, shuffledBrokers(i).toString))
         t = e
     } finally {
       i = i + 1
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index b9e2bea..c95efa6 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -85,7 +85,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
             if (!isRunning.get()) throw t /* If this thread is stopped, propagate this exception to kill the thread. */
             else
-              warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
+              warn("Failed to find leader for %s due to %s".format(noLeaderPartitionSet, t.toString))
           }
         } finally {
           lock.unlock()
         }
@@ -101,7 +101,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
           if (!isRunning.get()) throw t /* If this thread is stopped, propagate this exception to kill the thread. */
           else {
-            warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")), t)
+            warn("Failed to add leader for partitions %s due to %s; will retry".format(leaderForPartitionsMap.keySet.mkString(","), t.toString))
             lock.lock()
             noLeaderPartitionSet ++= leaderForPartitionsMap.keySet
             lock.unlock()
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 1dde4fc..ff2780a 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -435,7 +435,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       }
       catch {
         case e: Exception =>
-          warn("Error while fetching offsets from %s:%d. Possible cause: %s".format(offsetsChannel.host, offsetsChannel.port, e.getMessage))
+          warn("Error while fetching offsets from %s:%d due to %s".format(offsetsChannel.host, offsetsChannel.port, e.toString))
           offsetsChannel.disconnect()
           None // retry
       }
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index c95c650..ee86ea2 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -162,7 +162,7 @@ class RequestSendThread(val controllerId: Int,
         }
       } catch {
         case e: Throwable =>
-          warn("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e)
+          warn("Controller %d fails to send a request to broker %s due to %s".format(controllerId, toBroker.toString(), e.toString))
          // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
          channel.disconnect()
       }
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 933de9d..f96db26 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -148,7 +148,7 @@ object KafkaController extends Logging {
       try {
         return controllerInfoString.toInt
       } catch {
-        case t: Throwable => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
+        case t: Throwable => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.")
       }
     }
   }
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 4976d9c..90295fb 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -192,7 +192,7 @@ private[kafka] class Acceptor(val host: String, val port: Int, private val proce
       info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, port))
     } catch {
       case e: SocketException =>
-        throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostName, port, e.getMessage), e)
+        throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostName, port, e.getMessage))
     }
     serverChannel
   }
@@ -275,7 +275,7 @@ private[kafka] class Processor(val id: Int,
           info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
           close(key)
         } case e: Throwable => {
-          error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
+          error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error " + e.toString)
           close(key)
         }
       }
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 4798481..c0328dd 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -110,7 +110,7 @@ class Producer[K,V](val config: ProducerConfig,
         producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
         producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
         throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
-      }else {
+      } else {
         trace("Added to send queue an event: " + message.toString)
         trace("Remaining queue size: " + queue.remainingCapacity)
       }
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 489f007..83ea774 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -129,7 +129,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
      info("Disconnecting from " + config.host + ":" + config.port)
      blockingChannel.disconnect()
    } catch {
-      case e: Exception => error("Error on disconnect: ", e)
+      case e: Exception => error("Error on disconnect: " + e.toString)
    }
  }
@@ -141,7 +141,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
    } catch {
      case e: Exception => {
        disconnect()
-        error("Producer connection to " + config.host + ":" + config.port + " unsuccessful", e)
+        error("Producer connection to " + config.host + ":" + config.port + " unsuccessful due to " + e.toString)
        throw e
      }
    }
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index d8ac915..6f4f500 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -64,7 +64,11 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
       if (topicMetadataRefreshInterval >= 0 &&
           SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
-        Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
+        try {
+          brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement)
+        } catch {
+          case e: Throwable => warn("Refresh metadata failed due to " + e.toString)
+        }
         sendPartitionPerTopicCache.clear()
         topicMetadataToRefresh.clear
         lastTopicMetadataRefreshTime = SystemTime.milliseconds
@@ -75,7 +79,11 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         // back off and update the topic metadata cache before attempting another send operation
         Thread.sleep(config.retryBackoffMs)
         // get topics of the outstanding produce requests and refresh metadata for those
-        Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
+        try {
+          brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement)
+        } catch {
+          case e: Throwable => warn("Update metadata for outstanding produce requests failed due to " + e.toString)
+        }
         sendPartitionPerTopicCache.clear()
         remainingRetries -= 1
         producerStats.resendRate.mark()
@@ -112,7 +120,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
           })
         }
       } catch {
-        case t: Throwable => error("Failed to send messages", t)
+        case t: Throwable => error("Failed to send messages: " + t.toString)
       }
       failedProduceRequests
     case None => // all produce requests failed
@@ -175,9 +183,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         dataPerTopicPartition.append(message)
       }
       Some(ret)
-    }catch {    // Swallow recoverable exceptions and return None so that they can be retried.
-      case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
-      case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
+    } catch {    // Swallow recoverable exceptions and return None so that they can be retried.
+      case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic, partition due to: " + ute.getMessage); None
+      case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic, partition due to: " + lnae.getMessage); None
       case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
     }
   }
@@ -255,7 +263,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         val response = syncProducer.send(producerRequest)
         debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d"
           .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
-        if(response != null) {
+        if (response != null) {
           if (response.status.size != producerRequest.data.size)
             throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest))
           if (logger.isTraceEnabled) {
@@ -281,8 +289,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         }
       } catch {
         case t: Throwable =>
-          warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s"
-            .format(currentCorrelationId, brokerId, messagesPerTopic.map(_._1).mkString(",")), t)
+          warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s due to %s"
+            .format(currentCorrelationId, brokerId, messagesPerTopic.map(_._1).mkString(","), t.toString))
           messagesPerTopic.keys.toSeq
       }
     } else {
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 42e9c74..740947c 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -42,9 +42,9 @@ class ProducerSendThread[K,V](val threadName: String,
   override def run {
     try {
       processEvents
-    }catch {
+    } catch {
       case e: Throwable => error("Error in sending events: ", e)
-    }finally {
+    } finally {
       shutdownLatch.countDown
     }
   }
@@ -103,7 +103,7 @@ class ProducerSendThread[K,V](val threadName: String,
      if(size > 0)
        handler.handle(events)
    }catch {
-      case e: Throwable => error("Error in handling batch of " + size + " events", e)
+      case e: Throwable => error("Error in handling batch of " + size + " events due to " + e.toString)
    }
  }
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 3b15254..6e3dc63 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -97,7 +97,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
     } catch {
       case t: Throwable =>
         if (isRunning.get) {
-          warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.getMessage))
+          warn("Error in fetch request %s: %s".format(fetchRequest, t.toString))
          partitionMapLock synchronized {
            partitionsWithError ++= partitionMap.keys
          }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d96229e..5865332 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -22,20 +22,22 @@ import kafka.api._
 import kafka.message._
 import kafka.network._
 import kafka.log._
-import scala.collection._
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.common._
 import kafka.utils.{Pool, SystemTime, Logging}
-import kafka.network.RequestChannel.Response
 import kafka.cluster.Broker
 import kafka.controller.KafkaController
 import kafka.utils.Utils.inLock
-import org.I0Itec.zkclient.ZkClient
-import java.util.concurrent.locks.ReentrantReadWriteLock
+import kafka.network.RequestChannel.Response
 import kafka.controller.KafkaController.StateChangeLogger
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic._
+
+import scala.collection._
+import org.I0Itec.zkclient.ZkClient
+
 
 /**
  * Logic to handle the various Kafka requests
  */
@@ -424,6 +426,12 @@ class KafkaApis(val requestChannel: RequestChannel,
           warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
             producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nle.getMessage))
           new ProduceResult(topicAndPartition, nle)
+        case stle: MessageSizeTooLargeException =>
+          BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
+          BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
+          error("Error processing ProducerRequest with correlation id %d from client %s on partition %s: %s"
+            .format(producerRequest.correlationId, producerRequest.clientId, topicAndPartition, stle.getMessage))
+          new ProduceResult(topicAndPartition, stle)
         case e: Throwable =>
           BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
           BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 6bfbac1..62a2fa7 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -168,7 +168,7 @@ object Utils extends Logging {
       case e: Throwable => log(e.getMessage(), e)
     }
   }
-  
+
   /**
    * Test if two byte buffers are equal. In this case equality means having
    * the same bytes from the current position to the limit