From 9ebbe73d25a00ba5123b9c0c313dce1a02ef9092 Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Thu, 9 Jul 2015 09:45:59 +0100
Subject: [PATCH 1/3] kafka-2322; Use diamond operator to reduce redundancy
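Java 7's diamond operator lets the compiler infer a generic constructor's
type arguments from the declaration, so spelling them out again on the
right-hand side is redundant. This patch drops the explicit arguments
wherever the diamond can be used; behaviour is unchanged. A representative
before/after sketch (the declared type arguments shown here are
illustrative, based on the partsForTopic map in Cluster.java):

    // before: type arguments repeated on both sides
    HashMap<String, List<PartitionInfo>> partsForTopic =
            new HashMap<String, List<PartitionInfo>>();
    // after: the compiler infers them from the declaration
    HashMap<String, List<PartitionInfo>> partsForTopic = new HashMap<>();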
---
 .../java/org/apache/kafka/clients/ClientUtils.java | 4 ++--
 .../kafka/clients/ClusterConnectionStates.java | 4 ++--
 .../org/apache/kafka/clients/InFlightRequests.java | 6 +++---
 .../main/java/org/apache/kafka/clients/Metadata.java | 4 ++--
 .../java/org/apache/kafka/clients/NetworkClient.java | 8 ++++----
 .../kafka/clients/consumer/ConsumerConfig.java | 2 +-
 .../kafka/clients/consumer/ConsumerRecords.java | 8 ++++----
 .../apache/kafka/clients/consumer/KafkaConsumer.java | 12 ++++++------
 .../apache/kafka/clients/consumer/MockConsumer.java | 10 +++++-----
 .../clients/consumer/internals/Coordinator.java | 12 ++++++------
 .../kafka/clients/consumer/internals/Fetcher.java | 20 ++++++++++----------
 .../clients/consumer/internals/RequestFuture.java | 2 +-
 .../consumer/internals/SubscriptionState.java | 18 +++++++++---------
 .../apache/kafka/clients/producer/KafkaProducer.java | 4 ++--
 .../apache/kafka/clients/producer/MockProducer.java | 8 ++++----
 .../kafka/clients/producer/ProducerConfig.java | 2 +-
 .../kafka/clients/producer/internals/BufferPool.java | 6 +++---
 .../producer/internals/RecordAccumulator.java | 12 ++++++------
 .../clients/producer/internals/RecordBatch.java | 4 ++--
 .../kafka/clients/producer/internals/Sender.java | 10 +++++-----
 .../kafka/clients/tools/ProducerPerformance.java | 4 ++--
 .../main/java/org/apache/kafka/common/Cluster.java | 20 ++++++++++----------
 .../java/org/apache/kafka/common/MetricName.java | 4 ++--
 .../apache/kafka/common/config/AbstractConfig.java | 6 +++---
 .../org/apache/kafka/common/config/ConfigDef.java | 8 ++++----
 .../org/apache/kafka/common/metrics/JmxReporter.java | 4 ++--
 .../org/apache/kafka/common/metrics/Metrics.java | 6 +++---
 .../java/org/apache/kafka/common/metrics/Sensor.java | 4 ++--
 .../kafka/common/metrics/stats/Percentiles.java | 2 +-
 .../kafka/common/metrics/stats/SampledStat.java | 2 +-
 .../org/apache/kafka/common/network/Selector.java | 18 +++++++++---------
 .../org/apache/kafka/common/protocol/Errors.java | 4 ++--
 .../kafka/common/protocol/SecurityProtocol.java | 4 ++--
 .../apache/kafka/common/protocol/types/Schema.java | 4 ++--
 .../apache/kafka/common/requests/FetchRequest.java | 8 ++++----
 .../apache/kafka/common/requests/FetchResponse.java | 6 +++---
 .../kafka/common/requests/JoinGroupRequest.java | 2 +-
 .../kafka/common/requests/JoinGroupResponse.java | 6 +++---
 .../kafka/common/requests/ListOffsetRequest.java | 8 ++++----
 .../kafka/common/requests/ListOffsetResponse.java | 8 ++++----
 .../kafka/common/requests/MetadataRequest.java | 4 ++--
 .../kafka/common/requests/MetadataResponse.java | 20 ++++++++++----------
 .../kafka/common/requests/OffsetCommitRequest.java | 8 ++++----
 .../kafka/common/requests/OffsetCommitResponse.java | 6 +++---
 .../kafka/common/requests/OffsetFetchRequest.java | 8 ++++----
 .../kafka/common/requests/OffsetFetchResponse.java | 6 +++---
 .../apache/kafka/common/requests/ProduceRequest.java | 8 ++++----
 .../kafka/common/requests/ProduceResponse.java | 6 +++---
 .../apache/kafka/common/utils/CollectionUtils.java | 8 ++++----
 .../apache/kafka/common/utils/CopyOnWriteMap.java | 6 +++---
 .../java/org/apache/kafka/clients/MockClient.java | 10 +++++-----
 .../kafka/clients/consumer/KafkaConsumerTest.java | 2 +-
 .../kafka/clients/consumer/MockConsumerTest.java | 6 +++---
 .../clients/consumer/internals/CoordinatorTest.java | 2 +-
 .../clients/consumer/internals/FetcherTest.java | 4 ++--
 .../kafka/clients/producer/KafkaProducerTest.java | 4 ++--
 .../kafka/clients/producer/MockProducerTest.java | 14 +++++++-------
 .../kafka/clients/producer/ProducerRecordTest.java | 14 +++++++-------
 .../clients/producer/internals/BufferPoolTest.java | 4 ++--
 .../producer/internals/RecordAccumulatorTest.java | 4 ++--
 .../kafka/clients/producer/internals/SenderTest.java | 2 +-
 .../apache/kafka/common/config/ConfigDefTest.java | 6 +++---
 .../org/apache/kafka/common/metrics/MetricsTest.java | 2 +-
 .../apache/kafka/common/network/SelectorTest.java | 4 ++--
 .../kafka/common/record/MemoryRecordsTest.java | 2 +-
 .../org/apache/kafka/common/record/RecordTest.java | 2 +-
 .../kafka/common/requests/RequestResponseTest.java | 20 ++++++++++----------
 .../common/serialization/SerializationTest.java | 8 ++++----
 .../kafka/common/utils/AbstractIteratorTest.java | 6 +++---
 .../java/org/apache/kafka/test/Microbenchmarks.java | 10 +++++-----
 .../java/org/apache/kafka/test/MockSelector.java | 10 +++++-----
 .../test/java/org/apache/kafka/test/TestUtils.java | 2 +-
 .../src/main/java/kafka/etl/KafkaETLContext.java | 3 +--
 .../main/java/kafka/etl/KafkaETLRecordReader.java | 2 +-
 .../src/main/java/kafka/etl/KafkaETLUtils.java | 2 +-
 .../src/main/java/kafka/etl/Props.java | 4 ++--
 .../src/main/java/kafka/etl/impl/DataGenerator.java | 2 +-
 .../java/kafka/bridge/hadoop/KafkaOutputFormat.java | 6 +++---
 .../java/kafka/bridge/hadoop/KafkaRecordWriter.java | 2 +-
 .../main/scala/kafka/tools/KafkaMigrationTool.java | 8 ++++----
 examples/src/main/java/kafka/examples/Consumer.java | 2 +-
 examples/src/main/java/kafka/examples/Producer.java | 6 +++---
 .../main/java/kafka/examples/SimpleConsumerDemo.java | 2 +-
 .../kafka/log4jappender/KafkaLog4jAppender.java | 2 +-
 .../kafka/log4jappender/MockKafkaLog4jAppender.java | 2 +-
 85 files changed, 272 insertions(+), 273 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index 0d68bf1..b11d297 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -29,7 +29,7 @@ public class ClientUtils {
     private static final Logger log = LoggerFactory.getLogger(ClientUtils.class);

     public static List parseAndValidateAddresses(List urls) {
-        List addresses = new ArrayList();
+        List addresses = new ArrayList<>();
         for (String url : urls) {
             if (url != null && url.length() > 0) {
                 String host = getHost(url);
@@ -61,4 +61,4 @@ public class ClientUtils {
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 9ebda5e..6fb2090 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -25,7 +25,7 @@ final class ClusterConnectionStates {

     public ClusterConnectionStates(long reconnectBackoffMs) {
         this.reconnectBackoffMs = reconnectBackoffMs;
-        this.nodeState = new HashMap();
+        this.nodeState = new HashMap<>();
     }

     /**
@@ -158,4 +158,4 @@ final class ClusterConnectionStates {
             return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
         }
     }
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index 15d00d4..0fefed6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -24,7 +24,7 @@ import java.util.Map;
 final class InFlightRequests {

     private final int maxInFlightRequestsPerConnection;
-    private final Map> requests = new HashMap>();
+    private final Map> requests = new HashMap<>();

     public InFlightRequests(int maxInFlightRequestsPerConnection) {
         this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
@@ -36,7 +36,7 @@ final class InFlightRequests {
     public void add(ClientRequest request) {
         Deque reqs = this.requests.get(request.request().destination());
         if (reqs == null) {
-            reqs = new ArrayDeque();
+            reqs = new ArrayDeque<>();
             this.requests.put(request.request().destination(), reqs);
         }
         reqs.addFirst(request);
@@ -123,4 +123,4 @@ final class InFlightRequests {
         }
     }

-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 0387f26..5fe4a30 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -62,7 +62,7 @@ public final class Metadata {
         this.version = 0;
         this.cluster = Cluster.empty();
         this.needUpdate = false;
-        this.topics = new HashSet();
+        this.topics = new HashSet<>();
     }

     /**
@@ -130,7 +130,7 @@ public final class Metadata {
      * Get the list of topics we are currently maintaining metadata for
      */
     public synchronized Set topics() {
-        return new HashSet(this.topics);
+        return new HashSet<>(this.topics);
     }

     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 48fe796..7bb1d0b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -224,7 +224,7 @@ public class NetworkClient implements KafkaClient {
         }

         // process completed actions
-        List responses = new ArrayList();
+        List responses = new ArrayList<>();
         handleCompletedSends(responses, now);
         handleCompletedReceives(responses, now);
         handleDisconnections(responses, now);
@@ -256,7 +256,7 @@ public class NetworkClient implements KafkaClient {
         try {
             this.selector.muteAll();
             this.selector.unmute(node);
-            List responses = new ArrayList();
+            List responses = new ArrayList<>();
             while (inFlightRequestCount(node) > 0)
                 responses.addAll(poll(Integer.MAX_VALUE, now));
             return responses;
@@ -270,7 +270,7 @@ public class NetworkClient implements KafkaClient {
      */
     @Override
     public List completeAll(long now) {
-        List responses = new ArrayList();
+        List responses = new ArrayList<>();
         while (inFlightRequestCount() > 0)
             responses.addAll(poll(Integer.MAX_VALUE, now));
         return responses;
@@ -452,7 +452,7 @@ public class NetworkClient implements KafkaClient {
      * Create a metadata request for the given topics
      */
     private ClientRequest metadataRequest(long now, String node, Set topics) {
-        MetadataRequest metadata = new MetadataRequest(new ArrayList(topics));
+        MetadataRequest metadata = new MetadataRequest(new ArrayList<>(topics));
         RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
         return new ClientRequest(now, true, send, null);
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index daff34d..ae77d8a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -292,7 +292,7 @@ public class ConsumerConfig extends AbstractConfig {
     public static Map addDeserializerToConfig(Map configs,
                                               Deserializer keyDeserializer,
                                               Deserializer valueDeserializer) {
-        Map newConfigs = new HashMap();
+        Map newConfigs = new HashMap<>();
         newConfigs.putAll(configs);
         if (keyDeserializer != null)
             newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index eb75d2e..14cb176 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -28,7 +28,7 @@ import java.util.Map;
  */
 public class ConsumerRecords implements Iterable> {
     public static final ConsumerRecords EMPTY =
-        new ConsumerRecords(Collections.EMPTY_MAP);
+        new ConsumerRecords<>(Collections.EMPTY_MAP);

     private final Map>> records;

@@ -55,17 +55,17 @@ public class ConsumerRecords implements Iterable> {
     public Iterable> records(String topic) {
         if (topic == null)
             throw new IllegalArgumentException("Topic must be non-null.");
-        List>> recs = new ArrayList>>();
+        List>> recs = new ArrayList<>();
         for (Map.Entry>> entry : records.entrySet()) {
             if (entry.getKey().topic().equals(topic))
                 recs.add(entry.getValue());
         }
-        return new ConcatenatedIterable(recs);
+        return new ConcatenatedIterable<>(recs);
     }

     @Override
     public Iterator> iterator() {
-        return new ConcatenatedIterable(records.values()).iterator();
+        return new ConcatenatedIterable<>(records.values()).iterator();
     }

     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 7aa0760..c5efe82 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -417,7 +417,7 @@ public class KafkaConsumer implements Consumer {

     // currentThread holds the threadId of the current thread accessing KafkaConsumer
     // and is used to prevent multi-threaded access
-    private final AtomicReference currentThread = new AtomicReference();
+    private final AtomicReference currentThread = new AtomicReference<>();
     // refcount is used to allow reentrant access by the thread who has acquired currentThread
     private final AtomicInteger refcount = new AtomicInteger(0);

@@ -531,7 +531,7 @@ public class KafkaConsumer implements Consumer {
             this.metadata.update(Cluster.bootstrap(addresses), 0);

             String metricGrpPrefix = "consumer";
-            Map metricsTags = new LinkedHashMap();
+            Map metricsTags = new LinkedHashMap<>();
             metricsTags.put("client-id", clientId);
             this.client = new NetworkClient(
                     new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags),
@@ -567,7 +567,7 @@ public class KafkaConsumer implements Consumer {
             } else {
                 this.valueDeserializer = valueDeserializer;
             }
-            this.fetcher = new Fetcher(this.client,
+            this.fetcher = new Fetcher<>(this.client,
                     config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
                     config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
                     config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
@@ -732,7 +732,7 @@ public class KafkaConsumer implements Consumer {
                 // handling the fetched records.
                 fetcher.initFetches(metadata.fetch(), end);
                 pollClient(0, end);
-                return new ConsumerRecords(records);
+                return new ConsumerRecords<>(records);
             }

             remaining -= end - start;
@@ -835,7 +835,7 @@ public class KafkaConsumer implements Consumer {
         acquire();
         try {
             // Need defensive copy to ensure offsets are not removed before completion (e.g. in rebalance)
-            Map allConsumed = new HashMap(this.subscriptions.allConsumed());
+            Map allConsumed = new HashMap<>(this.subscriptions.allConsumed());
             commit(allConsumed, commitType);
         } finally {
             release();
@@ -1003,7 +1003,7 @@ public class KafkaConsumer implements Consumer {

     private void close(boolean swallowException) {
         log.trace("Closing the Kafka consumer.");
-        AtomicReference firstException = new AtomicReference();
+        AtomicReference firstException = new AtomicReference<>();
         this.closed = true;
         ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
         ClientUtils.closeQuietly(client, "consumer network client", firstException);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 46e26a6..851aabc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -42,8 +42,8 @@ public class MockConsumer implements Consumer {

     public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
         this.subscriptions = new SubscriptionState(offsetResetStrategy);
-        this.partitions = new HashMap>();
-        this.records = new HashMap>>();
+        this.partitions = new HashMap<>();
+        this.records = new HashMap<>();
         this.closed = false;
     }

@@ -88,8 +88,8 @@ public class MockConsumer implements Consumer {
             this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset());
         }

-        ConsumerRecords copy = new ConsumerRecords(this.records);
-        this.records = new HashMap>>();
+        ConsumerRecords copy = new ConsumerRecords<>(this.records);
+        this.records = new HashMap<>();
         return copy;
     }

@@ -99,7 +99,7 @@ public class MockConsumer implements Consumer {
         this.subscriptions.assignedPartitions().add(tp);
         List> recs = this.records.get(tp);
         if (recs == null) {
-            recs = new ArrayList>();
+            recs = new ArrayList<>();
             this.records.put(tp, recs);
         }
         recs.add(record);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index c1c8172..8afe1f7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -113,7 +113,7 @@ public final class Coordinator {
             if (future.isDone()) return future;

             // send a join group request to the coordinator
-            List subscribedTopics = new ArrayList(subscriptions.subscribedTopics());
+            List subscribedTopics = new ArrayList<>(subscriptions.subscribedTopics());
             log.debug("(Re-)joining group {} with subscribed topics {}", groupId, subscribedTopics);

             JoinGroupRequest request = new JoinGroupRequest(groupId,
@@ -208,7 +208,7 @@ public final class Coordinator {
         } else {
             // create the offset commit request
             Map offsetData;
offsetData = new HashMap(offsets.size()); + offsetData = new HashMap<>(offsets.size()); for (Map.Entry entry : offsets.entrySet()) offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(entry.getValue(), "")); OffsetCommitRequest req = new OffsetCommitRequest(this.groupId, @@ -230,7 +230,7 @@ public final class Coordinator { if (client.ready(this.consumerCoordinator, now)) // We have an open connection and we're ready to send - return new RequestFuture(); + return new RequestFuture<>(); if (this.client.connectionFailed(this.consumerCoordinator)) { coordinatorDead(); @@ -255,7 +255,7 @@ public final class Coordinator { log.debug("Fetching committed offsets for partitions: " + Utils.join(partitions, ", ")); // construct the request - OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList(partitions)); + OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<>(partitions)); // send the request with a callback RequestCompletionHandler completionHandler = new RequestCompletionHandler() { @@ -275,7 +275,7 @@ public final class Coordinator { } else { // parse the response to get the offsets OffsetFetchResponse response = new OffsetFetchResponse(resp.responseBody()); - Map offsets = new HashMap(response.responseData().size()); + Map offsets = new HashMap<>(response.responseData().size()); for (Map.Entry entry : response.responseData().entrySet()) { TopicPartition tp = entry.getKey(); OffsetFetchResponse.PartitionData data = entry.getValue(); @@ -369,7 +369,7 @@ public final class Coordinator { return RequestFuture.pollNeeded(); } } else { - final RequestFuture future = new RequestFuture(); + final RequestFuture future = new RequestFuture<>(); // create a consumer metadata request log.debug("Issuing consumer metadata request to broker {}", node.id()); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 695eaf6..5446083 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -102,7 +102,7 @@ public class Fetcher { this.keyDeserializer = keyDeserializer; this.valueDeserializer = valueDeserializer; - this.records = new LinkedList>(); + this.records = new LinkedList<>(); this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags); } @@ -132,7 +132,7 @@ public class Fetcher { if (this.subscriptions.partitionAssignmentNeeded()) { return Collections.emptyMap(); } else { - Map>> drained = new HashMap>>(); + Map>> drained = new HashMap<>(); for (PartitionRecords part : this.records) { Long consumed = subscriptions.consumed(part.partition); if (this.subscriptions.assignedPartitions().contains(part.partition) @@ -164,7 +164,7 @@ public class Fetcher { * @return A response which can be polled to obtain the corresponding offset. 
     */
    public RequestFuture listOffset(final TopicPartition topicPartition, long timestamp) {
-        Map partitions = new HashMap(1);
+        Map partitions = new HashMap<>(1);
         partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));
         long now = time.milliseconds();
         PartitionInfo info = metadata.fetch().partition(topicPartition);
@@ -176,7 +176,7 @@ public class Fetcher {
             log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
             return RequestFuture.metadataRefreshNeeded();
         } else if (this.client.ready(info.leader(), now)) {
-            final RequestFuture future = new RequestFuture();
+            final RequestFuture future = new RequestFuture<>();
             Node node = info.leader();
             ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
             RequestSend send = new RequestSend(node.idString(),
@@ -237,7 +237,7 @@ public class Fetcher {
      */
     private List createFetchRequests(Cluster cluster) {
         // create the fetch info
-        Map> fetchable = new HashMap>();
+        Map> fetchable = new HashMap<>();
         for (TopicPartition partition : subscriptions.assignedPartitions()) {
             Node node = cluster.leaderFor(partition);
             if (node == null) {
@@ -246,7 +246,7 @@ public class Fetcher {
                 // if there is a leader and no in-flight requests, issue a new fetch
                 Map fetch = fetchable.get(node.id());
                 if (fetch == null) {
-                    fetch = new HashMap();
+                    fetch = new HashMap<>();
                     fetchable.put(node.id(), fetch);
                 }
                 long offset = this.subscriptions.fetched(partition);
@@ -255,7 +255,7 @@ public class Fetcher {
         }

         // create the requests
-        List requests = new ArrayList(fetchable.size());
+        List requests = new ArrayList<>(fetchable.size());
         for (Map.Entry> entry : fetchable.entrySet()) {
             int nodeId = entry.getKey();
             final FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
@@ -292,7 +292,7 @@ public class Fetcher {
                     ByteBuffer buffer = partition.recordSet;
                     MemoryRecords records = MemoryRecords.readableRecords(buffer);
                     long fetchOffset = request.fetchData().get(tp).offset;
-                    List> parsed = new ArrayList>();
+                    List> parsed = new ArrayList<>();
                     for (LogEntry logEntry : records) {
                         parsed.add(parseRecord(tp, logEntry));
                         bytes += logEntry.size();
@@ -300,7 +300,7 @@ public class Fetcher {
                     if (parsed.size() > 0) {
                         ConsumerRecord record = parsed.get(parsed.size() - 1);
                         this.subscriptions.fetched(tp, record.offset() + 1);
-                        this.records.add(new PartitionRecords(fetchOffset, tp, parsed));
+                        this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed));
                         this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
                     }
                     this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
@@ -338,7 +338,7 @@ public class Fetcher {
             ByteBuffer valueBytes = logEntry.record().value();
             V value = valueBytes == null ?
                     null : this.valueDeserializer.deserialize(partition.topic(), Utils.toArray(valueBytes));
-            return new ConsumerRecord(partition.topic(), partition.partition(), offset, key, value);
+            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, key, value);
         }

     private static class PartitionRecords {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
index 13fc9af..9c0188a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
@@ -186,7 +186,7 @@ public class RequestFuture {
     }

     private static RequestFuture newRetryFuture(RetryAction retryAction) {
-        RequestFuture result = new RequestFuture();
+        RequestFuture result = new RequestFuture<>();
         result.retry(retryAction);
         return result;
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 6837453..6d823ee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -58,15 +58,15 @@ public class SubscriptionState {

     public SubscriptionState(OffsetResetStrategy offsetResetStrategy) {
         this.offsetResetStrategy = offsetResetStrategy;
-        this.subscribedTopics = new HashSet();
-        this.subscribedPartitions = new HashSet();
-        this.assignedPartitions = new HashSet();
-        this.consumed = new HashMap();
-        this.fetched = new HashMap();
-        this.committed = new HashMap();
+        this.subscribedTopics = new HashSet<>();
+        this.subscribedPartitions = new HashSet<>();
+        this.assignedPartitions = new HashSet<>();
+        this.consumed = new HashMap<>();
+        this.fetched = new HashMap<>();
+        this.committed = new HashMap<>();
         this.needsPartitionAssignment = false;
         this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
-        this.resetPartitions = new HashMap();
+        this.resetPartitions = new HashMap<>();
     }

     public void subscribe(String topic) {
@@ -208,7 +208,7 @@ public class SubscriptionState {
     }

     public Set missingFetchPositions() {
-        Set copy = new HashSet(this.assignedPartitions);
+        Set copy = new HashSet<>(this.assignedPartitions);
         copy.removeAll(this.fetched.keySet());
         return copy;
     }
@@ -227,4 +227,4 @@ public class SubscriptionState {
     }


-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 03b8dd2..746d4a4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -213,7 +213,7 @@ public class KafkaProducer implements Producer {
             this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
             this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
             this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
-            Map metricTags = new LinkedHashMap();
+            Map metricTags = new LinkedHashMap<>();
             metricTags.put("client-id", clientId);
             this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                     this.totalMemorySize,
@@ -562,7 +562,7 @@ public class KafkaProducer implements Producer {
         log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeUnit.toMillis(timeout));

         // this will keep track of the first encountered exception
-        AtomicReference firstException = new AtomicReference();
+        AtomicReference firstException = new AtomicReference<>();
         boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
         if (timeout > 0) {
             if (invokedFromCallback) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 36e7ffa..c961509 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -68,9 +68,9 @@ public class MockProducer implements Producer {
         this.partitioner = partitioner;
         this.keySerializer = keySerializer;
         this.valueSerializer = valueSerializer;
-        this.offsets = new HashMap();
-        this.sent = new ArrayList>();
-        this.completions = new ArrayDeque();
+        this.offsets = new HashMap<>();
+        this.sent = new ArrayList<>();
+        this.completions = new ArrayDeque<>();
     }

     /**
@@ -164,7 +164,7 @@ public class MockProducer implements Producer {
      * Get the list of sent records since the last call to {@link #clear()}
      */
     public synchronized List> history() {
-        return new ArrayList>(this.sent);
+        return new ArrayList<>(this.sent);
     }

     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index aa26420..621eb53 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -232,7 +232,7 @@ public class ProducerConfig extends AbstractConfig {
     public static Map addSerializerToConfig(Map configs,
                                             Serializer keySerializer,
                                             Serializer valueSerializer) {
-        Map newConfigs = new HashMap();
+        Map newConfigs = new HashMap<>();
         newConfigs.putAll(configs);
         if (keySerializer != null)
             newConfigs.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass());
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index 4cb1e50..65c1226 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -72,8 +72,8 @@ public final class BufferPool {
         this.poolableSize = poolableSize;
         this.blockOnExhaustion = blockOnExhaustion;
         this.lock = new ReentrantLock();
-        this.free = new ArrayDeque();
-        this.waiters = new ArrayDeque();
+        this.free = new ArrayDeque<>();
+        this.waiters = new ArrayDeque<>();
         this.totalMemory = memory;
         this.availableMemory = memory;
         this.metrics = metrics;
@@ -267,4 +267,4 @@ public final class BufferPool {
     public long totalMemory() {
         return this.totalMemory;
     }
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index a152bd7..726d6c8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -105,7 +105,7 @@ public final class RecordAccumulator {
         this.compression = compression;
         this.lingerMs = lingerMs;
         this.retryBackoffMs = retryBackoffMs;
-        this.batches = new CopyOnWriteMap>();
+        this.batches = new CopyOnWriteMap<>();
         String metricGrpName = "producer-metrics";
         this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time , metricGrpName , metricTags);
         this.incomplete = new IncompleteRecordBatches();
@@ -230,7 +230,7 @@ public final class RecordAccumulator {
     *
     */
    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
-        Set readyNodes = new HashSet();
+        Set readyNodes = new HashSet<>();
         long nextReadyCheckDelayMs = Long.MAX_VALUE;
         boolean unknownLeadersExist = false;

@@ -297,11 +297,11 @@ public final class RecordAccumulator {
         if (nodes.isEmpty())
             return Collections.emptyMap();

-        Map> batches = new HashMap>();
+        Map> batches = new HashMap<>();
         for (Node node : nodes) {
             int size = 0;
             List parts = cluster.partitionsForNode(node.id());
-            List ready = new ArrayList();
+            List ready = new ArrayList<>();
             /* to make starvation less likely this loop doesn't start at 0 */
             int start = drainIndex = drainIndex % parts.size();
             do {
@@ -465,7 +465,7 @@ public final class RecordAccumulator {
         private final Set incomplete;

         public IncompleteRecordBatches() {
-            this.incomplete = new HashSet();
+            this.incomplete = new HashSet<>();
         }

         public void add(RecordBatch batch) {
@@ -484,7 +484,7 @@ public final class RecordAccumulator {

         public Iterable all() {
             synchronized (incomplete) {
-                return new ArrayList(this.incomplete);
+                return new ArrayList<>(this.incomplete);
             }
         }
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 06182db..bc823ac 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -49,7 +49,7 @@ public final class RecordBatch {
         this.records = records;
         this.topicPartition = tp;
         this.produceFuture = new ProduceRequestResult();
-        this.thunks = new ArrayList();
+        this.thunks = new ArrayList<>();
     }

     /**
@@ -116,4 +116,4 @@ public final class RecordBatch {
     public String toString() {
         return "RecordBatch(topicPartition=" + topicPartition + ", recordCount=" + recordCount + ")";
     }
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 0baf16e..5e9a12f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -302,7 +302,7 @@ public class Sender implements Runnable {
      * Transfer the record batches into a list of produce requests on a per-node basis
      */
     private List createProduceRequests(Map> collated, long now) {
-        List requests = new ArrayList(collated.size());
+        List requests = new ArrayList<>(collated.size());
         for (Map.Entry> entry : collated.entrySet())
             requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
         return requests;
@@ -312,8 +312,8 @@ public class Sender implements Runnable {
      * Create a produce request from the given record batches
      */
     private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List batches) {
-        Map produceRecordsByPartition = new HashMap(batches.size());
-        final Map recordsByPartition = new HashMap(batches.size());
+        Map produceRecordsByPartition = new HashMap<>(batches.size());
+        final Map recordsByPartition = new HashMap<>(batches.size());
         for (RecordBatch batch : batches) {
             TopicPartition tp = batch.topicPartition;
             produceRecordsByPartition.put(tp, (ByteBuffer) batch.records.buffer().flip());
@@ -355,7 +355,7 @@ public class Sender implements Runnable {

         public SenderMetrics(Metrics metrics) {
             this.metrics = metrics;
-            Map metricTags = new LinkedHashMap();
+            Map metricTags = new LinkedHashMap<>();
             metricTags.put("client-id", clientId);
             String metricGrpName = "producer-metrics";

@@ -421,7 +421,7 @@ public class Sender implements Runnable {
                 String topicRecordsCountName = "topic." + topic + ".records-per-batch";
                 Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName);
                 if (topicRecordCount == null) {
-                    Map metricTags = new LinkedHashMap();
+                    Map metricTags = new LinkedHashMap<>();
                     metricTags.put("client-id", clientId);
                     metricTags.put("topic", topic);
                     String metricGrpName = "producer-topic-metrics";
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
index 13f4d59..703281d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -45,12 +45,12 @@ public class ProducerPerformance {
         }
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        KafkaProducer producer = new KafkaProducer(props);
+        KafkaProducer producer = new KafkaProducer<>(props);

         /* setup perf test */
         byte[] payload = new byte[recordSize];
         Arrays.fill(payload, (byte) 1);
-        ProducerRecord record = new ProducerRecord(topicName, payload);
+        ProducerRecord record = new ProducerRecord<>(topicName, payload);
         long sleepTime = NS_PER_SEC / throughput;
         long sleepDeficitNs = 0;
         Stats stats = new Stats(numRecords, 5000);
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 60594a7..6ec5e0d 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -36,24 +36,24 @@ public final class Cluster {
      */
     public Cluster(Collection nodes, Collection partitions) {
         // make a randomized, unmodifiable copy of the nodes
-        List copy = new ArrayList(nodes);
+        List copy = new ArrayList<>(nodes);
         Collections.shuffle(copy);
         this.nodes = Collections.unmodifiableList(copy);

-        this.nodesById = new HashMap();
+        this.nodesById = new HashMap<>();
         for (Node node: nodes)
             this.nodesById.put(node.id(), node);

         // index the partitions by topic/partition for quick lookup
-        this.partitionsByTopicPartition = new HashMap(partitions.size());
+        this.partitionsByTopicPartition = new HashMap<>(partitions.size());
         for (PartitionInfo p : partitions)
             this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);

         // index the partitions by topic and node respectively, and make the lists
         // unmodifiable so we can hand them out in user-facing apis without risk
         // of the client modifying the contents
-        HashMap> partsForTopic = new HashMap>();
-        HashMap> partsForNode = new HashMap>();
+        HashMap> partsForTopic = new HashMap<>();
+        HashMap> partsForNode = new HashMap<>();
         for (Node n : this.nodes) {
             partsForNode.put(n.id(), new ArrayList());
         }
@@ -68,20 +68,20 @@ public final class Cluster {
                 psNode.add(p);
             }
         }
-        this.partitionsByTopic = new HashMap>(partsForTopic.size());
-        this.availablePartitionsByTopic = new HashMap>(partsForTopic.size());
+        this.partitionsByTopic = new HashMap<>(partsForTopic.size());
+        this.availablePartitionsByTopic = new HashMap<>(partsForTopic.size());
         for (Map.Entry> entry : partsForTopic.entrySet()) {
             String topic = entry.getKey();
             List partitionList = entry.getValue();
             this.partitionsByTopic.put(topic, Collections.unmodifiableList(partitionList));
-            List availablePartitions = new ArrayList();
+            List availablePartitions = new ArrayList<>();
             for (PartitionInfo part : partitionList) {
                 if (part.leader() != null)
                     availablePartitions.add(part);
             }
             this.availablePartitionsByTopic.put(topic, Collections.unmodifiableList(availablePartitions));
         }
-        this.partitionsByNode = new HashMap>(partsForNode.size());
+        this.partitionsByNode = new HashMap<>(partsForNode.size());
         for (Map.Entry> entry : partsForNode.entrySet())
             this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));

@@ -100,7 +100,7 @@ public final class Cluster {
      * @return A cluster for these hosts/ports
      */
     public static Cluster bootstrap(List addresses) {
-        List nodes = new ArrayList();
+        List nodes = new ArrayList<>();
         int nodeId = -1;
         for (InetSocketAddress address : addresses)
             nodes.add(new Node(nodeId--, address.getHostName(), address.getPort()));
diff --git a/clients/src/main/java/org/apache/kafka/common/MetricName.java b/clients/src/main/java/org/apache/kafka/common/MetricName.java
index 04b4a09..a1607a3 100644
--- a/clients/src/main/java/org/apache/kafka/common/MetricName.java
+++ b/clients/src/main/java/org/apache/kafka/common/MetricName.java
@@ -88,7 +88,7 @@ public final class MetricName {
     private static Map getTags(String... keyValue) {
         if ((keyValue.length % 2) != 0)
             throw new IllegalArgumentException("keyValue needs to be specified in paris");
-        Map tags = new HashMap();
+        Map tags = new HashMap<>();

         for (int i = 0; i < keyValue.length / 2; i++)
             tags.put(keyValue[i], keyValue[i + 1]);

@@ -182,4 +182,4 @@ public final class MetricName {
         return "MetricName [name=" + name + ", group=" + group + ", description="
                 + description + ", tags=" + tags + "]";
     }
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index bae528d..6bbd4ec 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -91,13 +91,13 @@ public class AbstractConfig {
     }

     public Set unused() {
-        Set keys = new HashSet(originals.keySet());
+        Set keys = new HashSet<>(originals.keySet());
         keys.removeAll(used);
         return keys;
     }

     public Map originals() {
-        Map copy = new HashMap();
+        Map copy = new HashMap<>();
         copy.putAll(originals);
         return copy;
     }
@@ -147,7 +147,7 @@ public class AbstractConfig {

     public List getConfiguredInstances(String key, Class t) {
         List klasses = getList(key);
-        List objects = new ArrayList();
+        List objects = new ArrayList<>();
         for (String klass : klasses) {
             Class c;
             try {
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 4170bcc..f36f70d 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -49,7 +49,7 @@ public class ConfigDef {

     private static final Object NO_DEFAULT_VALUE = new String("");

-    private final Map configKeys = new HashMap();
+    private final Map configKeys = new HashMap<>();

     /**
      * Returns unmodifiable set of properties names defined in this {@linkplain ConfigDef}
@@ -164,7 +164,7 @@
      */
     public Map parse(Map props) {
         /* parse all known keys */
-        Map values = new HashMap();
+        Map values = new HashMap<>();
         for (ConfigKey key : configKeys.values()) {
             Object value;
             // props map contains setting - assign ConfigKey value
@@ -395,7 +395,7 @@

     public String toHtmlTable() {
         // sort first required fields, then by importance, then name
-        List configs = new ArrayList(this.configKeys.values());
+        List configs = new ArrayList<>(this.configKeys.values());
         Collections.sort(configs, new Comparator() {
             public int compare(ConfigDef.ConfigKey k1, ConfigDef.ConfigKey k2) {
                 // first take anything with no default value
@@ -444,4 +444,4 @@
         b.append("");
         return b.toString();
     }
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
index 6b9590c..35d22cb 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
@@ -44,7 +44,7 @@ public class JmxReporter implements MetricsReporter {
     private static final Logger log = LoggerFactory.getLogger(JmxReporter.class);
     private static final Object LOCK = new Object();
     private String prefix;
-    private final Map mbeans = new HashMap();
+    private final Map mbeans = new HashMap<>();

     public JmxReporter() {
         this("");
@@ -143,7 +143,7 @@ public class JmxReporter implements MetricsReporter {
         private final Map metrics;

         public KafkaMbean(String mbeanName) throws MalformedObjectNameException {
-            this.metrics = new HashMap();
+            this.metrics = new HashMap<>();
             this.objectName = new ObjectName(mbeanName);
         }

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 5f6caf9..944810f 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -86,8 +86,8 @@ public class Metrics implements Closeable {
      */
     public Metrics(MetricConfig defaultConfig, List reporters, Time time) {
         this.config = defaultConfig;
-        this.sensors = new CopyOnWriteMap();
-        this.metrics = new CopyOnWriteMap();
+        this.sensors = new CopyOnWriteMap<>();
+        this.metrics = new CopyOnWriteMap<>();
         this.reporters = Utils.notNull(reporters);
         this.time = time;
         for (MetricsReporter reporter : reporters)
@@ -170,7 +170,7 @@ public class Metrics implements Closeable {
      * Add a MetricReporter
      */
     public synchronized void addReporter(MetricsReporter reporter) {
-        Utils.notNull(reporter).init(new ArrayList(metrics.values()));
+        Utils.notNull(reporter).init(new ArrayList<>(metrics.values()));
         this.reporters.add(reporter);
     }

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index ca823fd..a2874de 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -43,8 +43,8 @@ public final class Sensor {
         this.registry = registry;
         this.name = Utils.notNull(name);
         this.parents = parents == null ? new Sensor[0] : parents;
-        this.metrics = new ArrayList();
-        this.stats = new ArrayList();
+        this.metrics = new ArrayList<>();
+        this.stats = new ArrayList<>();
         this.config = config;
         this.time = time;
         checkForest(new HashSet());
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
index 78c93e8..e8c1da9 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
@@ -56,7 +56,7 @@ public class Percentiles extends SampledStat implements CompoundStat {

     @Override
     public List stats() {
-        List ms = new ArrayList(this.percentiles.length);
+        List ms = new ArrayList<>(this.percentiles.length);
         for (Percentile percentile : this.percentiles) {
             final double pct = percentile.percentile();
             ms.add(new NamedMeasurable(percentile.name(), new Measurable() {
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
index b341b7d..4f383fb 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
@@ -36,7 +36,7 @@ public abstract class SampledStat implements MeasurableStat {

     public SampledStat(double initialValue) {
         this.initialValue = initialValue;
-        this.samples = new ArrayList(2);
+        this.samples = new ArrayList<>(2);
     }

     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index aaf60c9..bbd47d2 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -101,15 +101,15 @@ public class Selector implements Selectable {
         this.time = time;
         this.metricGrpPrefix = metricGrpPrefix;
         this.metricTags = metricTags;
-        this.keys = new HashMap();
-        this.completedSends = new ArrayList();
-        this.completedReceives = new ArrayList();
-        this.connected = new ArrayList();
-        this.disconnected = new ArrayList();
-        this.failedSends = new ArrayList();
+        this.keys = new HashMap<>();
+        this.completedSends = new ArrayList<>();
+        this.completedReceives = new ArrayList<>();
+        this.connected = new ArrayList<>();
+        this.disconnected = new ArrayList<>();
+        this.failedSends = new ArrayList<>();
         this.sensors = new SelectorMetrics(metrics);
         // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true
-        this.lruConnections = new LinkedHashMap(16, .75F, true);
+        this.lruConnections = new LinkedHashMap<>(16, .75F, true);
         currentTimeNanos = new SystemTime().nanoseconds();
         nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
         this.metricsPerConnection = metricsPerConnection;
@@ -193,7 +193,7 @@ public class Selector implements Selectable {
      */
     @Override
     public void close() {
-        List connections = new LinkedList(keys.keySet());
+        List connections = new LinkedList<>(keys.keySet());
         for (String id: connections)
             close(id);

@@ -588,7 +588,7 @@ public class Selector implements Selectable {

             if (nodeRequest == null) {
                 String metricGrpName = metricGrpPrefix + "-node-metrics";
-                Map tags = new LinkedHashMap(metricTags);
+                Map tags = new LinkedHashMap<>(metricTags);
                 tags.put("node-id", "node-" + connectionId);

                 nodeRequest = this.metrics.sensor(nodeRequestName);
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 4c0ecc3..4c7660f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -83,8 +83,8 @@ public enum Errors {
     INVALID_COMMIT_OFFSET_SIZE(28,
             new ApiException("The committing offset data size is not valid"));

-    private static Map, Errors> classToError = new HashMap, Errors>();
-    private static Map codeToError = new HashMap();
+    private static Map, Errors> classToError = new HashMap<>();
+    private static Map codeToError = new HashMap<>();

     static {
         for (Errors error : Errors.values()) {
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
index dab1a94..4f54597 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
@@ -27,8 +27,8 @@ public enum SecurityProtocol {
     /** Currently identical to PLAINTEXT and used for testing only. We may implement extra instrumentation when testing channel code. */
     TRACE(Short.MAX_VALUE, "TRACE");

-    private static final Map CODE_TO_SECURITY_PROTOCOL = new HashMap();
-    private static final List NAMES = new ArrayList();
+    private static final Map CODE_TO_SECURITY_PROTOCOL = new HashMap<>();
+    private static final List NAMES = new ArrayList<>();

     static {
         for (SecurityProtocol proto: SecurityProtocol.values()) {
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index 3a14ac0..65d1b5a 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -31,7 +31,7 @@ public class Schema extends Type {
     */
    public Schema(Field... fs) {
         this.fields = new Field[fs.length];
-        this.fieldsByName = new HashMap();
+        this.fieldsByName = new HashMap<>();
         for (int i = 0; i < this.fields.length; i++) {
             Field field = fs[i];
             if (fieldsByName.containsKey(field.name))
@@ -156,4 +156,4 @@ public class Schema extends Type {
         }
     }

-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index df073a0..9dc80ed 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -76,11 +76,11 @@ public class FetchRequest extends AbstractRequest {
         struct.set(REPLICA_ID_KEY_NAME, replicaId);
         struct.set(MAX_WAIT_KEY_NAME, maxWait);
         struct.set(MIN_BYTES_KEY_NAME, minBytes);
-        List topicArray = new ArrayList();
+        List topicArray = new ArrayList<>();
         for (Map.Entry> topicEntry : topicsData.entrySet()) {
             Struct topicData = struct.instance(TOPICS_KEY_NAME);
             topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            List partitionArray = new ArrayList();
+            List partitionArray = new ArrayList<>();
             for (Map.Entry partitionEntry : topicEntry.getValue().entrySet()) {
                 PartitionData fetchPartitionData = partitionEntry.getValue();
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
@@ -104,7 +104,7 @@ public class FetchRequest extends AbstractRequest {
         replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
         maxWait = struct.getInt(MAX_WAIT_KEY_NAME);
         minBytes = struct.getInt(MIN_BYTES_KEY_NAME);
-        fetchData = new HashMap();
+        fetchData = new HashMap<>();
         for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
             String topic = topicResponse.getString(TOPIC_KEY_NAME);
@@ -121,7 +121,7 @@ public class FetchRequest extends AbstractRequest {

     @Override
     public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        Map responseData = new HashMap();
+        Map responseData = new HashMap<>();
         for (Map.Entry entry: fetchData.entrySet()) {
             FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e).code(),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index eb8951f..fc27110 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -76,11 +76,11 @@ public class FetchResponse extends AbstractRequestResponse {
         super(new Struct(CURRENT_SCHEMA));
         Map> topicsData = CollectionUtils.groupDataByTopic(responseData);

-        List topicArray = new ArrayList();
+        List topicArray = new ArrayList<>();
         for (Map.Entry> topicEntry: topicsData.entrySet()) {
             Struct topicData = struct.instance(RESPONSES_KEY_NAME);
             topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            List partitionArray = new ArrayList();
+            List partitionArray = new ArrayList<>();
             for (Map.Entry partitionEntry : topicEntry.getValue().entrySet()) {
                 PartitionData fetchPartitionData = partitionEntry.getValue();
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
@@ -99,7 +99,7 @@ public class FetchResponse extends AbstractRequestResponse {

     public FetchResponse(Struct struct) {
         super(struct);
-        responseData = new HashMap();
+        responseData = new HashMap<>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
             String topic = topicResponse.getString(TOPIC_KEY_NAME);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 1ffe076..ac9d90a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -60,7 +60,7 @@ public class JoinGroupRequest extends AbstractRequest {
         groupId = struct.getString(GROUP_ID_KEY_NAME);
         sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME);
         Object[] topicsArray = struct.getArray(TOPICS_KEY_NAME);
-        topics = new ArrayList();
+        topics = new ArrayList<>();
         for (Object topic: topicsArray)
             topics.add((String) topic);
         consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 7bf544e..71b8197 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -60,7 +60,7 @@ public class JoinGroupResponse extends AbstractRequestResponse {
         struct.set(ERROR_CODE_KEY_NAME, errorCode);
         struct.set(GENERATION_ID_KEY_NAME, generationId);
         struct.set(CONSUMER_ID_KEY_NAME, consumerId);
-        List topicArray = new ArrayList();
+        List topicArray = new ArrayList<>();
         for (Map.Entry> entries: partitionsByTopic.entrySet()) {
             Struct topicData = struct.instance(ASSIGNED_PARTITIONS_KEY_NAME);
             topicData.set(TOPIC_KEY_NAME, entries.getKey());
@@ -77,7 +77,7 @@ public class JoinGroupResponse extends AbstractRequestResponse {

     public JoinGroupResponse(Struct struct) {
         super(struct);
-        assignedPartitions = new ArrayList();
+        assignedPartitions = new ArrayList<>();
         for (Object topicDataObj : struct.getArray(ASSIGNED_PARTITIONS_KEY_NAME)) {
             Struct topicData = (Struct) topicDataObj;
             String topic = topicData.getString(TOPIC_KEY_NAME);
@@ -108,4 +108,4 @@ public class JoinGroupResponse extends AbstractRequestResponse {
     public static JoinGroupResponse parse(ByteBuffer buffer) {
         return new JoinGroupResponse((Struct) CURRENT_SCHEMA.read(buffer));
     }
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 6da4a0e..9acd9f5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -67,11 +67,11 @@ public class ListOffsetRequest extends AbstractRequest {
         Map> topicsData = CollectionUtils.groupDataByTopic(offsetData);

         struct.set(REPLICA_ID_KEY_NAME, replicaId);
-        List topicArray = new ArrayList();
+        List topicArray = new ArrayList<>();
         for (Map.Entry> topicEntry: topicsData.entrySet()) {
             Struct topicData = struct.instance(TOPICS_KEY_NAME);
             topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            List partitionArray = new ArrayList();
+            List partitionArray = new ArrayList<>();
             for (Map.Entry partitionEntry : topicEntry.getValue().entrySet()) {
                 PartitionData offsetPartitionData = partitionEntry.getValue();
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
@@ -91,7 +91,7 @@ public class ListOffsetRequest extends AbstractRequest {
     public ListOffsetRequest(Struct struct) {
         super(struct);
struct.getInt(REPLICA_ID_KEY_NAME); - offsetData = new HashMap(); + offsetData = new HashMap<>(); for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) { Struct topicResponse = (Struct) topicResponseObj; String topic = topicResponse.getString(TOPIC_KEY_NAME); @@ -108,7 +108,7 @@ public class ListOffsetRequest extends AbstractRequest { @Override public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { - Map responseData = new HashMap(); + Map responseData = new HashMap<>(); for (Map.Entry entry: offsetData.entrySet()) { ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData(Errors.forException(e).code(), new ArrayList()); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java index f706086..2d36eb5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java @@ -68,11 +68,11 @@ public class ListOffsetResponse extends AbstractRequestResponse { super(new Struct(CURRENT_SCHEMA)); Map> topicsData = CollectionUtils.groupDataByTopic(responseData); - List topicArray = new ArrayList(); + List topicArray = new ArrayList<>(); for (Map.Entry> topicEntry: topicsData.entrySet()) { Struct topicData = struct.instance(RESPONSES_KEY_NAME); topicData.set(TOPIC_KEY_NAME, topicEntry.getKey()); - List partitionArray = new ArrayList(); + List partitionArray = new ArrayList<>(); for (Map.Entry partitionEntry : topicEntry.getValue().entrySet()) { PartitionData offsetPartitionData = partitionEntry.getValue(); Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); @@ -90,7 +90,7 @@ public class ListOffsetResponse extends AbstractRequestResponse { public ListOffsetResponse(Struct struct) { super(struct); - responseData = new HashMap(); + responseData = new HashMap<>(); for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { Struct topicResponse = (Struct) topicResponseObj; String topic = topicResponse.getString(TOPIC_KEY_NAME); @@ -99,7 +99,7 @@ public class ListOffsetResponse extends AbstractRequestResponse { int partition = partitionResponse.getInt(PARTITION_KEY_NAME); short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME); Object[] offsets = partitionResponse.getArray(OFFSETS_KEY_NAME); - List offsetsList = new ArrayList(); + List offsetsList = new ArrayList<>(); for (Object offset: offsets) offsetsList.add((Long) offset); PartitionData partitionData = new PartitionData(errorCode, offsetsList); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index f70e8da..9b9d11d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -43,7 +43,7 @@ public class MetadataRequest extends AbstractRequest { public MetadataRequest(Struct struct) { super(struct); Object[] topicArray = struct.getArray(TOPICS_KEY_NAME); - topics = new ArrayList(); + topics = new ArrayList<>(); for (Object topicObj: topicArray) { topics.add((String) topicObj); } @@ -51,7 +51,7 @@ public class MetadataRequest extends AbstractRequest { @Override public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { - Map topicErrors = new HashMap(); + Map topicErrors = new HashMap<>(); for 
(String topic : topics) { topicErrors.put(topic, Errors.forException(e)); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index c8f2d08..05b7d27 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -74,7 +74,7 @@ public class MetadataResponse extends AbstractRequestResponse { public MetadataResponse(Cluster cluster, Map errors) { super(new Struct(CURRENT_SCHEMA)); - List brokerArray = new ArrayList(); + List brokerArray = new ArrayList<>(); for (Node node : cluster.nodes()) { Struct broker = struct.instance(BROKERS_KEY_NAME); broker.set(NODE_ID_KEY_NAME, node.id()); @@ -84,7 +84,7 @@ public class MetadataResponse extends AbstractRequestResponse { } struct.set(BROKERS_KEY_NAME, brokerArray.toArray()); - List topicArray = new ArrayList(); + List topicArray = new ArrayList<>(); for (String topic : cluster.topics()) { Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME); @@ -93,17 +93,17 @@ public class MetadataResponse extends AbstractRequestResponse { topicData.set(TOPIC_ERROR_CODE_KEY_NAME, errors.get(topic).code()); } else { topicData.set(TOPIC_ERROR_CODE_KEY_NAME, Errors.NONE.code()); - List partitionArray = new ArrayList(); + List partitionArray = new ArrayList<>(); for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) { Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME); partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, Errors.NONE.code()); partitionData.set(PARTITION_KEY_NAME, fetchPartitionData.partition()); partitionData.set(LEADER_KEY_NAME, fetchPartitionData.leader().id()); - ArrayList replicas = new ArrayList(); + ArrayList replicas = new ArrayList<>(); for (Node node : fetchPartitionData.replicas()) replicas.add(node.id()); partitionData.set(REPLICAS_KEY_NAME, replicas.toArray()); - ArrayList isr = new ArrayList(); + ArrayList isr = new ArrayList<>(); for (Node node : fetchPartitionData.inSyncReplicas()) isr.add(node.id()); partitionData.set(ISR_KEY_NAME, isr.toArray()); @@ -117,13 +117,13 @@ public class MetadataResponse extends AbstractRequestResponse { struct.set(TOPIC_METATDATA_KEY_NAME, topicArray.toArray()); this.cluster = cluster; - this.errors = new HashMap(); + this.errors = new HashMap<>(); } public MetadataResponse(Struct struct) { super(struct); - Map errors = new HashMap(); - Map brokers = new HashMap(); + Map errors = new HashMap<>(); + Map brokers = new HashMap<>(); Object[] brokerStructs = (Object[]) struct.get(BROKERS_KEY_NAME); for (int i = 0; i < brokerStructs.length; i++) { Struct broker = (Struct) brokerStructs[i]; @@ -132,7 +132,7 @@ public class MetadataResponse extends AbstractRequestResponse { int port = broker.getInt(PORT_KEY_NAME); brokers.put(nodeId, new Node(nodeId, host, port)); } - List partitions = new ArrayList(); + List partitions = new ArrayList<>(); Object[] topicInfos = (Object[]) struct.get(TOPIC_METATDATA_KEY_NAME); for (int i = 0; i < topicInfos.length; i++) { Struct topicInfo = (Struct) topicInfos[i]; @@ -174,4 +174,4 @@ public class MetadataResponse extends AbstractRequestResponse { public static MetadataResponse parse(ByteBuffer buffer) { return new MetadataResponse((Struct) CURRENT_SCHEMA.read(buffer)); } -} \ No newline at end of file +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index d6e6386..5b772b3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -149,12 +149,12 @@ public class OffsetCommitRequest extends AbstractRequest { Map> topicsData = CollectionUtils.groupDataByTopic(offsetData); struct.set(GROUP_ID_KEY_NAME, groupId); - List topicArray = new ArrayList(); + List topicArray = new ArrayList<>(); for (Map.Entry> topicEntry: topicsData.entrySet()) { Struct topicData = struct.instance(TOPICS_KEY_NAME); topicData.set(TOPIC_KEY_NAME, topicEntry.getKey()); - List partitionArray = new ArrayList(); + List partitionArray = new ArrayList<>(); for (Map.Entry partitionEntry : topicEntry.getValue().entrySet()) { PartitionData fetchPartitionData = partitionEntry.getValue(); Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); @@ -194,7 +194,7 @@ public class OffsetCommitRequest extends AbstractRequest { else retentionTime = DEFAULT_RETENTION_TIME; - offsetData = new HashMap(); + offsetData = new HashMap<>(); for (Object topicDataObj : struct.getArray(TOPICS_KEY_NAME)) { Struct topicData = (Struct) topicDataObj; String topic = topicData.getString(TOPIC_KEY_NAME); @@ -218,7 +218,7 @@ public class OffsetCommitRequest extends AbstractRequest { @Override public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { - Map responseData = new HashMap(); + Map responseData = new HashMap<>(); for (Map.Entry entry: offsetData.entrySet()) { responseData.put(entry.getKey(), Errors.forException(e).code()); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java index a163333..3542139 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java @@ -57,11 +57,11 @@ public class OffsetCommitResponse extends AbstractRequestResponse { Map> topicsData = CollectionUtils.groupDataByTopic(responseData); - List topicArray = new ArrayList(); + List topicArray = new ArrayList<>(); for (Map.Entry> entries: topicsData.entrySet()) { Struct topicData = struct.instance(RESPONSES_KEY_NAME); topicData.set(TOPIC_KEY_NAME, entries.getKey()); - List partitionArray = new ArrayList(); + List partitionArray = new ArrayList<>(); for (Map.Entry partitionEntry : entries.getValue().entrySet()) { Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); @@ -77,7 +77,7 @@ public class OffsetCommitResponse extends AbstractRequestResponse { public OffsetCommitResponse(Struct struct) { super(struct); - responseData = new HashMap(); + responseData = new HashMap<>(); for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { Struct topicResponse = (Struct) topicResponseObj; String topic = topicResponse.getString(TOPIC_KEY_NAME); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java index 6ee7597..26bb4d0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java @@ -51,11 +51,11 @@ public class OffsetFetchRequest extends 
AbstractRequest { Map> topicsData = CollectionUtils.groupDataByTopic(partitions); struct.set(GROUP_ID_KEY_NAME, groupId); - List topicArray = new ArrayList(); + List topicArray = new ArrayList<>(); for (Map.Entry> entries: topicsData.entrySet()) { Struct topicData = struct.instance(TOPICS_KEY_NAME); topicData.set(TOPIC_KEY_NAME, entries.getKey()); - List partitionArray = new ArrayList(); + List partitionArray = new ArrayList<>(); for (Integer partiitonId : entries.getValue()) { Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); partitionData.set(PARTITION_KEY_NAME, partiitonId); @@ -71,7 +71,7 @@ public class OffsetFetchRequest extends AbstractRequest { public OffsetFetchRequest(Struct struct) { super(struct); - partitions = new ArrayList(); + partitions = new ArrayList<>(); for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) { Struct topicResponse = (Struct) topicResponseObj; String topic = topicResponse.getString(TOPIC_KEY_NAME); @@ -86,7 +86,7 @@ public class OffsetFetchRequest extends AbstractRequest { @Override public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { - Map responseData = new HashMap(); + Map responseData = new HashMap<>(); for (TopicPartition partition: partitions) { responseData.put(partition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index 3dc8521..2ca7ea3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -77,11 +77,11 @@ public class OffsetFetchResponse extends AbstractRequestResponse { Map> topicsData = CollectionUtils.groupDataByTopic(responseData); - List topicArray = new ArrayList(); + List topicArray = new ArrayList<>(); for (Map.Entry> entries : topicsData.entrySet()) { Struct topicData = struct.instance(RESPONSES_KEY_NAME); topicData.set(TOPIC_KEY_NAME, entries.getKey()); - List partitionArray = new ArrayList(); + List partitionArray = new ArrayList<>(); for (Map.Entry partitionEntry : entries.getValue().entrySet()) { PartitionData fetchPartitionData = partitionEntry.getValue(); Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); @@ -100,7 +100,7 @@ public class OffsetFetchResponse extends AbstractRequestResponse { public OffsetFetchResponse(Struct struct) { super(struct); - responseData = new HashMap(); + responseData = new HashMap<>(); for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { Struct topicResponse = (Struct) topicResponseObj; String topic = topicResponse.getString(TOPIC_KEY_NAME); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 715504b..62ae112 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -51,11 +51,11 @@ public class ProduceRequest extends AbstractRequest { Map> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords); struct.set(ACKS_KEY_NAME, acks); struct.set(TIMEOUT_KEY_NAME, timeout); - List topicDatas = new ArrayList(recordsByTopic.size()); + List topicDatas = new ArrayList<>(recordsByTopic.size()); for (Map.Entry> entry : recordsByTopic.entrySet()) { Struct topicData = 
struct.instance(TOPIC_DATA_KEY_NAME); topicData.set(TOPIC_KEY_NAME, entry.getKey()); - List partitionArray = new ArrayList(); + List partitionArray = new ArrayList<>(); for (Map.Entry partitionEntry : entry.getValue().entrySet()) { ByteBuffer buffer = partitionEntry.getValue().duplicate(); Struct part = topicData.instance(PARTITION_DATA_KEY_NAME) @@ -74,7 +74,7 @@ public class ProduceRequest extends AbstractRequest { public ProduceRequest(Struct struct) { super(struct); - partitionRecords = new HashMap(); + partitionRecords = new HashMap<>(); for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) { Struct topicData = (Struct) topicDataObj; String topic = topicData.getString(TOPIC_KEY_NAME); @@ -95,7 +95,7 @@ public class ProduceRequest extends AbstractRequest { if (acks == 0) return null; - Map responseMap = new HashMap(); + Map responseMap = new HashMap<>(); for (Map.Entry entry : partitionRecords.entrySet()) { responseMap.put(entry.getKey(), new ProduceResponse.PartitionResponse(Errors.forException(e).code(), ProduceResponse.INVALID_OFFSET)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 37ec0b7..f690970 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -53,11 +53,11 @@ public class ProduceResponse extends AbstractRequestResponse { public ProduceResponse(Map responses) { super(new Struct(CURRENT_SCHEMA)); Map> responseByTopic = CollectionUtils.groupDataByTopic(responses); - List topicDatas = new ArrayList(responseByTopic.size()); + List topicDatas = new ArrayList<>(responseByTopic.size()); for (Map.Entry> entry : responseByTopic.entrySet()) { Struct topicData = struct.instance(RESPONSES_KEY_NAME); topicData.set(TOPIC_KEY_NAME, entry.getKey()); - List partitionArray = new ArrayList(); + List partitionArray = new ArrayList<>(); for (Map.Entry partitionEntry : entry.getValue().entrySet()) { PartitionResponse part = partitionEntry.getValue(); Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME) @@ -75,7 +75,7 @@ public class ProduceResponse extends AbstractRequestResponse { public ProduceResponse(Struct struct) { super(struct); - responses = new HashMap(); + responses = new HashMap<>(); for (Object topicResponse : struct.getArray("responses")) { Struct topicRespStruct = (Struct) topicResponse; String topic = topicRespStruct.getString("topic"); diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java index ba38637..cd96604 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java @@ -27,13 +27,13 @@ public class CollectionUtils { * @return partitioned data */ public static Map> groupDataByTopic(Map data) { - Map> dataByTopic = new HashMap>(); + Map> dataByTopic = new HashMap<>(); for (Map.Entry entry: data.entrySet()) { String topic = entry.getKey().topic(); int partition = entry.getKey().partition(); Map topicData = dataByTopic.get(topic); if (topicData == null) { - topicData = new HashMap(); + topicData = new HashMap<>(); dataByTopic.put(topic, topicData); } topicData.put(partition, entry.getValue()); @@ -47,12 +47,12 @@ public class CollectionUtils { * @return partitions per topic */ public static Map> 
groupDataByTopic(List partitions) { - Map> partitionsByTopic = new HashMap>(); + Map> partitionsByTopic = new HashMap<>(); for (TopicPartition tp: partitions) { String topic = tp.topic(); List topicData = partitionsByTopic.get(topic); if (topicData == null) { - topicData = new ArrayList(); + topicData = new ArrayList<>(); partitionsByTopic.put(topic, topicData); } topicData.add(tp.partition()); diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CopyOnWriteMap.java b/clients/src/main/java/org/apache/kafka/common/utils/CopyOnWriteMap.java index 9c0e81a..7448493 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/CopyOnWriteMap.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/CopyOnWriteMap.java @@ -81,7 +81,7 @@ public class CopyOnWriteMap implements ConcurrentMap { @Override public synchronized V put(K k, V v) { - Map copy = new HashMap(this.map); + Map copy = new HashMap<>(this.map); V prev = copy.put(k, v); this.map = Collections.unmodifiableMap(copy); return prev; @@ -89,14 +89,14 @@ public class CopyOnWriteMap implements ConcurrentMap { @Override public synchronized void putAll(Map entries) { - Map copy = new HashMap(this.map); + Map copy = new HashMap<>(this.map); copy.putAll(entries); this.map = Collections.unmodifiableMap(copy); } @Override public synchronized V remove(Object key) { - Map copy = new HashMap(this.map); + Map copy = new HashMap<>(this.map); V prev = copy.remove(key); this.map = Collections.unmodifiableMap(copy); return prev; diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index d9c97e9..68f448d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -48,10 +48,10 @@ public class MockClient implements KafkaClient { private final Time time; private int correlation = 0; private Node node = null; - private final Set ready = new HashSet(); - private final Queue requests = new ArrayDeque(); - private final Queue responses = new ArrayDeque(); - private final Queue futureResponses = new ArrayDeque(); + private final Set ready = new HashSet<>(); + private final Queue requests = new ArrayDeque<>(); + private final Queue responses = new ArrayDeque<>(); + private final Queue futureResponses = new ArrayDeque<>(); public MockClient(Time time) { this.time = time; @@ -103,7 +103,7 @@ public class MockClient implements KafkaClient { @Override public List poll(long timeoutMs, long now) { - List copy = new ArrayList(this.responses); + List copy = new ArrayList<>(this.responses); while (!this.responses.isEmpty()) { ClientResponse response = this.responses.poll(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 738f3ed..8ed5037 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -36,7 +36,7 @@ public class KafkaConsumerTest { final int oldInitCount = MockMetricsReporter.INIT_COUNT.get(); final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get(); try { - KafkaConsumer consumer = new KafkaConsumer( + KafkaConsumer consumer = new KafkaConsumer<>( props, null, new ByteArrayDeserializer(), new ByteArrayDeserializer()); } catch (KafkaException e) { Assert.assertEquals(oldInitCount + 1, 
MockMetricsReporter.INIT_COUNT.get()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java index 26b6b40..334c0d8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java @@ -25,14 +25,14 @@ import org.junit.Test; public class MockConsumerTest { - private MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST); + private MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); @Test public void testSimpleMock() { consumer.subscribe("topic"); assertEquals(0, consumer.poll(1000).count()); - ConsumerRecord rec1 = new ConsumerRecord("test", 0, 0, "key1", "value1"); - ConsumerRecord rec2 = new ConsumerRecord("test", 0, 1, "key2", "value2"); + ConsumerRecord rec1 = new ConsumerRecord<>("test", 0, 0, "key1", "value1"); + ConsumerRecord rec2 = new ConsumerRecord<>("test", 0, 1, "key2", "value2"); consumer.addRecord(rec1); consumer.addRecord(rec2); ConsumerRecords recs = consumer.poll(1); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java index d085fe5..9d09b0e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java @@ -58,7 +58,7 @@ public class CoordinatorTest { private Node node = cluster.nodes().get(0); private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); private Metrics metrics = new Metrics(time); - private Map metricTags = new LinkedHashMap(); + private Map metricTags = new LinkedHashMap<>(); private Coordinator coordinator = new Coordinator(client, groupId, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 405efdc..4a773a0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -59,11 +59,11 @@ public class FetcherTest { private Node node = cluster.nodes().get(0); private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); private Metrics metrics = new Metrics(time); - private Map metricTags = new LinkedHashMap(); + private Map metricTags = new LinkedHashMap<>(); private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE); - private Fetcher fetcher = new Fetcher(client, + private Fetcher fetcher = new Fetcher<>(client, minBytes, maxWaitMs, fetchSize, diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index f3f8334..f336599 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -38,7 +38,7 @@ public class KafkaProducerTest { final int oldInitCount = MockMetricsReporter.INIT_COUNT.get(); final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get(); try { - KafkaProducer producer = new KafkaProducer( + KafkaProducer producer = 
new KafkaProducer<>( props, new ByteArraySerializer(), new ByteArraySerializer()); } catch (KafkaException e) { Assert.assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get()); @@ -59,7 +59,7 @@ public class KafkaProducerTest { final int oldInitCount = MockSerializer.INIT_COUNT.get(); final int oldCloseCount = MockSerializer.CLOSE_COUNT.get(); - KafkaProducer producer = new KafkaProducer( + KafkaProducer producer = new KafkaProducer<>( props, new MockSerializer(), new MockSerializer()); Assert.assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get()); Assert.assertEquals(oldCloseCount, MockSerializer.CLOSE_COUNT.get()); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java index 7a46c56..5b9a39d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java @@ -42,8 +42,8 @@ public class MockProducerTest { @Test @SuppressWarnings("unchecked") public void testAutoCompleteMock() throws Exception { - MockProducer producer = new MockProducer(true, new MockSerializer(), new MockSerializer()); - ProducerRecord record = new ProducerRecord(topic, "key".getBytes(), "value".getBytes()); + MockProducer producer = new MockProducer<>(true, new MockSerializer(), new MockSerializer()); + ProducerRecord record = new ProducerRecord<>(topic, "key".getBytes(), "value".getBytes()); Future metadata = producer.send(record); assertTrue("Send should be immediately complete", metadata.isDone()); assertFalse("Send should be successful", isError(metadata)); @@ -59,8 +59,8 @@ public class MockProducerTest { PartitionInfo partitionInfo0 = new PartitionInfo(topic, 0, null, null, null); PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null); Cluster cluster = new Cluster(new ArrayList(0), asList(partitionInfo0, partitionInfo1)); - MockProducer producer = new MockProducer(cluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer()); - ProducerRecord record = new ProducerRecord(topic, "key", "value"); + MockProducer producer = new MockProducer<>(cluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer()); + ProducerRecord record = new ProducerRecord<>(topic, "key", "value"); Future metadata = producer.send(record); assertEquals("Partition should be correct", 1, metadata.get().partition()); producer.clear(); @@ -69,9 +69,9 @@ public class MockProducerTest { @Test public void testManualCompletion() throws Exception { - MockProducer producer = new MockProducer(false, new MockSerializer(), new MockSerializer()); - ProducerRecord record1 = new ProducerRecord(topic, "key1".getBytes(), "value1".getBytes()); - ProducerRecord record2 = new ProducerRecord(topic, "key2".getBytes(), "value2".getBytes()); + MockProducer producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer()); + ProducerRecord record1 = new ProducerRecord<>(topic, "key1".getBytes(), "value1".getBytes()); + ProducerRecord record2 = new ProducerRecord<>(topic, "key2".getBytes(), "value2".getBytes()); Future md1 = producer.send(record1); assertFalse("Send shouldn't have completed", md1.isDone()); Future md2 = producer.send(record2); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java index 7bb181e..ec8b002 
100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java @@ -25,27 +25,27 @@ public class ProducerRecordTest { @Test public void testEqualsAndHashCode() { - ProducerRecord producerRecord = new ProducerRecord("test", 1 , "key", 1); + ProducerRecord producerRecord = new ProducerRecord<>("test", 1, "key", 1); assertEquals(producerRecord, producerRecord); assertEquals(producerRecord.hashCode(), producerRecord.hashCode()); - ProducerRecord equalRecord = new ProducerRecord("test", 1 , "key", 1); + ProducerRecord equalRecord = new ProducerRecord<>("test", 1, "key", 1); assertEquals(producerRecord, equalRecord); assertEquals(producerRecord.hashCode(), equalRecord.hashCode()); - ProducerRecord topicMisMatch = new ProducerRecord("test-1", 1 , "key", 1); + ProducerRecord topicMisMatch = new ProducerRecord<>("test-1", 1, "key", 1); assertFalse(producerRecord.equals(topicMisMatch)); - ProducerRecord partitionMismatch = new ProducerRecord("test", 2 , "key", 1); + ProducerRecord partitionMismatch = new ProducerRecord<>("test", 2, "key", 1); assertFalse(producerRecord.equals(partitionMismatch)); - ProducerRecord keyMisMatch = new ProducerRecord("test", 1 , "key-1", 1); + ProducerRecord keyMisMatch = new ProducerRecord<>("test", 1, "key-1", 1); assertFalse(producerRecord.equals(keyMisMatch)); - ProducerRecord valueMisMatch = new ProducerRecord("test", 1 , "key", 2); + ProducerRecord valueMisMatch = new ProducerRecord<>("test", 1, "key", 2); assertFalse(producerRecord.equals(valueMisMatch)); - ProducerRecord nullFieldsRecord = new ProducerRecord("topic", null, null, null); + ProducerRecord nullFieldsRecord = new ProducerRecord<>("topic", null, null, null); assertEquals(nullFieldsRecord, nullFieldsRecord); assertEquals(nullFieldsRecord.hashCode(), nullFieldsRecord.hashCode()); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java index 2c69382..a8828cd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java @@ -36,7 +36,7 @@ public class BufferPoolTest { private MockTime time = new MockTime(); private Metrics metrics = new Metrics(time); String metricGroup = "TestMetrics"; - Map metricTags = new LinkedHashMap(); + Map metricTags = new LinkedHashMap<>(); /** * Test the simple non-blocking allocation paths @@ -148,7 +148,7 @@ public class BufferPoolTest { final int poolableSize = 1024; final long totalMemory = numThreads / 2 * poolableSize; final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics, time, metricGroup, metricTags); - List threads = new ArrayList(); + List threads = new ArrayList<>(); for (int i = 0; i < numThreads; i++) threads.add(new StressTestThread(pool, iterations)); for (StressTestThread thread : threads) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 5b2e4ff..781890e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -62,7 +62,7 @@ public class RecordAccumulatorTest { 
private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value); private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3)); private Metrics metrics = new Metrics(time); - Map metricTags = new LinkedHashMap(); + Map metricTags = new LinkedHashMap<>(); @Test public void testFull() throws Exception { @@ -137,7 +137,7 @@ public class RecordAccumulatorTest { final int msgs = 10000; final int numParts = 2; final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, true, metrics, time, metricTags); - List threads = new ArrayList(); + List threads = new ArrayList<>(); for (int i = 0; i < numThreads; i++) { threads.add(new Thread() { public void run() { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 8b1805d..6662a94 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -51,7 +51,7 @@ public class SenderTest { private Metadata metadata = new Metadata(0, Long.MAX_VALUE); private Cluster cluster = TestUtils.singletonCluster("test", 1); private Metrics metrics = new Metrics(time); - Map metricTags = new LinkedHashMap(); + Map metricTags = new LinkedHashMap<>(); private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, false, metrics, time, metricTags); private Sender sender = new Sender(client, metadata, diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java index 44c2ef0..d914247 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java @@ -95,7 +95,7 @@ public class ConfigDefTest { private void testBadInputs(Type type, Object... 
values) { for (Object value : values) { - Map m = new HashMap(); + Map m = new HashMap<>(); m.put("name", value); ConfigDef def = new ConfigDef().define("name", type, Importance.HIGH, "docs"); try { @@ -128,13 +128,13 @@ public class ConfigDefTest { ConfigDef def = new ConfigDef().define("name", type, defaultVal, validator, Importance.HIGH, "docs"); for (Object value : okValues) { - Map m = new HashMap(); + Map m = new HashMap<>(); m.put("name", value); def.parse(m); } for (Object value : badValues) { - Map m = new HashMap(); + Map m = new HashMap<>(); m.put("name", value); try { def.parse(m); diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index 544e120..4d8668a 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -44,7 +44,7 @@ public class MetricsTest { @Test public void testMetricName() { MetricName n1 = new MetricName("name", "group", "description", "key1", "value1"); - Map tags = new HashMap(); + Map tags = new HashMap<>(); tags.put("key1", "value1"); MetricName n2 = new MetricName("name", "group", "description", tags); assertEquals("metric names created in two different ways should be equal", n1, n2); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 158f982..0354dc6 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -153,8 +153,8 @@ public class SelectorTest { for (int i = 0; i < conns; i++) selector.connect(Integer.toString(i), addr, BUFFER_SIZE, BUFFER_SIZE); // send echo requests and receive responses - Map requests = new HashMap(); - Map responses = new HashMap(); + Map requests = new HashMap<>(); + Map responses = new HashMap<>(); int responseCount = 0; for (int i = 0; i < conns; i++) { String node = Integer.toString(i); diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index 8ec610a..c84e46c 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -71,7 +71,7 @@ public class MemoryRecordsTest { @Parameterized.Parameters public static Collection data() { - List values = new ArrayList(); + List values = new ArrayList<>(); for (CompressionType type: CompressionType.values()) values.add(new Object[] {type}); return values; diff --git a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java index 957fc8f..1f520fc 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java @@ -98,7 +98,7 @@ public class RecordTest { public static Collection data() { byte[] payload = new byte[1000]; Arrays.fill(payload, (byte) 1); - List values = new ArrayList(); + List values = new ArrayList<>(); for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload)) for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload)) for (CompressionType compression : CompressionType.values()) diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 8b2aca8..5db8d29 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -94,14 +94,14 @@ public class RequestResponseTest { } private AbstractRequest createFetchRequest() { - Map fetchData = new HashMap(); + Map fetchData = new HashMap<>(); fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 1000000)); fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 1000000)); return new FetchRequest(-1, 100, 100000, fetchData); } private AbstractRequestResponse createFetchResponse() { - Map responseData = new HashMap(); + Map responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10))); return new FetchResponse(responseData); } @@ -123,13 +123,13 @@ public class RequestResponseTest { } private AbstractRequest createListOffsetRequest() { - Map offsetData = new HashMap(); + Map offsetData = new HashMap<>(); offsetData.put(new TopicPartition("test", 0), new ListOffsetRequest.PartitionData(1000000L, 10)); return new ListOffsetRequest(-1, offsetData); } private AbstractRequestResponse createListOffsetResponse() { - Map responseData = new HashMap(); + Map responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE.code(), Arrays.asList(100L))); return new ListOffsetResponse(responseData); } @@ -146,19 +146,19 @@ public class RequestResponseTest { isr[0] = node; Cluster cluster = new Cluster(Arrays.asList(node), Arrays.asList(new PartitionInfo("topic1", 1, node, replicas, isr))); - Map errors = new HashMap(); + Map errors = new HashMap<>(); errors.put("topic2", Errors.LEADER_NOT_AVAILABLE); return new MetadataResponse(cluster, errors); } private AbstractRequest createOffsetCommitRequest() { - Map commitData = new HashMap(); + Map commitData = new HashMap<>(); commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, "")); return new OffsetCommitRequest("group1", 100, "consumer1", 1000000, commitData); } private AbstractRequestResponse createOffsetCommitResponse() { - Map responseData = new HashMap(); + Map responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), Errors.NONE.code()); return new OffsetCommitResponse(responseData); } @@ -168,19 +168,19 @@ public class RequestResponseTest { } private AbstractRequestResponse createOffsetFetchResponse() { - Map responseData = new HashMap(); + Map responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L, "", Errors.NONE.code())); return new OffsetFetchResponse(responseData); } private AbstractRequest createProduceRequest() { - Map produceData = new HashMap(); + Map produceData = new HashMap<>(); produceData.put(new TopicPartition("test", 0), ByteBuffer.allocate(10)); return new ProduceRequest((short) 1, 5000, produceData); } private AbstractRequestResponse createProduceResponse() { - Map responseData = new HashMap(); + Map responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000)); return new ProduceResponse(responseData); } diff --git 
a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java index 383bf48..21323b5 100644 --- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java @@ -37,7 +37,7 @@ public class SerializationTest { public void testStringSerializer() { String str = "my string"; String mytopic = "testTopic"; - List encodings = new ArrayList(); + List encodings = new ArrayList<>(); encodings.add("UTF8"); encodings.add("UTF-16"); @@ -75,16 +75,16 @@ public class SerializationTest { } private SerDeser getStringSerDeser(String encoder) { - Map serializerConfigs = new HashMap(); + Map serializerConfigs = new HashMap<>(); serializerConfigs.put("key.serializer.encoding", encoder); Serializer serializer = new StringSerializer(); serializer.configure(serializerConfigs, true); - Map deserializerConfigs = new HashMap(); + Map deserializerConfigs = new HashMap<>(); deserializerConfigs.put("key.deserializer.encoding", encoder); Deserializer deserializer = new StringDeserializer(); deserializer.configure(deserializerConfigs, true); - return new SerDeser(serializer, deserializer); + return new SerDeser<>(serializer, deserializer); } } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java index c788e66..622e408 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java @@ -29,10 +29,10 @@ public class AbstractIteratorTest { @Test public void testIterator() { int max = 10; - List l = new ArrayList(); + List l = new ArrayList<>(); for (int i = 0; i < max; i++) l.add(i); - ListIterator iter = new ListIterator(l); + ListIterator iter = new ListIterator<>(l); for (int i = 0; i < max; i++) { Integer value = i; assertEquals(value, iter.peek()); @@ -44,7 +44,7 @@ public class AbstractIteratorTest { @Test(expected = NoSuchElementException.class) public void testEmptyIterator() { - Iterator iter = new ListIterator(Collections.emptyList()); + Iterator iter = new ListIterator<>(Collections.emptyList()); iter.next(); } diff --git a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java index 8cd19b2..fc42f11 100644 --- a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java +++ b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java @@ -145,20 +145,20 @@ public class Microbenchmarks { t3.join(); t4.join(); - Map values = new HashMap(); + Map values = new HashMap<>(); for (int i = 0; i < 100; i++) values.put(Integer.toString(i), i); System.out.println("HashMap:"); benchMap(2, 1000000, values); System.out.println("ConcurentHashMap:"); - benchMap(2, 1000000, new ConcurrentHashMap(values)); + benchMap(2, 1000000, new ConcurrentHashMap<>(values)); System.out.println("CopyOnWriteMap:"); - benchMap(2, 1000000, new CopyOnWriteMap(values)); + benchMap(2, 1000000, new CopyOnWriteMap<>(values)); } private static void benchMap(int numThreads, final int iters, final Map map) throws Exception { - final List keys = new ArrayList(map.keySet()); - final List threads = new ArrayList(); + final List keys = new ArrayList<>(map.keySet()); + final List threads = new ArrayList<>(); for (int i = 0; i < 
numThreads; i++) { threads.add(new Thread() { public void run() { diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java index 51eb9d1..3a3448c 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java +++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java @@ -29,11 +29,11 @@ import org.apache.kafka.common.utils.Time; public class MockSelector implements Selectable { private final Time time; - private final List initiatedSends = new ArrayList(); - private final List completedSends = new ArrayList(); - private final List completedReceives = new ArrayList(); - private final List disconnected = new ArrayList(); - private final List connected = new ArrayList(); + private final List initiatedSends = new ArrayList<>(); + private final List completedSends = new ArrayList<>(); + private final List completedReceives = new ArrayList<>(); + private final List disconnected = new ArrayList<>(); + private final List connected = new ArrayList<>(); public MockSelector(Time time) { this.time = time; diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index ccf3a5f..d82e313 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -51,7 +51,7 @@ public class TestUtils { Node[] ns = new Node[nodes]; for (int i = 0; i < nodes; i++) ns[i] = new Node(0, "localhost", 1969); - List parts = new ArrayList(); + List parts = new ArrayList<>(); for (int i = 0; i < partitions; i++) parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns)); return new Cluster(asList(ns), parts); diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java index c9b9018..07ce176 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java @@ -221,8 +221,7 @@ public class KafkaETLContext { long[] range = new long[2]; TopicAndPartition topicAndPartition = new TopicAndPartition(_request.getTopic(), _request.getPartition()); - Map requestInfo = - new HashMap(); + Map requestInfo = new HashMap<>(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1)); OffsetRequest request = new OffsetRequest( requestInfo, kafka.api.OffsetRequest.CurrentVersion(), kafka.api.OffsetRequest.DefaultClientId()); diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java index f040fbe..dfd384b 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java @@ -54,7 +54,7 @@ extends SequenceFileRecordReader { super(job, (FileSplit) split); _props = KafkaETLUtils.getPropsFromJob(job); - _contextList = new ArrayList(); + _contextList = new ArrayList<>(); _job = job; _reporter = reporter; _contextIndex = -1; diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java index 02d79a1..49e93b5 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java @@ -85,7 +85,7 @@ 
public class KafkaETLUtils { } BufferedReader in = new BufferedReader(new InputStreamReader( fs.open(path))); - List buf = new ArrayList(); + List buf = new ArrayList<>(); String line = null; while ((line = in.readLine()) != null) { diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java index 71eb80f..079c394 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java @@ -126,7 +126,7 @@ public class Props extends Properties { if (args.length % 2 != 0) throw new KafkaException( "Must have an equal number of keys and values."); - Map vals = new HashMap(args.length / 2); + Map vals = new HashMap<>(args.length / 2); for (int i = 0; i < args.length; i += 2) vals.put(args[i], args[i + 1]); return new Props(vals); @@ -405,7 +405,7 @@ public class Props extends Properties { * The string prefix */ public Map getMapByPrefix(String prefix) { - Map values = new HashMap(); + Map values = new HashMap<>(); for (String key : super.stringPropertyNames()) { if (key.startsWith(prefix)) { diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java index d27a511..483fb0d 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java @@ -82,7 +82,7 @@ public class DataGenerator { public void run() throws Exception { - List list = new ArrayList(); + List list = new ArrayList<>(); for (int i = 0; i < _count; i++) { Long timestamp = RANDOM.nextLong(); if (timestamp < 0) timestamp = -timestamp; diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java index 417b4b3..a484649 100644 --- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java +++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java @@ -49,7 +49,7 @@ public class KafkaOutputFormat extends OutputFormat public static final String KAFKA_CONFIG_PREFIX = "kafka.output"; private static final Map kafkaConfigMap; static { - Map cMap = new HashMap(); + Map cMap = new HashMap<>(); // default Hadoop producer configs cMap.put("producer.type", "sync"); @@ -138,7 +138,7 @@ public class KafkaOutputFormat extends OutputFormat } else throw new KafkaException("missing scheme from kafka uri (must be kafka://)"); - Producer producer = new Producer(new ProducerConfig(props)); - return new KafkaRecordWriter(producer, topic, queueBytes); + Producer producer = new Producer<>(new ProducerConfig(props)); + return new KafkaRecordWriter<>(producer, topic, queueBytes); } } diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java index 72c088d..9fa774a 100644 --- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java +++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java @@ -31,7 +31,7 @@ public class KafkaRecordWriter extends RecordWriter protected Producer producer; protected String topic; - protected List> msgList = new LinkedList>(); + protected List> msgList = new LinkedList<>(); protected int totalBytes = 0; protected int queueBytes; diff --git 
a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java index f19df0c..2be619d 100755 --- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java +++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java @@ -166,8 +166,8 @@ public class KafkaMigrationTool int numConsumers = options.valueOf(numStreamsOpt); String producerConfigFile_08 = options.valueOf(producerConfigOpt); int numProducers = options.valueOf(numProducersOpt); - final List migrationThreads = new ArrayList(numConsumers); - final List producerThreads = new ArrayList(numProducers); + final List migrationThreads = new ArrayList<>(numConsumers); + final List producerThreads = new ArrayList<>(numProducers); try { File kafkaJar_07 = new File(kafkaJarFile_07); @@ -221,7 +221,7 @@ public class KafkaMigrationTool kafkaProducerProperties_08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder"); // create a producer channel instead int queueSize = options.valueOf(queueSizeOpt); - ProducerDataChannel> producerDataChannel = new ProducerDataChannel>(queueSize); + ProducerDataChannel> producerDataChannel = new ProducerDataChannel<>(queueSize); int threadId = 0; Runtime.getRuntime().addShutdownHook(new Thread() { @@ -286,7 +286,7 @@ public class KafkaMigrationTool public ProducerDataChannel(int queueSize) { producerQueueSize = queueSize; - producerRequestQueue = new ArrayBlockingQueue(producerQueueSize); + producerRequestQueue = new ArrayBlockingQueue<>(producerQueueSize); } public void sendRequest(T data) throws InterruptedException { diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java index 8af64d8..7eabaa7 100644 --- a/examples/src/main/java/kafka/examples/Consumer.java +++ b/examples/src/main/java/kafka/examples/Consumer.java @@ -54,7 +54,7 @@ public class Consumer extends Thread } public void run() { - Map topicCountMap = new HashMap(); + Map topicCountMap = new HashMap<>(); topicCountMap.put(topic, 1); Map>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream stream = consumerMap.get(topic).get(0); diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java index ccc9925..458c76f 100644 --- a/examples/src/main/java/kafka/examples/Producer.java +++ b/examples/src/main/java/kafka/examples/Producer.java @@ -38,7 +38,7 @@ public class Producer extends Thread props.put("client.id", "DemoProducer"); props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - producer = new KafkaProducer(props); + producer = new KafkaProducer<>(props); this.topic = topic; this.isAsync = isAsync; } @@ -50,12 +50,12 @@ public class Producer extends Thread String messageStr = "Message_" + messageNo; long startTime = System.currentTimeMillis(); if (isAsync) { // Send asynchronously - producer.send(new ProducerRecord(topic, + producer.send(new ProducerRecord<>(topic, messageNo, messageStr), new DemoCallBack(startTime, messageNo, messageStr)); } else { // Send synchronously try { - producer.send(new ProducerRecord(topic, + producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get(); System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")"); diff --git a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java index c43b461..89faa81 
100644 --- a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java +++ b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java @@ -71,7 +71,7 @@ public class SimpleConsumerDemo { printMessages(fetchResponse.messageSet(KafkaProperties.topic2, 0)); System.out.println("Testing single multi-fetch"); - Map> topicMap = new HashMap>(); + Map> topicMap = new HashMap<>(); topicMap.put(KafkaProperties.topic2, Collections.singletonList(0)); topicMap.put(KafkaProperties.topic3, Collections.singletonList(0)); req = new FetchRequestBuilder() diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java index 628ff53..23de3df 100644 --- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java +++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java @@ -129,7 +129,7 @@ public class KafkaLog4jAppender extends AppenderSkeleton { } protected Producer getKafkaProducer(Properties props) { - return new KafkaProducer(props); + return new KafkaProducer<>(props); } @Override diff --git a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java index c35f26a..5782adc 100644 --- a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java +++ b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java @@ -26,7 +26,7 @@ import java.util.Properties; public class MockKafkaLog4jAppender extends KafkaLog4jAppender { private MockProducer mockProducer = - new MockProducer(false, new MockSerializer(), new MockSerializer()); + new MockProducer<>(false, new MockSerializer(), new MockSerializer()); @Override protected Producer getKafkaProducer(Properties props) { -- 2.4.5 From e3b7ea3c8bc5075a0d3b2ac99c19f2df0fda7f28 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 9 Jul 2015 09:49:01 +0100 Subject: [PATCH 2/3] kafka-2322; Use multi-catch to reduce redundancy --- examples/src/main/java/kafka/examples/Producer.java | 4 +--- .../main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java index 458c76f..cbd51fc 100644 --- a/examples/src/main/java/kafka/examples/Producer.java +++ b/examples/src/main/java/kafka/examples/Producer.java @@ -59,9 +59,7 @@ public class Producer extends Thread messageNo, messageStr)).get(); System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")"); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (ExecutionException e) { + } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java index 23de3df..6f909a3 100644 --- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java +++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java @@ -140,9 +140,7 @@ public class KafkaLog4jAppender extends AppenderSkeleton { if (syncSend) { try { response.get(); - } catch (InterruptedException ex) { - throw new RuntimeException(ex); - } catch (ExecutionException ex) { + 
} catch (InterruptedException | ExecutionException ex) { throw new RuntimeException(ex); } } -- 2.4.5 From 2aa4e8729e95c5150f465d3d2c95bfcff0a7e0f2 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 9 Jul 2015 09:50:56 +0100 Subject: [PATCH 3/3] kafka-2322; Use try with resources instead of try/finally. It's more concise and handles the exception from `close` better. --- clients/src/main/java/org/apache/kafka/common/utils/Utils.java | 5 +---- contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java | 5 +---- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index af9993c..f5953ac 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -479,14 +479,11 @@ public class Utils { */ public static String readFileAsString(String path, Charset charset) throws IOException { if (charset == null) charset = Charset.defaultCharset(); - FileInputStream stream = new FileInputStream(new File(path)); String result = new String(); - try { + try (FileInputStream stream = new FileInputStream(new File(path))) { FileChannel fc = stream.getChannel(); MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size()); result = charset.decode(bb).toString(); - } finally { - stream.close(); } return result; } diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java index 49e93b5..c0ca50d 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java @@ -142,14 +142,11 @@ public class KafkaETLUtils { Path path = new Path(file); FileSystem fs = path.getFileSystem(new Configuration()); if (fs.exists(path)) { - InputStream input = fs.open(path); - try { + try (InputStream input = fs.open(path)) { // wrap it up in another layer so that the user can override // properties Props p = new Props(input); return new Props(p); - } finally { - input.close(); } } else { return new Props(); -- 2.4.5
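
The CollectionUtils.groupDataByTopic hunks in PATCH 1/3 show where the diamond operator pays off most: with nested generic types, the constructor no longer repeats type arguments that the declared type already states. Below is a minimal, self-contained sketch of that pattern; the TopicPartition stand-in, the method name groupPartitionsByTopic, and the concrete type parameters (String topics, Integer partition ids) are illustrative assumptions, not code copied from the Kafka sources.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DiamondSketch {

    // Minimal stand-in for org.apache.kafka.common.TopicPartition, just
    // enough to make the sketch self-contained.
    static class TopicPartition {
        private final String topic;
        private final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        String topic() { return topic; }
        int partition() { return partition; }
    }

    // Same shape as the patched grouping utility: before the patch each
    // `new` repeated the full type arguments, e.g.
    // new HashMap<String, List<Integer>>(); after it, the compiler infers
    // them from the declared type via the diamond operator.
    static Map<String, List<Integer>> groupPartitionsByTopic(List<TopicPartition> partitions) {
        Map<String, List<Integer>> partitionsByTopic = new HashMap<>();
        for (TopicPartition tp : partitions) {
            List<Integer> topicData = partitionsByTopic.get(tp.topic());
            if (topicData == null) {
                topicData = new ArrayList<>();
                partitionsByTopic.put(tp.topic(), topicData);
            }
            topicData.add(tp.partition());
        }
        return partitionsByTopic;
    }

    public static void main(String[] args) {
        List<TopicPartition> partitions = new ArrayList<>();
        partitions.add(new TopicPartition("test1", 0));
        partitions.add(new TopicPartition("test1", 1));
        partitions.add(new TopicPartition("test2", 0));
        // e.g. {test1=[0, 1], test2=[0]} (HashMap iteration order is unspecified)
        System.out.println(groupPartitionsByTopic(partitions));
    }
}

The same inference applies at every new HashMap<>(), new ArrayList<>(), and new ArrayDeque<>() site in PATCH 1/3.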
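
PATCH 2/3 relies on Java 7 multi-catch, which is only legal when the duplicated catch bodies are identical, as they are in both hunks above. Here is a sketch of the same shape, assuming a Future obtained from a plain ExecutorService in place of a Kafka producer's response:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MultiCatchSketch {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Anonymous Callable rather than a lambda, to stay within the
        // Java 7 syntax this patch series targets.
        Future<String> response = executor.submit(new Callable<String>() {
            @Override
            public String call() {
                return "ok";
            }
        });
        try {
            // Pre-Java 7, this required two catch blocks with identical
            // bodies, one per checked exception thrown by get().
            System.out.println(response.get());
        } catch (InterruptedException | ExecutionException ex) {
            // One handler covers both exception types; the catch
            // parameter is implicitly final in a multi-catch clause.
            throw new RuntimeException(ex);
        } finally {
            executor.shutdown();
        }
    }
}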
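
PATCH 3/3 replaces try/finally with try-with-resources. A simplified sketch mirroring the shape of the readFileAsString hunk follows (not the exact patched method; the demo path in main is hypothetical):

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;

public class TryWithResourcesSketch {

    // The stream is declared in the try header, so it is closed
    // automatically on every exit path. If the body throws and close()
    // also throws, the close() failure is attached as a suppressed
    // exception instead of masking the original one, which is the
    // "handles the exception from `close` better" part of the commit
    // message.
    static String readFileAsString(String path, Charset charset) throws IOException {
        if (charset == null)
            charset = Charset.defaultCharset();
        try (FileInputStream stream = new FileInputStream(new File(path))) {
            FileChannel fc = stream.getChannel();
            MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size());
            return charset.decode(bb).toString();
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical path; point this at any local text file.
        System.out.println(readFileAsString("/tmp/example.txt", null));
    }
}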