diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index 6baf4c5..4e087bb 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -67,12 +67,15 @@ class SimpleConsumer(val host: String, } } + @throws[IOException] def close() { lock synchronized { disconnect() } } + @throws[IOException] + @throws[InterruptedException] private def sendRequest(request: RequestOrResponse): Receive = { lock synchronized { var response: Receive = null @@ -100,11 +103,15 @@ class SimpleConsumer(val host: String, } } + @throws[IOException] + @throws[InterruptedException] def send(request: TopicMetadataRequest): TopicMetadataResponse = { val response = sendRequest(request) TopicMetadataResponse.readFrom(response.buffer) } + @throws[IOException] + @throws[InterruptedException] def send(request: ConsumerMetadataRequest): ConsumerMetadataResponse = { val response = sendRequest(request) ConsumerMetadataResponse.readFrom(response.buffer) @@ -116,6 +123,8 @@ class SimpleConsumer(val host: String, * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched. * @return a set of fetched messages */ + @throws[IOException] + @throws[InterruptedException] def fetch(request: FetchRequest): FetchResponse = { var response: Receive = null val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestTimer @@ -137,6 +146,8 @@ class SimpleConsumer(val host: String, * @param request a [[kafka.api.OffsetRequest]] object. * @return a [[kafka.api.OffsetResponse]] object. */ + @throws[IOException] + @throws[InterruptedException] def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer) /** @@ -145,6 +156,8 @@ class SimpleConsumer(val host: String, * @param request a [[kafka.api.OffsetCommitRequest]] object. * @return a [[kafka.api.OffsetCommitResponse]] object. */ + @throws[IOException] + @throws[InterruptedException] def commitOffsets(request: OffsetCommitRequest) = { // TODO: With KAFKA-1012, we have to first issue a ConsumerMetadataRequest and connect to the coordinator before // we can commit offsets. @@ -157,6 +170,8 @@ class SimpleConsumer(val host: String, * @param request a [[kafka.api.OffsetFetchRequest]] object. * @return a [[kafka.api.OffsetFetchResponse]] object. */ + @throws[IOException] + @throws[InterruptedException] def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).buffer) /** @@ -166,6 +181,8 @@ class SimpleConsumer(val host: String, * @param consumerId Id of the consumer which could be a consumer client, SimpleConsumerShell or a follower broker. * @return Requested offset. */ + @throws[IOException] + @throws[InterruptedException] def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = { val request = OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)), clientId = clientId, -- libgit2 0.21.2