Details
-
Improvement
-
Status: Closed
-
Major
-
Resolution: Fixed
-
1.10.0
Description
Following the consensus reached in the community code style discussion, we can refactor InputGateWithMetrics#updateMetrics so that it no longer takes an Optional as a parameter.
cc azagrebin
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java index 5d2cfd95c4..e548fbf02b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java @@ -24,6 +24,8 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; +import javax.annotation.Nonnull; + import java.io.IOException; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -67,12 +69,12 @@ public class InputGateWithMetrics extends InputGate { @Override public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException { - return updateMetrics(inputGate.getNext()); + return inputGate.getNext().map(this::updateMetrics); } @Override public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException { - return updateMetrics(inputGate.pollNext()); + return inputGate.pollNext().map(this::updateMetrics); } @Override @@ -85,8 +87,8 @@ public class InputGateWithMetrics extends InputGate { inputGate.close(); } - private Optional<BufferOrEvent> updateMetrics(Optional<BufferOrEvent> bufferOrEvent) { - bufferOrEvent.ifPresent(b -> numBytesIn.inc(b.getSize())); + private BufferOrEvent updateMetrics(@Nonnull BufferOrEvent bufferOrEvent) { + numBytesIn.inc(bufferOrEvent.getSize()); return bufferOrEvent; } }
Attachments
Issue Links
- links to