Index: httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java --- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import org.apache.hc.core5.annotation.Contract; @@ -41,6 +42,8 @@ @Contract(threading = ThreadingBehavior.SAFE) public final class SharedInputBuffer extends AbstractSharedBuffer implements ContentInputBuffer { + private final AtomicInteger windowScalingIncrement = new AtomicInteger(0); + private volatile CapacityChannel capacityChannel; public SharedInputBuffer(final ReentrantLock lock, final int initialBufferSize) { @@ -70,14 +73,19 @@ try { this.capacityChannel = capacityChannel; setInputMode(); - if (buffer().hasRemaining()) { - capacityChannel.update(buffer().remaining()); - } + signalCapacity(capacityChannel); } finally { lock.unlock(); } } + private void signalCapacity(final CapacityChannel channel) throws IOException { + final int increment = windowScalingIncrement.getAndSet(0); + if (increment > 0) { + channel.update(increment); + } + } + private void awaitInput() throws InterruptedIOException { if (!buffer().hasRemaining()) { setInputMode(); @@ -106,11 +114,11 @@ return -1; } final int b = buffer().get() & 0xff; - if (!buffer().hasRemaining() && capacityChannel != null) { + windowScalingIncrement.incrementAndGet(); + final CapacityChannel localChannel = capacityChannel; + if (!buffer().hasRemaining() && localChannel != null) { setInputMode(); - if (buffer().hasRemaining()) { - capacityChannel.update(buffer().remaining()); - } + signalCapacity(localChannel); } return b; } finally { @@ -132,11 +140,11 @@ } final int chunk = Math.min(buffer().remaining(), len); buffer().get(b, off, chunk); - if (!buffer().hasRemaining() && capacityChannel != null) { + windowScalingIncrement.addAndGet(chunk); + final CapacityChannel localChannel = capacityChannel; + if (!buffer().hasRemaining() && localChannel != null) { setInputMode(); - if (buffer().hasRemaining()) { - capacityChannel.update(buffer().remaining()); - } + signalCapacity(localChannel); } return chunk; } finally {